Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions pumps/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,29 @@ func (s *StdOutPump) Init(config interface{}) error {
func (s *StdOutPump) WriteData(ctx context.Context, data []interface{}) error {
s.log.Debug("Attempting to write ", len(data), " records...")

//Data is all the analytics being written
for _, v := range data {
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
s.log.Error("Failed to decode analytics record")
continue
}

select {
case <-ctx.Done():
return nil
default:
decoded := v.(analytics.AnalyticsRecord)

if s.conf.Format == "json" {
formatter := &logrus.JSONFormatter{}
entry := log.WithField(s.conf.LogFieldName, decoded)
entry.Level = logrus.InfoLevel
entry.Time = time.Now().UTC()
data, _ := formatter.Format(entry)
fmt.Print(string(data))
} else {
s.log.WithField(s.conf.LogFieldName, decoded).Info()
if s.conf.Format == "json" {
formatter := &logrus.JSONFormatter{}
entry := log.WithField(s.conf.LogFieldName, decoded)
entry.Level = logrus.InfoLevel
entry.Time = time.Now().UTC()
data, err := formatter.Format(entry)
if err != nil {
s.log.Error("Failed to format record: ", err)
continue
}

fmt.Print(string(data))
} else {
s.log.WithField(s.conf.LogFieldName, decoded).Info()
}
}

s.log.Info("Purged ", len(data), " records...")

return nil
Expand Down
68 changes: 68 additions & 0 deletions pumps/stdout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pumps

import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/sirupsen/logrus"
)

// Previous implementation was canceling the writes if the context was cancelled.
// This test ensures that the pump will continue to write data even if the context is cancelled.
func TestStdOutPump_WriteData_ContextCancellation(t *testing.T) {
// Setup pump
pump := &StdOutPump{
conf: &StdOutConf{
LogFieldName: "test-analytics",
Format: "json",
},
}

// Setup logger
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
pump.log = logger.WithField("prefix", "test")

// Create many records to test with
data := make([]interface{}, 100)
for i := range data {
data[i] = analytics.AnalyticsRecord{
Path: fmt.Sprintf("/test/%d", i),
Method: "GET",
}
}

// Create an already cancelled context
// && cancel immediately
ctx, cancel := context.WithCancel(context.Background())
cancel()


// Capture logger output
var buf bytes.Buffer
pump.log.Logger.SetOutput(&buf)
oldOut := pump.log.Logger.Out
defer pump.log.Logger.SetOutput(oldOut) // restore original output when done

err := pump.WriteData(ctx, data)

output := buf.String()

if err != nil {
t.Errorf("Expected no error, wanted the Pump to finish purging despite context cancellation, got %v", err)
}

// Verify the output contains the expected message
attemptMsg := "Attempting to write 100 records"
if !strings.Contains(output, attemptMsg) {
t.Errorf("Expected output does not contain '%s'", attemptMsg)
}
purgeMsg := "Purged 100 records..."
if !strings.Contains(output, purgeMsg) {
t.Errorf("Expected output does not contain '%s'", purgeMsg)
}
}