Skip to content

Commit 266d02d

Browse files
committed
feat(fracmanager) add handling for upload queue overflow and disk space exhaustion
1 parent 378c06d commit 266d02d

File tree

8 files changed

+251
-19
lines changed

8 files changed

+251
-19
lines changed

cmd/seq-db/seq-db.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ func startStore(
253253
DataDir: cfg.Storage.DataDir,
254254
FracSize: uint64(cfg.Storage.FracSize),
255255
TotalSize: uint64(cfg.Storage.TotalSize),
256+
SealingQueueLen: uint64(cfg.Storage.SealingQueueLen),
256257
CacheSize: uint64(cfg.Resources.CacheSize),
257258
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
258259
ReplayWorkers: cfg.Resources.ReplayWorkers,
@@ -280,8 +281,10 @@ func startStore(
280281
SkipSortDocs: !cfg.DocsSorting.Enabled,
281282
KeepMetaFile: false,
282283
},
283-
OffloadingEnabled: cfg.Offloading.Enabled,
284-
OffloadingRetention: cfg.Offloading.Retention,
284+
OffloadingEnabled: cfg.Offloading.Enabled,
285+
OffloadingRetention: cfg.Offloading.Retention,
286+
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
287+
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
285288
},
286289
API: storeapi.APIConfig{
287290
StoreMode: configMode,

config/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type Config struct {
6363
// TotalSize specifies upper bound of how much disk space can be occupied
6464
// by sealed fractions before they get deleted (or offloaded).
6565
TotalSize Bytes `config:"total_size" default:"1GiB"`
66+
// SealingQueueLen defines the maximum length of the sealing queue.
67+
// If the queue size exceeds this limit, writing to the store will be paused,
68+
// and bulk requests will start returning errors.
69+
// A value of zero disables this limit, allowing writes to proceed unconditionally.
70+
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
6671
} `config:"storage"`
6772

6873
Cluster struct {
@@ -234,8 +239,13 @@ type Config struct {
234239
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
235240
SecretKey string `config:"secret_key"`
236241
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
237-
// Be aware that fraction is suicided when offloading attempts exceeds [RetryCount].
238242
RetryCount int `config:"retry_count" default:"5"`
243+
// Specifies the percentage of total local dataset size allocated to the offloading queue.
244+
// Note: When the queue overflows, the oldest fraction of data is automatically removed.
245+
// This automatic removal is disabled when set to zero.
246+
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
247+
// Delay duration between consecutive offloading retries
248+
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
239249
} `config:"offloading"`
240250

241251
AsyncSearch struct {

fracmanager/config.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type Config struct {
1919
TotalSize uint64
2020
CacheSize uint64
2121

22+
SuspendThreshold uint64
23+
SealingQueueLen uint64
24+
2225
ReplayWorkers int
2326
MaintenanceDelay time.Duration
2427
CacheCleanupDelay time.Duration
@@ -28,8 +31,10 @@ type Config struct {
2831
Fraction frac.Config
2932
MinSealFracSize uint64
3033

31-
OffloadingEnabled bool
32-
OffloadingRetention time.Duration
34+
OffloadingEnabled bool
35+
OffloadingQueueSize uint64
36+
OffloadingRetention time.Duration
37+
OffloadingRetryDelay time.Duration
3338
}
3439

3540
func FillConfigWithDefault(config *Config) *Config {

fracmanager/fraction_registry.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
129129
curInfo := old.instance.Info()
130130
r.stats.sealing.Add(curInfo)
131131

132+
r.active.Suspend(old.Suspended())
133+
132134
wg := sync.WaitGroup{}
133135
wg.Add(1)
134136
// since old.WaitWriteIdle() can take some time, we don't want to do it under the lock
@@ -151,6 +153,31 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
151153
return old, wg.Wait, nil
152154
}
153155

156+
func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) bool {
157+
r.mu.Lock()
158+
defer r.mu.Unlock()
159+
160+
if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) {
161+
r.active.Suspend(true)
162+
return true
163+
}
164+
165+
if maxSize > 0 && r.diskUsage() > maxSize {
166+
r.active.Suspend(true)
167+
return true
168+
}
169+
170+
r.active.Suspend(false)
171+
return false
172+
}
173+
174+
func (r *fractionRegistry) diskUsage() uint64 {
175+
return r.active.instance.Info().FullSize() +
176+
r.stats.sealed.totalSizeOnDisk +
177+
r.stats.sealing.totalSizeOnDisk +
178+
r.stats.offloading.totalSizeOnDisk
179+
}
180+
154181
// addActive sets a new active fraction and updates the complete fractions list.
155182
func (r *fractionRegistry) addActive(a *activeProxy) {
156183
r.muAll.Lock()
@@ -227,6 +254,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*
227254
// Fractions older than retention period are permanently deleted.
228255
// Returns removed fractions or empty slice if nothing to remove.
229256
func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
257+
if retention == 0 {
258+
return nil
259+
}
260+
230261
r.mu.Lock()
231262
defer r.mu.Unlock()
232263

@@ -248,6 +279,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
248279
return evicted
249280
}
250281

282+
// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit.
283+
// Selects fractions that haven't finished offloading yet to minimize data loss.
284+
// Used when offloading queue grows too large due to slow remote storage performance.
285+
func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy {
286+
if sizeLimit == 0 {
287+
return nil
288+
}
289+
290+
r.mu.Lock()
291+
defer r.mu.Unlock()
292+
293+
// Fast path: skip processing if within size limits
294+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit {
295+
return nil
296+
}
297+
298+
count := 0
299+
evicted := []*sealedProxy{}
300+
// filter fractions
301+
for _, item := range r.offloading {
302+
// keep items that are within limits or already offloaded
303+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil {
304+
r.offloading[count] = item
305+
count++
306+
continue
307+
}
308+
evicted = append(evicted, item)
309+
r.stats.offloading.Sub(item.instance.Info())
310+
}
311+
312+
r.offloading = r.offloading[:count]
313+
r.rebuildAllFractions()
314+
315+
return evicted
316+
}
317+
251318
// PromoteToSealed moves fractions from sealing to local queue when sealing completes.
252319
// Maintains strict ordering - younger fractions wait for older ones to seal first.
253320
func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) {
@@ -322,6 +389,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
322389
count++
323390
}
324391
}
392+
393+
if count == len(r.offloading) { // not found to remove (can be removed earlier in EvictOverflowed)
394+
return
395+
}
396+
325397
r.offloading = r.offloading[:count]
326398
r.stats.offloading.Sub(sealed.instance.Info())
327399

