Skip to content

Commit b3b5084

Browse files
eguguchkinmoflotas
authored andcommitted
Seal factions immediately after reply on start
# Conflicts: # fracmanager/fracmanager.go
1 parent 79e40a6 commit b3b5084

File tree

3 files changed

+56
-41
lines changed

3 files changed

+56
-41
lines changed

fracmanager/fracmanager.go

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ type activeRef struct {
6363
frac *proxyFrac
6464
}
6565

66+
func (fm *FracManager) newActiveRef(active *frac.Active) activeRef {
67+
f := &proxyFrac{active: active, fp: fm.fracProvider}
68+
return activeRef{
69+
frac: f,
70+
ref: &fracRef{instance: f},
71+
}
72+
}
73+
6674
func NewFracManager(cfg *Config) *FracManager {
6775
FillConfigWithDefault(cfg)
6876

@@ -280,11 +288,19 @@ func (fm *FracManager) Start() {
280288

281289
func (fm *FracManager) Load(ctx context.Context) error {
282290
var err error
283-
var notSealed []activeRef
284291

285292
l := NewLoader(fm.config, fm.fracProvider, fm.fracCache)
286293

287-
if fm.fracs, notSealed, err = l.load(ctx); err != nil {
294+
actives, sealed, err := l.load()
295+
if err != nil {
296+
return err
297+
}
298+
299+
for _, s := range sealed {
300+
fm.fracs = append(fm.fracs, &fracRef{instance: s})
301+
}
302+
303+
if err := fm.replayAll(ctx, actives); err != nil {
288304
return err
289305
}
290306

@@ -298,16 +314,39 @@ func (fm *FracManager) Load(ctx context.Context) error {
298314
}
299315
}
300316

301-
if len(notSealed) == 0 {
302-
fm.rotate()
303-
} else {
304-
if len(notSealed) > 1 {
305-
logger.Info("sealing active fractions")
306-
for _, active := range notSealed[:len(notSealed)-1] {
307-
fm.seal(active)
308-
}
317+
if fm.active.ref == nil { // no active
318+
_ = fm.rotate() // make new empty active
319+
}
320+
321+
return nil
322+
}
323+
324+
func (fm *FracManager) replayAll(ctx context.Context, actives []*frac.Active) error {
325+
wg := sync.WaitGroup{}
326+
defer wg.Wait()
327+
328+
for i, a := range actives {
329+
if err := a.Replay(ctx); err != nil {
330+
return err
331+
}
332+
if a.Info().DocsTotal == 0 {
333+
a.Suicide() // remove empty
334+
continue
335+
}
336+
r := fm.newActiveRef(a)
337+
fm.fracs = append(fm.fracs, r.ref)
338+
339+
if i == len(actives)-1 { // last and not empty
340+
fm.active = r
341+
} else {
342+
wg.Wait() // wait previous sealing complete
343+
344+
wg.Add(1)
345+
go func() {
346+
fm.seal(r)
347+
wg.Done()
348+
}()
309349
}
310-
fm.active = notSealed[len(notSealed)-1]
311350
}
312351

313352
return nil
@@ -371,7 +410,7 @@ func (fm *FracManager) rotate() activeRef {
371410
baseFilePath := filepath.Join(fm.config.DataDir, filePath)
372411
logger.Info("creating new fraction", zap.String("filepath", baseFilePath))
373412

374-
next := fm.fracProvider.newActiveRef(fm.fracProvider.NewActive(baseFilePath))
413+
next := fm.newActiveRef(fm.fracProvider.NewActive(baseFilePath))
375414

376415
fm.fracMu.Lock()
377416
prev := fm.active

fracmanager/fraction_provider.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,3 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *frac.
6969
func (fp *fractionProvider) Stop() {
7070
fp.activeIndexer.Stop()
7171
}
72-
73-
func (fp *fractionProvider) newActiveRef(active *frac.Active) activeRef {
74-
f := &proxyFrac{active: active, fp: fp}
75-
return activeRef{
76-
frac: f,
77-
ref: &fracRef{instance: f},
78-
}
79-
}

fracmanager/loader.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package fracmanager
22

33
import (
4-
"context"
54
"fmt"
65
"os"
76
"path/filepath"
@@ -44,7 +43,7 @@ func NewLoader(config *Config, fracProvider *fractionProvider, fracCache *sealed
4443
}
4544
}
4645

47-
func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) {
46+
func (l *loader) load() ([]*frac.Active, []*frac.Sealed, error) {
4847
fracIDs, infos := l.makeInfos(l.getFileList())
4948
sort.Strings(fracIDs)
5049

@@ -58,7 +57,7 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) {
5857
infosList := l.filterInfos(fracIDs, infos)
5958
cnt := len(infosList)
6059

61-
fracs := make([]*fracRef, 0, cnt)
60+
fracs := make([]*frac.Sealed, 0, cnt)
6261
actives := make([]*frac.Active, 0)
6362

6463
diskFracCache := NewFracCacheFromDisk(filepath.Join(l.config.DataDir, consts.FracCacheFileSuffix))
@@ -73,13 +72,13 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) {
7372
removeFile(info.base + consts.DocsFileSuffix)
7473
}
7574
sealed := l.loadSealedFrac(diskFracCache, info)
76-
fracs = append(fracs, &fracRef{instance: sealed})
75+
fracs = append(fracs, sealed)
7776
} else {
7877
if info.hasMeta {
7978
actives = append(actives, l.fracProvider.NewActive(info.base))
8079
} else {
8180
sealed := l.loadSealedFrac(diskFracCache, info)
82-
fracs = append(fracs, &fracRef{instance: sealed})
81+
fracs = append(fracs, sealed)
8382
}
8483
}
8584

@@ -97,22 +96,7 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) {
9796

9897
logger.Info("fractions list created", zap.Int("cached", l.cachedFracs), zap.Int("uncached", l.uncachedFracs))
9998

100-
logger.Info("replaying active fractions", zap.Int("count", len(actives)))
101-
notSealed := make([]activeRef, 0)
102-
for _, a := range actives {
103-
if err := a.Replay(ctx); err != nil {
104-
return nil, nil, fmt.Errorf("while replaying blocks: %w", err)
105-
}
106-
if a.Info().DocsTotal == 0 { // skip empty
107-
removeFractionFiles(a.BaseFileName)
108-
continue
109-
}
110-
activeRef := l.fracProvider.newActiveRef(a)
111-
fracs = append(fracs, activeRef.ref)
112-
notSealed = append(notSealed, activeRef)
113-
}
114-
115-
return fracs, notSealed, nil
99+
return actives, fracs, nil
116100
}
117101

118102
func (l *loader) loadSealedFrac(diskFracCache *sealedFracCache, info *fracInfo) *frac.Sealed {

0 commit comments

Comments
 (0)