Skip to content

Commit 582d4e7

Browse files
committed
Make state persistence async to avoid routing stalls
1 parent d9c04d4 commit 582d4e7

File tree

2 files changed

+74
-45
lines changed

2 files changed

+74
-45
lines changed

pkg/state/state.go

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Manager struct {
2929
state *State
3030
mu sync.RWMutex
3131
stateFile string
32+
saveQueue chan State
3233
}
3334

3435
var (
@@ -49,6 +50,7 @@ func NewManager(workspace string) *Manager {
4950
workspace: workspace,
5051
stateFile: stateFile,
5152
state: &State{},
53+
saveQueue: make(chan State, 1),
5254
}
5355

5456
loadedState, loadedFromLegacy, err := loadBootstrapWithTimeout(stateFile, oldStateFile, stateBootstrapTimeout)
@@ -63,6 +65,8 @@ func NewManager(workspace string) *Manager {
6365
}
6466
}
6567

68+
go sm.saveLoop()
69+
6670
return sm
6771
}
6872

@@ -71,34 +75,22 @@ func NewManager(workspace string) *Manager {
7175
// ensuring that the state file is never corrupted even if the process crashes.
7276
func (sm *Manager) SetLastChannel(channel string) error {
7377
sm.mu.Lock()
74-
defer sm.mu.Unlock()
75-
76-
// Update state
7778
sm.state.LastChannel = channel
7879
sm.state.Timestamp = time.Now()
79-
80-
// Atomic save using temp file + rename
81-
if err := sm.saveAtomic(); err != nil {
82-
return fmt.Errorf("failed to save state atomically: %w", err)
83-
}
84-
80+
snapshot := *sm.state
81+
sm.mu.Unlock()
82+
sm.enqueueSave(snapshot)
8583
return nil
8684
}
8785

8886
// SetLastChatID atomically updates the last chat ID and saves the state.
8987
func (sm *Manager) SetLastChatID(chatID string) error {
9088
sm.mu.Lock()
91-
defer sm.mu.Unlock()
92-
93-
// Update state
9489
sm.state.LastChatID = chatID
9590
sm.state.Timestamp = time.Now()
96-
97-
// Atomic save using temp file + rename
98-
if err := sm.saveAtomic(); err != nil {
99-
return fmt.Errorf("failed to save state atomically: %w", err)
100-
}
101-
91+
snapshot := *sm.state
92+
sm.mu.Unlock()
93+
sm.enqueueSave(snapshot)
10294
return nil
10395
}
10496

@@ -123,19 +115,17 @@ func (sm *Manager) GetTimestamp() time.Time {
123115
return sm.state.Timestamp
124116
}
125117

126-
// saveAtomic performs an atomic save using temp file + rename.
118+
// saveAtomicSnapshot performs an atomic save using temp file + rename.
127119
// This ensures that the state file is never corrupted:
128120
// 1. Write to a temp file
129121
// 2. Rename temp file to target (atomic on POSIX systems)
130122
// 3. If rename fails, cleanup the temp file
131-
//
132-
// Must be called with the lock held.
133-
func (sm *Manager) saveAtomic() error {
123+
func (sm *Manager) saveAtomicSnapshot(snapshot State) error {
134124
// Create temp file in the same directory as the target
135125
tempFile := sm.stateFile + ".tmp"
136126

137127
// Marshal state to JSON
138-
data, err := json.MarshalIndent(sm.state, "", " ")
128+
data, err := json.MarshalIndent(snapshot, "", " ")
139129
if err != nil {
140130
return fmt.Errorf("failed to marshal state: %w", err)
141131
}
@@ -155,6 +145,32 @@ func (sm *Manager) saveAtomic() error {
155145
return nil
156146
}
157147

148+
func (sm *Manager) saveLoop() {
149+
for snapshot := range sm.saveQueue {
150+
if err := sm.saveAtomicSnapshot(snapshot); err != nil {
151+
log.Printf("[WARN] state: async save failed for %s: %v", sm.workspace, err)
152+
}
153+
}
154+
}
155+
156+
func (sm *Manager) enqueueSave(snapshot State) {
157+
select {
158+
case sm.saveQueue <- snapshot:
159+
return
160+
default:
161+
}
162+
163+
// Queue already has an older snapshot; drop it and enqueue the latest.
164+
select {
165+
case <-sm.saveQueue:
166+
default:
167+
}
168+
select {
169+
case sm.saveQueue <- snapshot:
170+
default:
171+
}
172+
}
173+
158174
// load loads the state from disk.
159175
func (sm *Manager) load() error {
160176
loaded, err := loadStateFromPath(sm.stateFile)

pkg/state/state_test.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ import (
99
"time"
1010
)
1111

12+
func waitForCondition(t *testing.T, timeout time.Duration, cond func() bool, msg string) {
13+
t.Helper()
14+
deadline := time.Now().Add(timeout)
15+
for time.Now().Before(deadline) {
16+
if cond() {
17+
return
18+
}
19+
time.Sleep(10 * time.Millisecond)
20+
}
21+
t.Fatal(msg)
22+
}
23+
1224
func TestAtomicSave(t *testing.T) {
1325
// Create temp workspace
1426
tmpDir, err := os.MkdirTemp("", "state-test-*")
@@ -38,15 +50,16 @@ func TestAtomicSave(t *testing.T) {
3850

3951
// Verify state file exists
4052
stateFile := filepath.Join(tmpDir, "state", "state.json")
41-
if _, err := os.Stat(stateFile); os.IsNotExist(err) {
42-
t.Error("Expected state file to exist")
43-
}
53+
waitForCondition(t, time.Second, func() bool {
54+
_, err := os.Stat(stateFile)
55+
return err == nil
56+
}, "expected state file to exist")
4457

4558
// Create a new manager to verify persistence
46-
sm2 := NewManager(tmpDir)
47-
if sm2.GetLastChannel() != "test-channel" {
48-
t.Errorf("Expected persistent channel 'test-channel', got '%s'", sm2.GetLastChannel())
49-
}
59+
waitForCondition(t, time.Second, func() bool {
60+
sm2 := NewManager(tmpDir)
61+
return sm2.GetLastChannel() == "test-channel"
62+
}, "expected persistent channel 'test-channel'")
5063
}
5164

5265
func TestSetLastChatID(t *testing.T) {
@@ -76,10 +89,10 @@ func TestSetLastChatID(t *testing.T) {
7689
}
7790

7891
// Create a new manager to verify persistence
79-
sm2 := NewManager(tmpDir)
80-
if sm2.GetLastChatID() != "test-chat-id" {
81-
t.Errorf("Expected persistent chat ID 'test-chat-id', got '%s'", sm2.GetLastChatID())
82-
}
92+
waitForCondition(t, time.Second, func() bool {
93+
sm2 := NewManager(tmpDir)
94+
return sm2.GetLastChatID() == "test-chat-id"
95+
}, "expected persistent chat ID 'test-chat-id'")
8396
}
8497

8598
func TestAtomicity_NoCorruptionOnInterrupt(t *testing.T) {
@@ -157,6 +170,11 @@ func TestConcurrentAccess(t *testing.T) {
157170

158171
// Verify state file is valid JSON
159172
stateFile := filepath.Join(tmpDir, "state", "state.json")
173+
waitForCondition(t, time.Second, func() bool {
174+
_, err := os.Stat(stateFile)
175+
return err == nil
176+
}, "expected state file to exist after concurrent writes")
177+
160178
data, err := os.ReadFile(stateFile)
161179
if err != nil {
162180
t.Fatalf("Failed to read state file: %v", err)
@@ -180,17 +198,12 @@ func TestNewManager_ExistingState(t *testing.T) {
180198
sm1.SetLastChannel("existing-channel")
181199
sm1.SetLastChatID("existing-chat-id")
182200

183-
// Create new manager with same workspace
184-
sm2 := NewManager(tmpDir)
185-
186-
// Verify state was loaded
187-
if sm2.GetLastChannel() != "existing-channel" {
188-
t.Errorf("Expected channel 'existing-channel', got '%s'", sm2.GetLastChannel())
189-
}
190-
191-
if sm2.GetLastChatID() != "existing-chat-id" {
192-
t.Errorf("Expected chat ID 'existing-chat-id', got '%s'", sm2.GetLastChatID())
193-
}
201+
// Create new manager with same workspace once persistence catches up.
202+
waitForCondition(t, time.Second, func() bool {
203+
sm2 := NewManager(tmpDir)
204+
return sm2.GetLastChannel() == "existing-channel" &&
205+
sm2.GetLastChatID() == "existing-chat-id"
206+
}, "expected existing state to be loaded")
194207
}
195208

196209
func TestNewManager_EmptyWorkspace(t *testing.T) {

0 commit comments

Comments
 (0)