Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (

// known extensions
MetaFileSuffix = ".meta"
WalFileSuffix = ".wal"

DocsFileSuffix = ".docs"
DocsDelFileSuffix = ".docs.del"
Expand Down
119 changes: 113 additions & 6 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frac

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Active struct {

metaFile *os.File
metaReader storage.DocBlocksReader
walReader *storage.WalReader

writer *ActiveWriter
indexer *ActiveIndexer
Expand All @@ -79,7 +81,34 @@ func NewActive(
cfg *Config,
) *Active {
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync)

var metaFile *os.File
var metaStats os.FileInfo
var writer *ActiveWriter
var metaReader storage.DocBlocksReader
var walReader *storage.WalReader
var metaSize uint64

legacyMetaFileName := baseFileName + consts.MetaFileSuffix
if _, err := os.Stat(legacyMetaFileName); err == nil {
// .meta file exists
metaFile, metaStats = mustOpenFile(legacyMetaFileName, config.SkipFsync)
metaSize = uint64(metaStats.Size())
metaReader = storage.NewDocBlocksReader(readLimiter, metaFile)
writer = NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
logger.Info("using legacy meta file format", zap.String("fraction", baseFileName))
} else {
logger.Info("using new WAL format", zap.String("fraction", baseFileName))
walFileName := baseFileName + consts.WalFileSuffix
metaFile, metaStats = mustOpenFile(walFileName, config.SkipFsync)
metaSize = uint64(metaStats.Size())
writer = NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
var err error
walReader, err = storage.NewWalReader(readLimiter, metaFile, baseFileName)
if err != nil {
logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err))
}
}

f := &Active{
TokenList: NewActiveTokenList(config.IndexWorkers),
Expand All @@ -95,13 +124,14 @@ func NewActive(
sortReader: storage.NewDocsReader(readLimiter, docsFile, sortCache),

metaFile: metaFile,
metaReader: storage.NewDocBlocksReader(readLimiter, metaFile),
metaReader: metaReader,
walReader: walReader,

indexer: activeIndexer,
writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync),
writer: writer,

BaseFileName: baseFileName,
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), metaSize),
Config: cfg,
}

Expand Down Expand Up @@ -133,6 +163,81 @@ func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) {
}

// Replay restores the fraction's in-memory state from disk. It prefers the
// new WAL file (*.wal) when present, falls back to the legacy meta file
// (*.meta), and is a no-op when neither exists (e.g. a freshly created
// fraction). Note: a Stat failure other than "not exists" is treated the
// same as absence of the file.
func (f *Active) Replay(ctx context.Context) error {
	if _, err := os.Stat(f.BaseFileName + consts.WalFileSuffix); err == nil {
		return f.replayWalFile(ctx)
	}
	if _, err := os.Stat(f.BaseFileName + consts.MetaFileSuffix); err == nil {
		return f.replayMetaFileLegacy(ctx)
	}
	logger.Info("neither wal nor legacy meta file was found, skipping replay", zap.String("fraction", f.BaseFileName))
	return nil
}

