Skip to content

Commit 378c06d

Browse files
authored
refactor(fracmanager): using fifo queues of fractions (#268)
1 parent f331e03 commit 378c06d

29 files changed

+1380
-1484
lines changed

cmd/seq-db/seq-db.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -259,6 +259,7 @@ func startStore(
259259
MaintenanceDelay: 0,
260260
CacheGCDelay: 0,
261261
CacheCleanupDelay: 0,
262+
MinSealFracSize: uint64(cfg.Storage.FracSize) * consts.DefaultMinSealPercent / 100,
262263
SealParams: common.SealParams{
263264
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
264265
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,

frac/active.go

Lines changed: 22 additions & 77 deletions
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,6 @@ type Active struct {
3535

3636
BaseFileName string
3737

38-
useMu sync.RWMutex
39-
suicided bool
40-
released bool
41-
4238
infoMu sync.RWMutex
4339
info *common.Info
4440

@@ -270,40 +266,26 @@ func (f *Active) String() string {
270266
}
271267

272268
func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
273-
dp, release := f.DataProvider(ctx)
274-
defer release()
275-
if dp == nil {
276-
return EmptyFraction.Fetch(ctx, ids)
269+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
270+
return nil, nil
277271
}
272+
273+
dp := f.createDataProvider(ctx)
274+
defer dp.release()
275+
278276
return dp.Fetch(ids)
279277
}
280278

281279
func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
282-
dp, release := f.DataProvider(ctx)
283-
defer release()
284-
if dp == nil {
285-
return EmptyFraction.Search(ctx, params)
280+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
281+
metric.CountersTotal.WithLabelValues("empty_data_provider").Inc()
282+
return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil
286283
}
287-
return dp.Search(params)
288-
}
289-
290-
func (f *Active) DataProvider(ctx context.Context) (*activeDataProvider, func()) {
291-
f.useMu.RLock()
292284

293-
if f.suicided || f.released || f.Info().DocsTotal == 0 { // it is empty active fraction state
294-
if f.suicided {
295-
metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
296-
}
297-
f.useMu.RUnlock()
298-
return nil, func() {}
299-
}
300-
301-
// it is ordinary active fraction state
302285
dp := f.createDataProvider(ctx)
303-
return dp, func() {
304-
dp.release()
305-
f.useMu.RUnlock()
306-
}
286+
defer dp.release()
287+
288+
return dp.Search(params)
307289
}
308290

309291
func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
@@ -339,49 +321,24 @@ func (f *Active) IsIntersecting(from, to seq.MID) bool {
339321
}
340322

341323
func (f *Active) Release() {
342-
f.useMu.Lock()
343-
f.released = true
344-
f.useMu.Unlock()
345-
346324
f.releaseMem()
347325

348326
if !f.Config.KeepMetaFile {
349-
f.removeMetaFile()
327+
util.RemoveFile(f.metaFile.Name())
350328
}
351329

352330
if !f.Config.SkipSortDocs {
353331
// we use sorted docs in sealed fraction so we can remove original docs of active fraction
354-
f.removeDocsFiles()
332+
util.RemoveFile(f.docsFile.Name())
355333
}
356334
}
357335

358-
// Offload for [Active] fraction is no-op.
359-
//
360-
// Since search within [Active] fraction is too costly (we have to replay the whole index in memory),
361-
// we decided to support offloading only for [Sealed] fractions.
362-
func (f *Active) Offload(context.Context, storage.Uploader) (bool, error) {
363-
return false, nil
364-
}
365-
366336
func (f *Active) Suicide() {
367-
f.useMu.Lock()
368-
released := f.released
369-
f.suicided = true
370-
f.released = true
371-
f.useMu.Unlock()
372-
373-
if released { // fraction can be suicided after release
374-
if f.Config.KeepMetaFile {
375-
f.removeMetaFile() // meta was not removed while release
376-
}
377-
if f.Config.SkipSortDocs {
378-
f.removeDocsFiles() // docs was not removed while release
379-
}
380-
} else { // was not release
381-
f.releaseMem()
382-
f.removeMetaFile()
383-
f.removeDocsFiles()
384-
}
337+
f.releaseMem()
338+
339+
util.RemoveFile(f.metaFile.Name())
340+
util.RemoveFile(f.docsFile.Name())
341+
util.RemoveFile(f.BaseFileName + consts.SdocsFileSuffix)
385342
}
386343

387344
func (f *Active) releaseMem() {
@@ -394,24 +351,12 @@ func (f *Active) releaseMem() {
394351
if err := f.metaFile.Close(); err != nil {
395352
logger.Error("can't close meta file", zap.String("frac", f.BaseFileName), zap.Error(err))
396353
}
354+
if err := f.docsFile.Close(); err != nil {
355+
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
356+
}
397357

398358
f.RIDs = nil
399359
f.MIDs = nil
400360
f.TokenList = nil
401361
f.DocsPositions = nil
402362
}
403-
404-
func (f *Active) removeDocsFiles() {
405-
if err := f.docsFile.Close(); err != nil {
406-
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
407-
}
408-
if err := os.Remove(f.docsFile.Name()); err != nil {
409-
logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
410-
}
411-
}
412-
413-
func (f *Active) removeMetaFile() {
414-
if err := os.Remove(f.metaFile.Name()); err != nil {
415-
logger.Error("can't delete metas file", zap.String("frac", f.BaseFileName), zap.Error(err))
416-
}
417-
}

frac/active_indexer.go

Lines changed: 6 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,6 @@ type ActiveIndexer struct {
2020
chMerge chan *mergeTask
2121
bulkStats *BulkStatsCollector
2222
workerCount int
23-
24-
stopFn func()
2523
}
2624

2725
type indexTask struct {
@@ -36,13 +34,15 @@ type mergeTask struct {
3634
tokenLIDs *TokenLIDs
3735
}
3836

39-
func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer {
40-
return &ActiveIndexer{
37+
func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
38+
idx := ActiveIndexer{
4139
ch: make(chan *indexTask, chLen),
4240
chMerge: make(chan *mergeTask, chLen),
4341
workerCount: workerCount,
4442
bulkStats: NewBulkStatsCollector(5*time.Second, chLen),
4543
}
44+
stopIdx := idx.start()
45+
return &idx, stopIdx
4646
}
4747

4848
func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
@@ -56,7 +56,7 @@ func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, s
5656
m.Stop()
5757
}
5858

59-
func (ai *ActiveIndexer) Start() {
59+
func (ai *ActiveIndexer) start() func() {
6060
wg := sync.WaitGroup{}
6161
wg.Add(ai.workerCount)
6262

@@ -75,15 +75,11 @@ func (ai *ActiveIndexer) Start() {
7575
}()
7676
}
7777

78-
ai.stopFn = func() {
78+
return func() {
7979
close(ai.ch)
8080
close(ai.chMerge)
81-
8281
wg.Wait()
83-
8482
ai.bulkStats.Stop()
85-
86-
ai.stopFn = nil
8783
}
8884
}
8985

@@ -93,12 +89,6 @@ func (ai *ActiveIndexer) mergeWorker() {
9389
}
9490
}
9591

96-
func (ai *ActiveIndexer) Stop() {
97-
if ai.stopFn != nil {
98-
ai.stopFn()
99-
}
100-
}
101-
10292
var metaDataPool = sync.Pool{
10393
New: func() any {
10494
return new(indexer.MetaData)

frac/active_indexer_test.go

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -76,9 +76,8 @@ func getTestProcessor() *indexer.Processor {
7676

7777
func BenchmarkIndexer(b *testing.B) {
7878
logger.SetLevel(zapcore.FatalLevel)
79-
idx := NewActiveIndexer(8, 8)
80-
idx.Start()
81-
defer idx.Stop()
79+
idx, stop := NewActiveIndexer(8, 8)
80+
defer stop()
8281

8382
allLogs, err := readFileAllAtOnce(filepath.Join(common.TestDataDir, "k8s.logs"))
8483
readers := splitLogsToBulks(allLogs, 1000)

frac/empty.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

frac/fraction.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/ozontech/seq-db/frac/processor"
1414
"github.com/ozontech/seq-db/metric"
1515
"github.com/ozontech/seq-db/seq"
16-
"github.com/ozontech/seq-db/storage"
1716
)
1817

1918
type Fraction interface {
@@ -22,8 +21,6 @@ type Fraction interface {
2221
Contains(mid seq.MID) bool
2322
Fetch(context.Context, []seq.ID) ([][]byte, error)
2423
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
25-
Offload(ctx context.Context, u storage.Uploader) (bool, error)
26-
Suicide()
2724
}
2825

2926
var (

frac/fraction_concurrency_test.go

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,9 +42,8 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
4242
fracPath := filepath.Join(tmpDir, "test_fraction")
4343
defer test_common.RemoveDir(fracPath)
4444

45-
activeIndexer := NewActiveIndexer(numIndexWorkers, 1000)
46-
activeIndexer.Start()
47-
defer activeIndexer.Stop()
45+
activeIndexer, stop := NewActiveIndexer(numIndexWorkers, 1000)
46+
defer stop()
4847

4948
active := NewActive(
5049
fracPath,

0 commit comments

Comments
 (0)