Skip to content

Commit 7613811

Browse files
author
William Petit
committed
fix: prevent deadlock on index/search access in sqlitevec indexer
1 parent cf8e486 commit 7613811

File tree

3 files changed

+21
-24
lines changed

3 files changed

+21
-24
lines changed

internal/adapter/memory/task_runner.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (r *TaskRunner) GetTask(ctx context.Context, id model.TaskID) (model.Task,
4747
func (r *TaskRunner) Run(ctx context.Context) error {
4848
r.runningMutex.Lock()
4949
r.running = true
50+
slog.Debug("TaskRunner: setting running=true and broadcasting")
5051
r.runningCond.Broadcast()
5152
r.runningMutex.Unlock()
5253

@@ -132,9 +133,12 @@ func (r *TaskRunner) ScheduleTask(ctx context.Context, task model.Task) error {
132133
}()
133134

134135
r.runningMutex.Lock()
136+
slog.Debug("TaskRunner: waiting for running flag", slog.Bool("running", r.running))
135137
for !r.running {
138+
slog.Debug("TaskRunner: waiting on condition variable")
136139
r.runningCond.Wait()
137140
}
141+
slog.Debug("TaskRunner: running flag is true, proceeding")
138142
r.runningMutex.Unlock()
139143

140144
r.semaphore <- struct{}{}
@@ -210,6 +214,7 @@ func (r *TaskRunner) ScheduleTask(ctx context.Context, task model.Task) error {
210214
}
211215

212216
func (r *TaskRunner) updateState(task model.Task, fn func(s *port.TaskState)) {
217+
slog.Debug("TaskRunner: acquiring state mutex for task", slog.String("taskID", string(task.ID())))
213218
r.stateMutex.Lock()
214219
defer r.stateMutex.Unlock()
215220

internal/adapter/pipeline/index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (i *Index) Index(ctx context.Context, document model.Document, funcs ...por
168168

169169
ctx = slogx.WithAttrs(ctx, slog.String("documentID", string(document.ID())))
170170

171-
slog.DebugContext(ctx, "indexing document")
171+
slog.DebugContext(ctx, "pipeline: indexing document", slog.Int("indexCount", count))
172172

173173
for index := range i.indexes {
174174
go func(index *IdentifiedIndex) {
@@ -202,6 +202,7 @@ func (i *Index) Index(ctx context.Context, document model.Document, funcs ...por
202202
defer opts.OnProgress(1)
203203
}
204204

205+
slog.DebugContext(indexCtx, "pipeline: calling Index() on underlying index")
205206
if err := index.Index().Index(indexCtx, document, indexOptions...); err != nil {
206207
err = errors.WithStack(err)
207208
slog.ErrorContext(indexCtx, "could not index document", slog.Any("error", err))

internal/adapter/sqlitevec/index.go

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@ type Index struct {
2424
getConn func(ctx context.Context) (*sqlite3.Conn, error)
2525
llm llm.Client
2626
model string
27-
lock sync.Mutex
27+
// rwLock allows concurrent Search operations while serializing Index/Delete
28+
rwLock sync.RWMutex
2829
}
2930

3031
// DeleteByID implements port.Index.
3132
func (i *Index) DeleteByID(ctx context.Context, ids ...model.SectionID) error {
33+
i.rwLock.Lock()
34+
defer i.rwLock.Unlock()
35+
3236
err := i.withRetry(ctx, func(ctx context.Context, conn *sqlite3.Conn) error {
3337
stmt, _, err := conn.Prepare("DELETE FROM embeddings WHERE section_id IN ( SELECT value FROM json_each(?) );")
3438
if err != nil {
@@ -61,6 +65,9 @@ func (i *Index) DeleteByID(ctx context.Context, ids ...model.SectionID) error {
6165

6266
// DeleteBySource implements port.Index.
6367
func (i *Index) DeleteBySource(ctx context.Context, source *url.URL) error {
68+
i.rwLock.Lock()
69+
defer i.rwLock.Unlock()
70+
6471
err := i.withRetry(ctx, func(ctx context.Context, conn *sqlite3.Conn) error {
6572
stmt, _, err := conn.Prepare("DELETE FROM embeddings WHERE source = ?;")
6673
if err != nil {
@@ -86,25 +93,6 @@ func (i *Index) DeleteBySource(ctx context.Context, source *url.URL) error {
8693
return nil
8794
}
8895

89-
func (i *Index) deleteBySource(ctx context.Context, conn *sqlite3.Conn, source *url.URL) error {
90-
stmt, _, err := conn.Prepare("DELETE FROM embeddings WHERE source = ?;")
91-
if err != nil {
92-
return errors.WithStack(err)
93-
}
94-
95-
defer stmt.Close()
96-
97-
if err := stmt.BindText(1, source.String()); err != nil {
98-
return errors.WithStack(err)
99-
}
100-
101-
if err := stmt.Exec(); err != nil {
102-
return errors.WithStack(err)
103-
}
104-
105-
return nil
106-
}
107-
10896
type indexableChunk struct {
10997
Section model.Section
11098
Text string
@@ -127,6 +115,9 @@ const (
127115

128116
// Index implements port.Index.
129117
func (i *Index) Index(ctx context.Context, document model.Document, funcs ...port.IndexOptionFunc) error {
118+
i.rwLock.Lock()
119+
defer i.rwLock.Unlock()
120+
130121
opts := port.NewIndexOptions(funcs...)
131122

132123
var chunksToProcess []*indexableChunk
@@ -348,6 +339,9 @@ func (i *Index) insertCollection(ctx context.Context, conn *sqlite3.Conn, embedd
348339

349340
// Search implements port.Index.
350341
func (i *Index) Search(ctx context.Context, query string, opts port.IndexSearchOptions) ([]*port.IndexSearchResult, error) {
342+
i.rwLock.RLock()
343+
defer i.rwLock.RUnlock()
344+
351345
var searchResults []*port.IndexSearchResult
352346
err := i.withRetry(ctx, func(ctx context.Context, conn *sqlite3.Conn) error {
353347
res, err := i.llm.Embeddings(ctx, []string{query})
@@ -478,9 +472,6 @@ func (i *Index) Search(ctx context.Context, query string, opts port.IndexSearchO
478472
}
479473

480474
func (i *Index) withRetry(ctx context.Context, fn func(ctx context.Context, conn *sqlite3.Conn) error, codes ...sqlite3.ErrorCode) error {
481-
i.lock.Lock()
482-
defer i.lock.Unlock()
483-
484475
conn, err := i.getConn(ctx)
485476
if err != nil {
486477
return errors.WithStack(err)

0 commit comments

Comments
 (0)