Skip to content

Commit 9c8e96b

Browse files
committed
feat(fracmanager): add handling for upload queue overflow and disk space exhaustion
1 parent e956ab5 commit 9c8e96b

File tree

8 files changed

+251
-19
lines changed

8 files changed

+251
-19
lines changed

cmd/seq-db/seq-db.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ func startStore(
253253
DataDir: cfg.Storage.DataDir,
254254
FracSize: uint64(cfg.Storage.FracSize),
255255
TotalSize: uint64(cfg.Storage.TotalSize),
256+
SealingQueueLen: uint64(cfg.Storage.SealingQueueLen),
256257
CacheSize: uint64(cfg.Resources.CacheSize),
257258
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
258259
ReplayWorkers: cfg.Resources.ReplayWorkers,
@@ -280,8 +281,10 @@ func startStore(
280281
SkipSortDocs: !cfg.DocsSorting.Enabled,
281282
KeepMetaFile: false,
282283
},
283-
OffloadingEnabled: cfg.Offloading.Enabled,
284-
OffloadingRetention: cfg.Offloading.Retention,
284+
OffloadingEnabled: cfg.Offloading.Enabled,
285+
OffloadingRetention: cfg.Offloading.Retention,
286+
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
287+
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
285288
},
286289
API: storeapi.APIConfig{
287290
StoreMode: configMode,

config/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type Config struct {
6363
// TotalSize specifies upper bound of how much disk space can be occupied
6464
// by sealed fractions before they get deleted (or offloaded).
6565
TotalSize Bytes `config:"total_size" default:"1GiB"`
66+
// SealingQueueLen defines the maximum length of the sealing queue.
67+
// If the queue length reaches this limit, writing to the store will be paused,
68+
// and bulk requests will start returning errors.
69+
// A value of zero disables this limit, allowing writes to proceed unconditionally.
70+
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
6671
} `config:"storage"`
6772

6873
Cluster struct {
@@ -234,8 +239,13 @@ type Config struct {
234239
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
235240
SecretKey string `config:"secret_key"`
236241
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
237-
// Be aware that fraction is suicided when offloading attempts exceeds [RetryCount].
238242
RetryCount int `config:"retry_count" default:"5"`
243+
// Specifies the percentage of total local dataset size allocated to the offloading queue.
244+
// Note: When the queue overflows, the oldest fractions are automatically removed.
245+
// This automatic removal is disabled when set to zero.
246+
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
247+
// Delay duration between consecutive offloading retries
248+
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
239249
} `config:"offloading"`
240250

241251
AsyncSearch struct {

fracmanager/config.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type Config struct {
1919
TotalSize uint64
2020
CacheSize uint64
2121

22+
SuspendThreshold uint64
23+
SealingQueueLen uint64
24+
2225
ReplayWorkers int
2326
MaintenanceDelay time.Duration
2427
CacheCleanupDelay time.Duration
@@ -28,8 +31,10 @@ type Config struct {
2831
Fraction frac.Config
2932
MinSealFracSize uint64
3033

31-
OffloadingEnabled bool
32-
OffloadingRetention time.Duration
34+
OffloadingEnabled bool
35+
OffloadingQueueSize uint64
36+
OffloadingRetention time.Duration
37+
OffloadingRetryDelay time.Duration
3338
}
3439

3540
func FillConfigWithDefault(config *Config) *Config {

fracmanager/fraction_registry.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
133133
curInfo := old.instance.Info()
134134
r.stats.sealing.Add(curInfo)
135135

136+
r.active.Suspend(old.Suspended())
137+
136138
wg := sync.WaitGroup{}
137139
wg.Add(1)
138140
// since old.WaitWriteIdle() can take some time, we don't want to do it under the lock
@@ -155,6 +157,31 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
155157
return old, wg.Wait, nil
156158
}
157159

160+
func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) bool {
161+
r.mu.Lock()
162+
defer r.mu.Unlock()
163+
164+
if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) {
165+
r.active.Suspend(true)
166+
return true
167+
}
168+
169+
if maxSize > 0 && r.diskUsage() > maxSize {
170+
r.active.Suspend(true)
171+
return true
172+
}
173+
174+
r.active.Suspend(false)
175+
return false
176+
}
177+
178+
func (r *fractionRegistry) diskUsage() uint64 {
179+
return r.active.instance.Info().FullSize() +
180+
r.stats.locals.totalSizeOnDisk +
181+
r.stats.sealing.totalSizeOnDisk +
182+
r.stats.offloading.totalSizeOnDisk
183+
}
184+
158185
// addActive sets a new active fraction and updates the complete fractions list.
159186
func (r *fractionRegistry) addActive(a *activeProxy) {
160187
r.muAll.Lock()
@@ -231,6 +258,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*
231258
// Fractions older than retention period are permanently deleted.
232259
// Returns removed fractions or empty slice if nothing to remove.
233260
func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
261+
if retention == 0 {
262+
return nil
263+
}
264+
234265
r.mu.Lock()
235266
defer r.mu.Unlock()
236267

@@ -252,6 +283,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
252283
return evicted
253284
}
254285

286+
// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit.
287+
// Selects fractions that haven't finished offloading yet to minimize data loss.
288+
// Used when offloading queue grows too large due to slow remote storage performance.
289+
func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy {
290+
if sizeLimit == 0 {
291+
return nil
292+
}
293+
294+
r.mu.Lock()
295+
defer r.mu.Unlock()
296+
297+
// Fast path: skip processing if within size limits
298+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit {
299+
return nil
300+
}
301+
302+
count := 0
303+
evicted := []*sealedProxy{}
304+
// filter fractions
305+
for _, item := range r.offloading {
306+
// keep items that are within limits or already offloaded
307+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil {
308+
r.offloading[count] = item
309+
count++
310+
continue
311+
}
312+
evicted = append(evicted, item)
313+
r.stats.offloading.Sub(item.instance.Info())
314+
}
315+
316+
r.offloading = r.offloading[:count]
317+
r.rebuildAllFractions()
318+
319+
return evicted
320+
}
321+
255322
// PromoteToLocal moves fractions from sealing to local queue when sealing completes.
256323
// Maintains strict ordering - younger fractions wait for older ones to seal first.
257324
func (r *fractionRegistry) PromoteToLocal(active *activeProxy, sealed *frac.Sealed) {
@@ -326,6 +393,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
326393
count++
327394
}
328395
}
396+
397+
if count == len(r.offloading) { // not found: it may have been removed earlier by EvictOverflowed
398+
return
399+
}
400+
329401
r.offloading = r.offloading[:count]
330402
r.stats.offloading.Sub(sealed.instance.Info())
331403

fracmanager/lifecycle_manager.go

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type lifecycleManager struct {
2121
provider *fractionProvider // Provider for fraction operations
2222
flags *StateManager // Storage state flags
2323
registry *fractionRegistry // Fraction state registry
24+
tasks *TaskManager // Background offloading tasks
2425

2526
sealingWg sync.WaitGroup
2627
}
@@ -36,19 +37,27 @@ func newLifecycleManager(
3637
provider: provider,
3738
flags: flags,
3839
registry: registry,
40+
tasks: NewTaskManager(),
3941
}
4042
}
4143

