Skip to content

Commit 35a4c73

Browse files
committed
Make hook audit writes async and non-blocking
1 parent d27dfd2 commit 35a4c73

File tree

3 files changed

+76
-27
lines changed

3 files changed

+76
-27
lines changed

pkg/agent/loop_test.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -672,16 +672,23 @@ func TestAgentLoop_WritesHookAuditEvents(t *testing.T) {
672672
}
673673

674674
auditPath := filepath.Join(tmpDir, "hooks", "hook-events.jsonl")
675-
data, err := os.ReadFile(auditPath)
676-
if err != nil {
677-
t.Fatalf("read hook audit file: %v", err)
678-
}
679-
body := string(data)
680-
if !strings.Contains(body, "\"event\":\"before_turn\"") {
681-
t.Fatalf("expected before_turn event in audit file")
682-
}
683-
if !strings.Contains(body, "\"event\":\"after_turn\"") {
684-
t.Fatalf("expected after_turn event in audit file")
675+
deadline := time.Now().Add(2 * time.Second)
676+
var body string
677+
for {
678+
data, readErr := os.ReadFile(auditPath)
679+
if readErr == nil {
680+
body = string(data)
681+
if strings.Contains(body, "\"event\":\"before_turn\"") && strings.Contains(body, "\"event\":\"after_turn\"") {
682+
break
683+
}
684+
}
685+
if time.Now().After(deadline) {
686+
if readErr != nil {
687+
t.Fatalf("read hook audit file after wait: %v", readErr)
688+
}
689+
t.Fatalf("expected before_turn and after_turn events in audit file, got: %s", body)
690+
}
691+
time.Sleep(10 * time.Millisecond)
685692
}
686693
}
687694

pkg/hooks/audit_jsonl.go

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8-
"sync"
8+
)
9+
10+
const (
11+
// Buffer audit writes so hook dispatch never blocks on slow filesystems.
12+
auditQueueSize = 256
913
)
1014

1115
// JSONLAuditSink appends hook entries as JSONL.
1216
type JSONLAuditSink struct {
13-
mu sync.Mutex
14-
path string
17+
path string
18+
queue chan []byte
1519
}
1620

1721
func NewJSONLAuditSink(workspace string) (*JSONLAuditSink, error) {
@@ -23,28 +27,57 @@ func NewJSONLAuditSinkAt(path string) (*JSONLAuditSink, error) {
2327
if err := os.MkdirAll(dir, 0755); err != nil {
2428
return nil, fmt.Errorf("create hooks audit dir: %w", err)
2529
}
26-
return &JSONLAuditSink{path: path}, nil
30+
sink := &JSONLAuditSink{
31+
path: path,
32+
queue: make(chan []byte, auditQueueSize),
33+
}
34+
go sink.writeLoop()
35+
return sink, nil
2736
}
2837

2938
func (s *JSONLAuditSink) Path() string {
3039
return s.path
3140
}
3241

3342
func (s *JSONLAuditSink) Write(entry AuditEntry) error {
34-
s.mu.Lock()
35-
defer s.mu.Unlock()
36-
37-
f, err := os.OpenFile(s.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
43+
b, err := json.Marshal(entry)
3844
if err != nil {
3945
return err
4046
}
41-
defer f.Close()
4247

43-
b, err := json.Marshal(entry)
48+
line := append(b, '\n')
49+
select {
50+
case s.queue <- line:
51+
return nil
52+
default:
53+
}
54+
55+
// Queue full: drop oldest pending line so current hook event can proceed.
56+
select {
57+
case <-s.queue:
58+
default:
59+
}
60+
select {
61+
case s.queue <- line:
62+
default:
63+
}
64+
return nil
65+
}
66+
67+
func (s *JSONLAuditSink) writeLoop() {
68+
for line := range s.queue {
69+
_ = s.appendLine(line)
70+
}
71+
}
72+
73+
func (s *JSONLAuditSink) appendLine(line []byte) error {
74+
f, err := os.OpenFile(s.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
4475
if err != nil {
4576
return err
4677
}
47-
if _, err := f.Write(append(b, '\n')); err != nil {
78+
defer f.Close()
79+
80+
if _, err := f.Write(line); err != nil {
4881
return err
4982
}
5083
return nil

pkg/hooks/audit_jsonl_test.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,20 @@ func TestJSONLAuditSinkWrite(t *testing.T) {
1818
if err := sink.Write(entry); err != nil {
1919
t.Fatalf("Write: %v", err)
2020
}
21-
data, err := os.ReadFile(filepath.Join(ws, "hooks", "hook-events.jsonl"))
22-
if err != nil {
23-
t.Fatalf("read audit file: %v", err)
24-
}
25-
if !strings.Contains(string(data), "\"turn_id\":\"turn-1\"") {
26-
t.Fatalf("audit content missing turn_id: %s", string(data))
21+
22+
auditPath := filepath.Join(ws, "hooks", "hook-events.jsonl")
23+
deadline := time.Now().Add(2 * time.Second)
24+
for {
25+
data, err := os.ReadFile(auditPath)
26+
if err == nil && strings.Contains(string(data), "\"turn_id\":\"turn-1\"") {
27+
return
28+
}
29+
if time.Now().After(deadline) {
30+
if err != nil {
31+
t.Fatalf("read audit file after wait: %v", err)
32+
}
33+
t.Fatalf("audit content missing turn_id after wait: %s", string(data))
34+
}
35+
time.Sleep(10 * time.Millisecond)
2736
}
2837
}

0 commit comments

Comments
 (0)