// replayWalFile rebuilds the fraction's index by streaming every entry of the
// *.wal file through the indexer. Progress is logged at roughly 10% increments
// of the WAL size, and the replay aborts promptly on context cancellation or
// the first read error.
func (f *Active) replayWalFile(ctx context.Context) error {
	if f.walReader == nil {
		// Lowercase, no punctuation — Go error-string convention.
		return fmt.Errorf("wal reader not initialized")
	}

	logger.Info("start replaying WAL file...", zap.String("name", f.info.Name()))

	start := time.Now()

	// Log progress every ~10% of the WAL. Guard against a WAL smaller than
	// 10 bytes: step would be 0 and the progress branch would fire (and log)
	// on every single entry.
	step := f.info.MetaOnDisk / 10
	if step == 0 {
		step = math.MaxUint64
	}
	next := step

	sw := stopwatch.New()
	var wg sync.WaitGroup

	for entry := range f.walReader.Iter() {
		// Abort promptly if the caller cancelled the replay.
		// NOTE(review): early returns below do not wg.Wait() for tasks
		// already handed to the indexer — same as before; confirm the
		// indexer tolerates this on shutdown.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if entry.Err != nil {
			return entry.Err
		}

		if uint64(entry.Offset) > next {
			next += step
			progress := float64(entry.Offset) / float64(f.info.MetaOnDisk) * 100
			logger.Info("replaying batch, meta",
				zap.String("name", f.info.Name()),
				zap.Int64("from", entry.Offset),
				zap.Int64("to", entry.Offset+entry.Size),
				zap.Uint64("target", f.info.MetaOnDisk),
				util.ZapFloat64WithPrec("progress_percentage", progress, 2),
			)
		}

		// Indexing is asynchronous: entries are handed off to worker
		// goroutines; wg tracks completion of all of them.
		wg.Add(1)
		f.indexer.Index(f, entry.Data, &wg, sw)
	}

	wg.Wait()

	tookSeconds := util.DurationToUnit(time.Since(start), "s")
	throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds
	throughputMeta := util.SizeToUnit(f.info.MetaOnDisk, "mb") / tookSeconds
	logger.Info("active fraction replayed",
		zap.String("name", f.info.Name()),
		zap.Uint32("docs_total", f.info.DocsTotal),
		util.ZapUint64AsSizeStr("docs_size", f.info.DocsOnDisk),
		util.ZapFloat64WithPrec("took_s", tookSeconds, 1),
		util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1),
		util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1),
	)
	return nil
}

// replayMetaFileLegacy replays legacy *.meta files. Only basic corruption detection support is implemented
func (f *Active) replayMetaFileLegacy(ctx context.Context) error {
logger.Info("start replaying...", zap.String("name", f.info.Name()))

t := time.Now()
Expand Down Expand Up @@ -175,7 +280,9 @@ out:
offset += metaSize

wg.Add(1)
f.indexer.Index(f, meta, &wg, sw)

metaBlock := storage.PackDocBlockToMetaBlock(meta)
f.indexer.Index(f, metaBlock, &wg, sw)
}
}

Expand Down Expand Up @@ -204,7 +311,7 @@ var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
}, []string{"stage"})