fracmanager/lifecycle_manager.go

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type lifecycleManager struct {
2121
provider *fractionProvider // provider for fraction operations
2222
flags *StateManager // storage state flags
2323
registry *fractionRegistry // fraction state registry
24+
tasks *TaskManager // Background offloading tasks
2425

2526
sealingWg sync.WaitGroup
2627
}
@@ -36,18 +37,26 @@ func newLifecycleManager(
3637
provider: provider,
3738
flags: flags,
3839
registry: registry,
40+
tasks: NewTaskManager(),
3941
}
4042
}
4143

4244
// Maintain performs periodic lifecycle management tasks.
4345
// It coordinates rotation, offloading, cleanup based on configuration.
44-
func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) {
45-
lc.rotate(config.FracSize, wg)
46-
if config.OffloadingEnabled {
47-
lc.offloadLocal(ctx, config.TotalSize, wg)
48-
lc.cleanRemote(config.OffloadingRetention, wg)
46+
func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
47+
48+
suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize
49+
lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold)
50+
51+
lc.rotate(cfg.FracSize, wg)
52+
if cfg.OffloadingEnabled {
53+
lc.offloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg)
54+
if cfg.OffloadingQueueSize > 0 {
55+
lc.removeOverflowed(cfg.OffloadingQueueSize, wg)
56+
}
57+
lc.cleanRemote(cfg.OffloadingRetention, wg)
4958
} else {
50-
lc.cleanLocal(config.TotalSize, wg)
59+
lc.cleanLocal(cfg.TotalSize, wg)
5160
}
5261
lc.updateOldestMetric()
5362
lc.SyncInfoCache()
@@ -113,17 +122,18 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) {
113122

114123
// offloadLocal starts offloading of local fractions to remote storage.
115124
// Selects fractions based on disk space usage and retention policy.
116-
func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) {
125+
func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) {
117126
toOffload, err := lc.registry.EvictLocal(true, sizeLimit)
118127
if err != nil {
119128
logger.Fatal("error releasing old fractions:", zap.Error(err))
120129
}
121130
for _, sealed := range toOffload {
122131
wg.Add(1)
123-
go func() {
132+
lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) {
124133
defer wg.Done()
125134

126-
remote, _ := lc.tryOffload(ctx, sealed.instance)
135+
remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay)
136+
127137
lc.registry.PromoteToRemote(sealed, remote)
128138

129139
if remote == nil {
@@ -136,7 +146,41 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64,
136146
// free up local resources
137147
sealed.instance.Suicide()
138148
maintenanceTruncateTotal.Add(1)
139-
}()
149+
})
150+
}
151+
}
152+
153+
// OffloadWithRetry attempts to offload a fraction with retries until success or cancellation.
154+
// Returns the remote fraction instance and a boolean indicating whether offloading was not canceled.
155+
func (lc *lifecycleManager) offloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote {
156+
start := time.Now()
157+
for i := 0; ; i++ {
158+
remote, err := lc.tryOffload(ctx, sealed)
159+
if err == nil {
160+
return remote
161+
}
162+
163+
logger.Warn(
164+
"fail to offload fraction",
165+
zap.String("name", sealed.BaseFileName),
166+
zap.Duration("offloading_time", time.Since(start)),
167+
zap.Int("attempts", i),
168+
zap.Error(err),
169+
)
170+
171+
select {
172+
case <-ctx.Done():
173+
logger.Info(
174+
"fraction offloading was stopped",
175+
zap.String("name", sealed.BaseFileName),
176+
zap.Duration("offloading_time", time.Since(start)),
177+
zap.Int("attempts", i),
178+
zap.Error(ctx.Err()),
179+
)
180+
return nil
181+
case <-time.After(retryDelay):
182+
// Wait before next retry attempt
183+
}
140184
}
141185
}
142186

