From 32c5733993fdb876b94ee1fdc70ea519668b045d Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 6 Oct 2025 17:03:27 +0200 Subject: [PATCH 1/4] refact file acquisition tests: prevent data race --- pkg/acquisition/modules/file/file_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index dba96f78ec9..a0a072cc31f 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "runtime" + "sync" "testing" "time" @@ -356,22 +357,21 @@ force_inotify: true`, testPattern), } actualLines := 0 + var wg sync.WaitGroup if tc.expectedLines != 0 { - var stopReading bool - defer func() { stopReading = true }() + wg.Add(1) go func() { + defer wg.Done() for { select { case <-out: actualLines++ - default: - if stopReading { - return - } - // Small sleep to prevent tight loop - time.Sleep(100 * time.Millisecond) + case <-tomb.Dying(): + return + case <-time.After(100 * time.Millisecond): + // avoid tight loop } } }() @@ -417,6 +417,11 @@ force_inotify: true`, testPattern), time.Sleep(2 * time.Second) os.Remove(streamLogFile) + + // stop acquisition and wait for tailer + tomb.Kill(nil) + wg.Wait() + assert.Equal(t, tc.expectedLines, actualLines) } @@ -432,8 +437,6 @@ force_inotify: true`, testPattern), if tc.teardown != nil { tc.teardown() } - - tomb.Kill(nil) }) } } From b40f0b43f326894a55ea354e6daffd9ed0cf0895 Mon Sep 17 00:00:00 2001 From: marco Date: Tue, 7 Oct 2025 12:31:12 +0200 Subject: [PATCH 2/4] replace wait loop with testify.Eventually --- pkg/acquisition/modules/file/file_test.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index a0a072cc31f..a479b925e93 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -388,20 +388,9 @@ force_inotify: true`, testPattern), require.NoError(t, err, "could not create test file") // wait for the file to be tailed - waitingForTail := true - for waitingForTail { - select { - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for file to be tailed") - default: - if !f.IsTailing(streamLogFile) { - time.Sleep(50 * time.Millisecond) - continue - } - - waitingForTail = false - } - } + require.Eventually(t, func() bool { + return f.IsTailing(streamLogFile) + }, 5*time.Second, 50 * time.Millisecond, "Timeout waiting for %q to be tailed", streamLogFile) for i := range 5 { _, err = fmt.Fprintf(fd, "%d\n", i) From c7b81bedb0e91b28d055107da32996d2172d80ab Mon Sep 17 00:00:00 2001 From: marco Date: Tue, 7 Oct 2025 13:19:17 +0200 Subject: [PATCH 3/4] remove sleep --- pkg/acquisition/modules/file/file_test.go | 40 +++++++++++------------ 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index a479b925e93..ccaaa274d00 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "runtime" "sync" + "sync/atomic" "testing" "time" @@ -356,7 +357,7 @@ force_inotify: true`, testPattern), tc.afterConfigure() } - actualLines := 0 + var actualLines atomic.Int64 var wg sync.WaitGroup if tc.expectedLines != 0 { @@ -367,11 +368,9 @@ force_inotify: true`, testPattern), for { select { case <-out: - actualLines++ + actualLines.Add(1) case <-tomb.Dying(): return - case <-time.After(100 * time.Millisecond): - // avoid tight loop } } }() @@ -402,16 +401,14 @@ force_inotify: true`, testPattern), fd.Close() - // sleep to ensure the tail events are processed - time.Sleep(2 * time.Second) + require.Eventually(t, func() bool { + return actualLines.Load() == int64(tc.expectedLines) + }, 5*time.Second, 100*time.Millisecond) os.Remove(streamLogFile) - // stop acquisition and wait for tailer tomb.Kill(nil) wg.Wait() - - assert.Equal(t, tc.expectedLines, actualLines) } if tc.expectedOutput != "" { @@ -488,9 +485,12 @@ mode: tail require.NoError(t, err) // Wait for polling to detect the file - time.Sleep(4 * time.Second) - require.True(t, f.IsTailing(testFile), "File should be tailed after polling") + require.Eventually(t, func() bool { + return f.IsTailing(testFile) + }, 5*time.Second, 100*time.Millisecond, "File should be tailed after polling") + + // could be require.Never, but detection has triggered already - no need to slow down the test require.False(t, f.IsTailing(ignoredFile), "File should be ignored after polling") // Cleanup @@ -529,21 +529,19 @@ mode: tail require.NoError(t, err) // Wait for initial tail setup - time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + return f.IsTailing(testFile) + }, 3*time.Second, 100*time.Millisecond, "File should be initially tailed") // Simulate tailer death by removing it from the map f.RemoveTail(testFile) - isTailed := f.IsTailing(testFile) - require.False(t, isTailed, "File should be removed from the map") + require.False(t, f.IsTailing(testFile), "File should be removed from the map") - // Wait for polling to resurrect the file - time.Sleep(2 * time.Second) + // Wait for polling to resurrect the tail + require.Eventually(t, func() bool { + return f.IsTailing(testFile) + }, 5*time.Second, 100*time.Millisecond, "File should be resurrected via polling") - // Verify file is being tailed again - isTailed = f.IsTailing(testFile) - require.True(t, isTailed, "File should be resurrected via polling") - - // Cleanup tomb.Kill(nil) require.NoError(t, tomb.Wait()) } From b87d7aa46d1ee3069a969022866537163e8cd2b4 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 7 Nov 2025 12:56:52 +0100 Subject: [PATCH 4/4] relax timing --- pkg/acquisition/modules/file/file_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index ccaaa274d00..61519dd575c 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -304,7 +304,7 @@ force_inotify: true`, testPattern), require.NoError(t, err) err = f.Close() require.NoError(t, err) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) err = os.Chmod(f.Name(), 0o000) require.NoError(t, err) }, @@ -403,7 +403,7 @@ force_inotify: true`, testPattern), require.Eventually(t, func() bool { return actualLines.Load() == int64(tc.expectedLines) - }, 5*time.Second, 100*time.Millisecond) + }, 10*time.Second, 100*time.Millisecond) os.Remove(streamLogFile)