4244
// Maintain performs periodic lifecycle management tasks.
4345
// It is a CORE method of lifecycleManager
4446
// Coordinates rotation, offloading, cleanup based on configuration.
45-
func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) {
46-
lc.Rotate(config.FracSize, wg)
47-
if config.OffloadingEnabled {
48-
lc.OffloadLocal(ctx, config.TotalSize, wg)
49-
lc.CleanRemote(config.OffloadingRetention, wg)
47+
func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
48+
49+
suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize
50+
lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold)
51+
52+
lc.Rotate(cfg.FracSize, wg)
53+
if cfg.OffloadingEnabled {
54+
lc.OffloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg)
55+
if cfg.OffloadingQueueSize > 0 {
56+
lc.RemoveOverflowed(cfg.OffloadingQueueSize, wg)
57+
}
58+
lc.CleanRemote(cfg.OffloadingRetention, wg)
5059
} else {
51-
lc.CleanLocal(config.TotalSize, wg)
60+
lc.CleanLocal(cfg.TotalSize, wg)
5261
}
5362
lc.UpdateOldestMetric()
5463
lc.SyncInfoCache()
@@ -113,17 +122,18 @@ func (lc *lifecycleManager) Rotate(maxSize uint64, wg *sync.WaitGroup) {
113122

114123
// OffloadLocal starts offloading of local fractions to remote storage
115124
// Selects fractions based on disk space usage and retention policy.
116-
func (lc *lifecycleManager) OffloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) {
125+
func (lc *lifecycleManager) OffloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) {
117126
toOffload, err := lc.registry.EvictLocal(true, sizeLimit)
118127
if err != nil {
119128
logger.Fatal("error releasing old fractions:", zap.Error(err))
120129
}
121130
for _, sealed := range toOffload {
122131
wg.Add(1)
123-
go func() {
132+
lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) {
124133
defer wg.Done()
125134

126-
remote, _ := lc.TryOffload(ctx, sealed.instance)
135+
remote := lc.OffloadWithRetry(ctx, sealed.instance, retryDelay)
136+
127137
lc.registry.PromoteToRemote(sealed, remote)
128138

129139
if remote == nil {
@@ -136,7 +146,41 @@ func (lc *lifecycleManager) OffloadLocal(ctx context.Context, sizeLimit uint64,
136146
// Free up local resources
137147
sealed.instance.Suicide()
138148
maintenanceTruncateTotal.Add(1)
139-
}()
149+
})
150+
}
151+
}
152+
153+
// OffloadWithRetry attempts to offload a fraction with retries until success or cancellation.
154+
// Returns the remote fraction instance, or nil if the context was canceled before offloading succeeded.
155+
func (lc *lifecycleManager) OffloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote {
156+
start := time.Now()
157+
for i := 0; ; i++ {
158+
remote, err := lc.TryOffload(ctx, sealed)
159+
if err == nil {
160+
return remote
161+
}
162+
163+
logger.Warn(
164+
"fail to offload fraction",
165+
zap.String("name", sealed.BaseFileName),
166+
zap.Duration("offloading_time", time.Since(start)),
167+
zap.Int("attempts", i),
168+
zap.Error(err),
169+
)
170+
171+
select {
172+
case <-ctx.Done():
173+
logger.Info(
174+
"fraction offloading was stopped",
175+
zap.String("name", sealed.BaseFileName),
176+
zap.Duration("offloading_time", time.Since(start)),
177+
zap.Int("attempts", i),
178+
zap.Error(ctx.Err()),
179+
)
180+
return nil
181+
case <-time.After(retryDelay):
182+
// Wait before next retry attempt
183+
}
140184
}
141185
}
142186