@@ -163,9 +207,6 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed)
163207

164208
// cleanRemote deletes outdated remote fractions based on retention policy.
165209
func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) {
166-
if retention == 0 {
167-
return
168-
}
169210
toDelete := lc.registry.EvictRemote(retention)
170211
wg.Add(1)
171212
go func() {
@@ -207,3 +248,18 @@ func (lc *lifecycleManager) updateOldestMetric() {
207248
oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds())
208249
oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds())
209250
}
251+
252+
// removeOverflowed removes fractions from offloading queue that exceed size limit
253+
// Stops ongoing offloading tasks and cleans up both local and remote resources.
254+
func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) {
255+
evicted := lc.registry.EvictOverflowed(sizeLimit)
256+
for _, item := range evicted {
257+
wg.Add(1)
258+
go func() {
259+
defer wg.Done()
260+
// Cancel the offloading task - this operation may take significant time
261+
// hence executed in a separate goroutine to avoid blocking
262+
lc.tasks.Cancel(item.instance.BaseFileName)
263+
}()
264+
}
265+
}

fracmanager/lifecycle_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestOldestMetrics(t *testing.T) {
150150
}
151151

152152
wg := sync.WaitGroup{}
153-
lc.offloadLocal(t.Context(), total-halfSize, &wg)
153+
lc.offloadLocal(t.Context(), total-halfSize, 0, &wg)
154154
wg.Wait()
155155

156156
// Check state after offloading

fracmanager/proxy_frac.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var (
2323
_ frac.Fraction = (*emptyFraction)(nil)
2424

2525
ErrFractionNotWritable = errors.New("fraction is not writable")
26+
ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded")
2627
)
2728

2829
// fractionProxy provides thread-safe access to a fraction with atomic replacement
@@ -81,6 +82,7 @@ type activeProxy struct {
8182
wg sync.WaitGroup // Tracks pending write operations
8283

8384
finalized bool // Whether fraction is frozen for writes
85+
suspended bool // Temporarily suspended for writes
8486
}
8587

8688
func newActiveProxy(active *frac.Active) *activeProxy {
@@ -97,6 +99,10 @@ func (p *activeProxy) Append(docs, meta []byte) error {
9799
p.mu.RUnlock()
98100
return ErrFractionNotWritable
99101
}
102+
if p.suspended {
103+
p.mu.RUnlock()
104+
return ErrFractionSuspended
105+
}
100106
p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle()
101107
p.mu.RUnlock()
102108

@@ -115,6 +121,19 @@ func (p *activeProxy) WaitWriteIdle() {
115121
zap.Float64("time_wait_s", waitTime))
116122
}
117123

124+
func (p *activeProxy) Suspended() bool {
125+
p.mu.Lock()
126+
defer p.mu.Unlock()
127+
128+
return p.suspended
129+
}
130+
131+
func (p *activeProxy) Suspend(value bool) {
132+
p.mu.Lock()
133+
p.suspended = value
134+
p.mu.Unlock()
135+
}
136+
118137
// Finalize marks the fraction as read-only and prevents new writes from starting after finalize.
119138
func (p *activeProxy) Finalize() error {
120139
p.mu.Lock()

0 commit comments

Comments
 (0)