// Append causes data to be written on disk and sends metas to index workers
func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error) {
func (f *Active) Append(docs storage.DocBlock, metas storage.MetaBlock, wg *sync.WaitGroup) (err error) {
sw := stopwatch.New()
m := sw.Start("append")
if err = f.writer.Write(docs, metas, sw); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions frac/active_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ActiveIndexer struct {

// indexTask is a unit of work for the ActiveIndexer workers: one block of
// metas to be indexed into a specific active fraction.
type indexTask struct {
	Frac  *Active           // fraction the metas belong to
	Metas storage.MetaBlock // meta block to index
	Pos   uint64            // docs offset of the block (taken from Metas.DocsOffset())
	Wg    *sync.WaitGroup   // presumably Done()-ed when the task is processed — confirm in worker loop
}
Expand All @@ -45,10 +45,10 @@ func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
return &idx, stopIdx
}

func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
func (ai *ActiveIndexer) Index(frac *Active, metas storage.MetaBlock, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
m := sw.Start("send_index_chan")
ai.ch <- &indexTask{
Pos: storage.DocBlock(metas).GetExt2(),
Pos: metas.DocsOffset(),
Metas: metas,
Frac: frac,
Wg: wg,
Expand Down
2 changes: 1 addition & 1 deletion frac/active_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkIndexer(b *testing.B) {
bulks := make([][]byte, 0, len(readers))
for _, readNext := range readers {
_, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, readNext)
bulks = append(bulks, storage.CompressDocBlock(meta, nil, 3))
bulks = append(bulks, storage.CompressMetaBlock(meta, nil, 3))
}
b.StartTimer()

Expand Down
21 changes: 16 additions & 5 deletions frac/active_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,29 @@ import (

// ActiveWriter persists a bulk of documents and its meta for an active
// fraction. Docs always go through a FileWriter; metas go through a
// MetaWriter, which is either the storage WAL writer (new *.wal format,
// see NewActiveWriter) or a plain FileWriter (legacy *.meta format, see
// NewActiveWriterLegacy).
type ActiveWriter struct {
	docs *FileWriter // documents file
	meta MetaWriter  // meta sink: WAL (new) or plain file (legacy)
}

func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter {
// MetaWriter is the sink for serialized meta blocks produced on append.
// Implemented by *FileWriter (legacy *.meta files) and by the WAL writer
// from the storage package (new *.wal files).
type MetaWriter interface {
	// Write appends data to the underlying file; the returned int64 is
	// presumably the offset the data was written at (as with
	// FileWriter.Write) — confirm for the WAL implementation. Timings are
	// recorded into sw.
	Write(data []byte, sw *stopwatch.Stopwatch) (int64, error)
	// Stop shuts the writer down (e.g. terminates a background sync loop).
	Stop()
}

// NewActiveWriter builds an ActiveWriter that stores metas in the new WAL
// format (*.wal). docsOffset/walOffset are the current sizes of the files
// being appended to; skipFsync disables fsync on both writers.
func NewActiveWriter(docsFile, walFile *os.File, docsOffset, walOffset int64, skipFsync bool) *ActiveWriter {
	w := &ActiveWriter{}
	w.docs = NewFileWriter(docsFile, docsOffset, skipFsync)
	w.meta = storage.NewWalWriter(walFile, walOffset, skipFsync)
	return w
}

// NewActiveWriterLegacy builds an ActiveWriter for the legacy layout where
// metas go to a plain *.meta file through a FileWriter instead of the WAL
// writer. Offsets are the current sizes of the respective files.
func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter {
	aw := &ActiveWriter{
		meta: NewFileWriter(metaFile, metaOffset, skipFsync),
		docs: NewFileWriter(docsFile, docsOffset, skipFsync),
	}
	return aw
}

func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error {
func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.MetaBlock, sw *stopwatch.Stopwatch) error {
m := sw.Start("write_docs")
offset, err := a.docs.Write(docs, sw)
m.Stop()
Expand All @@ -28,8 +40,7 @@ func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error {
return err
}

storage.DocBlock(meta).SetExt1(uint64(len(docs)))
storage.DocBlock(meta).SetExt2(uint64(offset))
meta.SetDocsOffset(uint64(offset))

m = sw.Start("write_meta")
_, err = a.meta.Write(meta, sw)
Expand Down
10 changes: 10 additions & 0 deletions frac/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

"github.com/ozontech/seq-db/metric/stopwatch"
"github.com/ozontech/seq-db/storage"
)

type writeSyncer interface {
Expand All @@ -21,6 +22,9 @@ type writeSyncer interface {
// is performed, after which all requests receive a response about the successful (or unsuccessful) fsync.
//
// This results in one fsync system call for several writers performing a write at approximately the same time.
//
// FileWriter always stores data in DocBlock format. If MetaBlock is passed to Write, then it's converted to
// DocBlock.
type FileWriter struct {
ws writeSyncer
offset atomic.Int64
Expand Down Expand Up @@ -69,6 +73,12 @@ func (fs *FileWriter) syncLoop() {
func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) {
m := sw.Start("write_duration")

if storage.IsMetaBlock(data) {
// MetaBlock must be converted to DocBlock if it is written to a legacy WAL meta file (with *.meta suffix).
// This may happen if a new version of store has been deployed while a legacy active fraction with *.meta file exists.
data = storage.PackMetaBlockToDocBlock(data, nil)
}

dataLen := int64(len(data))
offset := fs.offset.Add(dataLen) - dataLen
_, err := fs.ws.WriteAt(data, offset)
Expand Down
39 changes: 39 additions & 0 deletions frac/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/metric/stopwatch"
"github.com/ozontech/seq-db/storage"
)

type testWriterSyncer struct {
Expand Down Expand Up @@ -262,3 +264,40 @@ func TestSparseWrite(t *testing.T) {
e = os.Remove(rf.Name())
assert.NoError(t, e)
}

// TestFileWriterConvertsMetaBlockToDocBlock verifies that FileWriter
// transparently repacks a MetaBlock into DocBlock format on write, so the
// bytes that land on disk decode as a valid DocBlock carrying the original
// payload, codec, raw length and docs offset.
func TestFileWriterConvertsMetaBlockToDocBlock(t *testing.T) {
	file, err := os.Create(t.TempDir() + "/test_metablock.txt")
	require.NoError(t, err)
	defer file.Close()

	payload := []byte("test payload for MetaBlock to DocBlock conversion")
	mb := storage.CompressMetaBlock(payload, nil, 3)
	mb.SetDocsOffset(12345)
	mb.SetVersion(1)

	writer := NewFileWriter(file, 0, false)
	off, err := writer.Write(mb, stopwatch.New())
	require.NoError(t, err)
	writer.Stop()

	wantSize := storage.DocBlockHeaderLen + mb.Len()
	buf := make([]byte, wantSize)
	n, err := file.ReadAt(buf, off)
	require.NoError(t, err)
	require.Equal(t, int(wantSize), n)

	block := storage.DocBlock(buf[:n])
	assert.Equal(t, storage.CodecZSTD, block.Codec())
	assert.Equal(t, uint64(len(payload)), block.RawLen())
	assert.Equal(t, uint64(12345), block.GetExt2())

	plain, err := block.DecompressTo(nil)
	require.NoError(t, err)
	assert.Equal(t, payload, plain)
}
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,15 +1347,15 @@ func (s *FractionTestSuite) TestFractionInfo() {
// but if compression/marshalling has changed, expected values can be updated accordingly
s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match")
// it varies depending on params and docs shuffled
s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(300),
s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(350),
"doc on disk doesn't match. actual value: %d", info.DocsOnDisk)
s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match")
s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match")
s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match")

switch s.fraction.(type) {
case *Active:
s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(350),
s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400),
"meta on disk doesn't match. actual value: %d", info.MetaOnDisk)
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
Expand Down
11 changes: 9 additions & 2 deletions fracmanager/frac_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type fracManifest struct {
basePath string // base path to fraction files (without extension)
hasDocs bool // presence of main documents file
hasIndex bool // presence of index file
hasMeta bool // presence of meta-information
hasMeta bool // presence of meta-information (legacy WAL format)
hasWal bool // presence of WAL with meta (new WAL format)
hasSdocs bool // presence of sorted documents
hasRemote bool // presence of remote fraction

Expand All @@ -42,6 +43,8 @@ func (m *fracManifest) AddExtension(ext string) error {
m.hasDocs = true
case consts.MetaFileSuffix:
m.hasMeta = true
case consts.WalFileSuffix:
m.hasWal = true
case consts.SdocsFileSuffix:
m.hasSdocs = true
case consts.IndexFileSuffix:
Expand Down Expand Up @@ -88,7 +91,7 @@ func (m *fracManifest) Stage() fracStage {
if m.hasIndex && (m.hasSdocs || m.hasDocs) {
return fracStageSealed
}
if m.hasMeta && m.hasDocs {
if (m.hasMeta || m.hasWal) && m.hasDocs {
return fracStageActive
}
if m.hasDocsDel || m.hasIndexDel || m.hasSdocsDel {
Expand Down Expand Up @@ -116,6 +119,10 @@ func removeMeta(m *fracManifest) {
util.RemoveFile(m.basePath + consts.MetaFileSuffix)
m.hasMeta = false
}
if m.hasWal {
util.RemoveFile(m.basePath + consts.WalFileSuffix)
m.hasWal = false
}
}

func removeIndex(m *fracManifest) {
Expand Down
Loading
Loading