@@ -163,9 +207,6 @@ func (lc *lifecycleManager) TryOffload(ctx context.Context, sealed *frac.Sealed)
163207

164208
// CleanRemote deletes outdated remote fractions based on retention policy
165209
func (lc *lifecycleManager) CleanRemote(retention time.Duration, wg *sync.WaitGroup) {
166-
if retention == 0 {
167-
return
168-
}
169210
toDelete := lc.registry.EvictRemote(retention)
170211
wg.Add(1)
171212
go func() {
@@ -202,6 +243,21 @@ func (lc *lifecycleManager) CleanLocal(sizeLimit uint64, wg *sync.WaitGroup) {
202243
}()
203244
}
204245

246+
// RemoveOverflowed removes fractions from offloading queue that exceed size limit
247+
// Stops ongoing offloading tasks and cleans up both local and remote resources.
248+
func (lc *lifecycleManager) RemoveOverflowed(sizeLimit uint64, wg *sync.WaitGroup) {
249+
evicted := lc.registry.EvictOverflowed(sizeLimit)
250+
for _, item := range evicted {
251+
wg.Add(1)
252+
go func() {
253+
defer wg.Done()
254+
// Cancel the offloading task - this operation may take significant time
255+
// hence executed in a separate goroutine to avoid blocking
256+
lc.tasks.Cancel(item.instance.BaseFileName)
257+
}()
258+
}
259+
}
260+
205261
// UpdateOldestMetric updates the prometheus metric with oldest fraction timestamp
206262
func (lc *lifecycleManager) UpdateOldestMetric() {
207263
oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds())

fracmanager/lifecycle_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestOldestMetrics(t *testing.T) {
150150
}
151151

152152
wg := sync.WaitGroup{}
153-
lc.OffloadLocal(t.Context(), total-halfSize, &wg)
153+
lc.OffloadLocal(t.Context(), total-halfSize, 0, &wg)
154154
wg.Wait()
155155

156156
// Check state after offloading

fracmanager/proxy_frac.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var (
2323
_ frac.Fraction = (*emptyFraction)(nil)
2424

2525
ErrFractionNotWritable = errors.New("fraction is not writable")
26+
ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded")
2627
)
2728

2829
// fractionProxy provides thread-safe access to a fraction with atomic replacement
@@ -81,6 +82,7 @@ type activeProxy struct {
8182
wg sync.WaitGroup // Tracks pending write operations
8283

8384
finalized bool // Whether fraction is frozen for writes
85+
suspended bool // Temporarily suspended for writes
8486
}
8587

8688
func newActiveProxy(active *frac.Active) *activeProxy {
@@ -97,6 +99,10 @@ func (p *activeProxy) Append(docs, meta []byte) error {
9799
p.mu.RUnlock()
98100
return ErrFractionNotWritable
99101
}
102+
if p.suspended {
103+
p.mu.RUnlock()
104+
return ErrFractionSuspended
105+
}
100106
p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle()
101107
p.mu.RUnlock()
102108

@@ -115,6 +121,19 @@ func (p *activeProxy) WaitWriteIdle() {
115121
zap.Float64("time_wait_s", waitTime))
116122
}
117123

124+
func (p *activeProxy) Suspended() bool {
125+
p.mu.Lock()
126+
defer p.mu.Unlock()
127+
128+
return p.suspended
129+
}
130+
131+
func (p *activeProxy) Suspend(value bool) {
132+
p.mu.Lock()
133+
p.suspended = value
134+
p.mu.Unlock()
135+
}
136+
118137
// Finalize marks the fraction as read-only and prevents new writes from starting after finalize.
119138
func (p *activeProxy) Finalize() error {
120139
p.mu.Lock()

0 commit comments

Comments
 (0)