Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,28 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func
cancel()
wg.Wait()

// freeze active fraction to prevent new writes
active := lc.registry.Active()
if err := active.Finalize(); err != nil {
// finalize appender fraction to prevent new writes
appender := lc.registry.Appender()
if err := appender.Finalize(); err != nil {
logger.Fatal("shutdown fraction freezing error", zap.Error(err))
}
active.WaitWriteIdle()
appender.WaitWriteIdle()

stopIdx()

lc.SyncInfoCache()

sealOnShutdown(active.instance, provider, cfg.MinSealFracSize)
// Seal active fraction
sealOnShutdown(appender.frac, provider, cfg.MinSealFracSize)

logger.Info("fracmanager's workers are stopped", zap.Int64("took_ms", time.Since(n).Milliseconds()))
}

return &fm, stop, nil
}

func (fm *FracManager) Fractions() List {
return fm.lc.registry.AllFractions()
func (fm *FracManager) FractionsSnapshot() (List, ReleaseSnapshot) {
return fm.lc.registry.FractionsSnapshot()
}

func (fm *FracManager) Oldest() uint64 {
Expand All @@ -116,7 +117,7 @@ func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock)
return ctx.Err()
default:
// Try to append data to the currently active fraction
err := fm.lc.registry.Active().Append(docs, metas)
err := fm.lc.registry.Appender().Append(docs, metas)
if err != nil {
logger.Info("append fail", zap.Error(err))
if err == ErrFractionNotWritable {
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fracmanager_for_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fracmanager
import "sync"

func (fm *FracManager) WaitIdleForTests() {
fm.lc.registry.Active().WaitWriteIdle()
fm.lc.registry.Appender().WaitWriteIdle()
}

func (fm *FracManager) SealForcedForTests() {
Expand Down
25 changes: 14 additions & 11 deletions fracmanager/fracmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,33 @@ func TestSealingOnShutdown(t *testing.T) {
cfg.MinSealFracSize = 0 // to ensure that the frac will not be sealed on shutdown
cfg, fm, stop := setupFracManager(t, cfg)
appendDocsToFracManager(t, fm, 10)
activeName := fm.Fractions()[0].Info().Name()

fractions := fm.lc.registry.all.fractions
activeName := fractions[0].Info().Name()

stop()

// second start
cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown
cfg, fm, stop = setupFracManager(t, cfg)

assert.Equal(t, 1, len(fm.Fractions()), "should have one fraction")
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "fraction should have the same name")
_, ok := fm.Fractions()[0].(*fractionProxy).impl.(*frac.Active)
fractions = fm.lc.registry.all.fractions
assert.Equal(t, 1, len(fractions), "should have one fraction")
assert.Equal(t, activeName, fractions[0].Info().Name(), "fraction should have the same name")
_, ok := fractions[0].(*frac.Active)
assert.True(t, ok, "fraction should be active")

stop()

// third start
_, fm, stop = setupFracManager(t, cfg)

assert.Equal(t, 2, len(fm.Fractions()), "should have 2 fraction: new active and old sealed")
_, ok = fm.Fractions()[0].(*fractionProxy).impl.(*frac.Sealed)
fractions = fm.lc.registry.all.fractions
assert.Equal(t, 2, len(fractions), "should have 2 fraction: new active and old sealed")
_, ok = fractions[0].(*frac.Sealed)
assert.True(t, ok, "first fraction should be sealed")
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "sealed fraction should have the same name")
assert.Equal(t, uint32(0), fm.Fractions()[1].Info().DocsTotal, "active fraction should be empty")
_, ok = fm.Fractions()[1].(*fractionProxy).impl.(*frac.Active)
assert.Equal(t, activeName, fractions[0].Info().Name(), "sealed fraction should have the same name")
assert.Equal(t, uint32(0), fractions[1].Info().DocsTotal, "active fraction should be empty")
_, ok = fractions[1].(*frac.Active)
assert.True(t, ok, "new fraction should be active")

stop()
}
4 changes: 4 additions & 0 deletions fracmanager/fracs_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ func (s *registryStats) SetMetrics() {
s.offloading.SetMetrics(dataSizeTotal, "offloading")
s.remotes.SetMetrics(dataSizeTotal, "remotes")
}

func (s registryStats) TotalSizeOnDiskLocal() uint64 {
return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk
}
23 changes: 20 additions & 3 deletions fracmanager/fraction_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"time"

"github.com/oklog/ulid/v2"
"go.uber.org/zap"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
"github.com/ozontech/seq-db/util"
)

const fileBasePattern = "seq-db-"
Expand Down Expand Up @@ -107,8 +110,11 @@ func (fp *fractionProvider) CreateActive() *frac.Active {

// Seal converts an active fraction to a sealed one
// Process includes sorting, indexing, and data optimization for reading
func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
src, err := frac.NewActiveSealingSource(active, fp.config.SealParams)
func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) {
sealsTotal.Inc()
now := time.Now()

src, err := frac.NewActiveSealingSource(a, fp.config.SealParams)
if err != nil {
return nil, err
}
Expand All @@ -117,7 +123,18 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
return nil, err
}

return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil
s := fp.NewSealedPreloaded(a.BaseFileName, preloaded)

sealingTime := time.Since(now)
sealsDoneSeconds.Observe(sealingTime.Seconds())

logger.Info(
"fraction sealed",
zap.String("fraction", filepath.Base(s.BaseFileName)),
zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")),
)

return s, nil
}

// Offload uploads fraction to S3 storage and returns a remote fraction
Expand Down
Loading
Loading