60 changes: 29 additions & 31 deletions pkg/replication/s3.go
@@ -58,24 +58,17 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli
zerolog.Ctx(ctx).Debug().Msg("creating replication task")

// 1. find repl rule(-s)
incVersions, replications, err := s.getDestinations(ctx, routedTo)
replications, skipTasks, err := s.getDestinations(ctx, routedTo)
if err != nil {
if errors.Is(err, dom.ErrPolicy) {
zerolog.Ctx(ctx).Info().Err(err).Msg("skip Replicate: replication is not configured")
return nil
}
return err
}
if !incVersions {
if len(replications) == 0 {
zerolog.Ctx(ctx).Info().Err(err).Msg("skip Replicate: replication is not configured")
return nil
} else {
// if we replicate, we have to increment source versions
// should never happen: sanity check for logic error
return fmt.Errorf("%w: replication destinations found but not incrementing source versions", dom.ErrInternal)
}

if len(replications) == 0 {
zerolog.Ctx(ctx).Info().Msg("skip Replicate: replication is not configured")
return nil
}
destination := meta.Destination{
Storage: routedTo,
@@ -87,7 +80,13 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli
case *tasks.BucketCreatePayload:
// no version increment needed
case *tasks.BucketDeletePayload:
err = s.versionSvc.DeleteBucketAll(ctx, replID, t.Bucket)
// During zero-downtime switch use Increment instead of DeleteAll
// to keep version non-empty so old events see From <= To and skip.
if skipTasks {
_, err = s.versionSvc.IncrementBucket(ctx, replID, t.Bucket, destination)
} else {
err = s.versionSvc.DeleteBucketAll(ctx, replID, t.Bucket)
}
if err != nil {
return err
}
@@ -102,7 +101,9 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli
return err
}
case *tasks.ObjectSyncPayload:
if t.Deleted {
// During zero-downtime switch use Increment even for deletes
// to keep version non-empty so old events see From <= To and skip.
if t.Deleted && !skipTasks {
err = s.versionSvc.DeleteObjAll(ctx, replID, t.Object)
} else {
_, err = s.versionSvc.IncrementObj(ctx, replID, t.Object, destination)
@@ -124,32 +125,31 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli
return fmt.Errorf("%w: unsupported replication task type %T", dom.ErrInternal, task)
}
// 3. fan out tasks for each destination
task.SetReplicationID(replID)
err := s.queueSvc.EnqueueTask(ctx, task)
if err != nil {
return fmt.Errorf("unable to fan out replication task to %+v: %w", replID, err)
if !skipTasks {
task.SetReplicationID(replID)
err := s.queueSvc.EnqueueTask(ctx, task)
if err != nil {
return fmt.Errorf("unable to fan out replication task to %+v: %w", replID, err)
}
}
}

return nil
}

func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (incSouceVersions bool, replicateTo []entity.UniversalReplicationID, err error) {
// getDestinations returns replication targets for the current request.
// skipTasks=true means: increment versions but do not enqueue replication tasks.
// This is used during zero-downtime switch to prevent old pre-switch events from
// overwriting post-switch data.
func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (replicateTo []entity.UniversalReplicationID, skipTasks bool, err error) {
destinations := xctx.GetReplications(ctx)
if len(destinations) != 0 {
// normal flow increment source version and replicate to all followers
return true, destinations, nil
return destinations, false, nil
}
// no destinations means only 3 things:
// 1. bucket replication is not configured and user replication is not configured - no need to do anything
// 2. bucket replication is archived because zero-downtime switch is in progress
// - increment source version only if request routed to main storage
// - replicate only for dangling multipart uploads to old main storage

inProgressZeroDowntimeSwitch := xctx.GetInProgressZeroDowntime(ctx)
if inProgressZeroDowntimeSwitch == nil {
// do nothing - replication was not configured
return false, nil, nil
return nil, false, nil
}
originalReplID := inProgressZeroDowntimeSwitch.ReplicationID()

@@ -177,12 +177,10 @@ func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (incSouceV
// PROBLEM:
// - because replication policy is archived we dont create replication tasks
// - but we need to finish old multipart upload on A and replicate completed object to B
return true, []entity.UniversalReplicationID{
originalReplID,
}, nil
return []entity.UniversalReplicationID{originalReplID}, false, nil
}

// zero-downtime switch in progress.
// increment version in routed storage and skip creating replication tasks
return true, nil, nil
return []entity.UniversalReplicationID{originalReplID}, true, nil
}
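
A minimal sketch of the version-comparison idea the s3.go changes rely on: during a zero-downtime switch the source version is only ever incremented (never deleted), so a consumer that compares the version captured in an old event (From) against the version currently stored (To) can drop stale pre-switch work. Names below are illustrative, not the real chorus types.

```go
// Hypothetical illustration of "old events see From <= To and skip".
// Because the version stays non-empty during the switch (Increment instead
// of DeleteAll), events produced before the switch compare as stale.
package main

import "fmt"

type versionPair struct {
	From int64 // version captured when the event was produced
	To   int64 // version currently stored for the destination
}

func shouldSkip(v versionPair) bool {
	return v.From <= v.To
}

func main() {
	preSwitch := versionPair{From: 3, To: 5}  // produced before the switch
	postSwitch := versionPair{From: 6, To: 5} // produced after the switch
	fmt.Println(shouldSkip(preSwitch))  // true: stale event is dropped
	fmt.Println(shouldSkip(postSwitch)) // false: new event is processed
}
```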
8 changes: 6 additions & 2 deletions service/standalone/config.yaml
@@ -29,5 +29,9 @@ proxy:
# - accessKeyID: <s3 v4 accessKey credential>
# secretAccessKey: <s3 v4 secretKey credential>
worker:
pauseRetryInterval: 5s
switchRetryInterval: 5s
queueUpdateInterval: 3s
swiftRetryInterval: 1m # used for swift task retries caused by swift inconsistent state
pauseRetryInterval: 30s
switchRetryInterval: 30s
taskCheckInterval: 1s
delayedTaskCheckInterval: 3s
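
The interval values added above are Go duration strings; a quick sanity check, assuming the worker's config loader parses them with time.ParseDuration or an equivalent:

```go
// Parses the worker intervals from the block above; keys and values are
// copied from the YAML, the loader itself is only assumed here.
package main

import (
	"fmt"
	"time"
)

func main() {
	intervals := map[string]string{
		"queueUpdateInterval":      "3s",
		"swiftRetryInterval":       "1m",
		"pauseRetryInterval":       "30s",
		"switchRetryInterval":      "30s",
		"taskCheckInterval":        "1s",
		"delayedTaskCheckInterval": "3s",
	}
	for name, raw := range intervals {
		d, err := time.ParseDuration(raw)
		if err != nil {
			panic(fmt.Sprintf("%s: %v", name, err))
		}
		fmt.Printf("%s = %v\n", name, d)
	}
}
```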
6 changes: 6 additions & 0 deletions service/worker/config.go
@@ -62,6 +62,12 @@ func (c *Config) Validate() error {
if err := c.Storage.Validate(); err != nil {
return err
}
if c.Worker == nil {
return fmt.Errorf("worker config: empty Worker config")
}
if err := c.Worker.Validate(); err != nil {
return fmt.Errorf("worker config: invalid Worker config: %w", err)
}
if c.Concurrency <= 0 {
return fmt.Errorf("worker config: concurency config must be positive: %d", c.Concurrency)
}
4 changes: 3 additions & 1 deletion service/worker/config.yaml
@@ -1,10 +1,12 @@
concurrency: 10 # max number of task that can be processed in parallel
shutdownTimeout: 5s
worker:
queueUpdateInterval: 5s
queueUpdateInterval: 3s
swiftRetryInterval: 1m # used for swift task retries caused by swift inconsistent state
pauseRetryInterval: 30s
switchRetryInterval: 30s
taskCheckInterval: 1s
delayedTaskCheckInterval: 3s
lock:
overlap: 2s
api:
33 changes: 29 additions & 4 deletions service/worker/handler/service.go
@@ -35,10 +35,35 @@ import (
)

type Config struct {
SwiftRetryInterval time.Duration `yaml:"swiftRetryInterval"`
PauseRetryInterval time.Duration `yaml:"pauseRetryInterval"`
SwitchRetryInterval time.Duration `yaml:"switchRetryInterval"`
QueueUpdateInterval time.Duration `yaml:"queueUpdateInterval"`
SwiftRetryInterval time.Duration `yaml:"swiftRetryInterval"`
PauseRetryInterval time.Duration `yaml:"pauseRetryInterval"`
SwitchRetryInterval time.Duration `yaml:"switchRetryInterval"`
QueueUpdateInterval time.Duration `yaml:"queueUpdateInterval"`
TaskCheckInterval time.Duration `yaml:"taskCheckInterval"`
DelayedTaskCheckInterval time.Duration `yaml:"delayedTaskCheckInterval"`
CustomErrRetryInterval *time.Duration `yaml:"customErrRetryInterval,omitempty"`
}

func (c *Config) Validate() error {
if c.SwiftRetryInterval <= 0 {
return fmt.Errorf("swiftRetryInterval must be greater than 0")
}
if c.PauseRetryInterval <= 0 {
return fmt.Errorf("pauseRetryInterval must be greater than 0")
}
if c.SwitchRetryInterval <= 0 {
return fmt.Errorf("switchRetryInterval must be greater than 0")
}
if c.QueueUpdateInterval <= 0 {
return fmt.Errorf("queueUpdateInterval must be greater than 0")
}
if c.TaskCheckInterval <= 0 {
return fmt.Errorf("taskCheckInterval must be greater than 0")
}
if c.DelayedTaskCheckInterval <= 0 {
return fmt.Errorf("delayedTaskCheckInterval must be greater than 0")
}
return nil
}

type svc struct {
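
A hedged usage sketch of the new handler.Config validation wired in by service/worker/config.go above; the values mirror service/worker/config.yaml, and the import path is taken from this PR's test code:

```go
// Builds a handler.Config with the intervals from config.yaml and runs the
// Validate() added in this PR; any non-positive interval is rejected.
package main

import (
	"fmt"
	"time"

	"github.com/clyso/chorus/service/worker/handler"
)

func main() {
	cfg := handler.Config{
		SwiftRetryInterval:       time.Minute,
		PauseRetryInterval:       30 * time.Second,
		SwitchRetryInterval:      30 * time.Second,
		QueueUpdateInterval:      3 * time.Second,
		TaskCheckInterval:        time.Second,
		DelayedTaskCheckInterval: 3 * time.Second,
	}
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid worker config:", err)
		return
	}
	fmt.Println("worker config ok")
}
```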
27 changes: 19 additions & 8 deletions service/worker/server.go
@@ -140,6 +140,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error {
stdLogger := log.NewStdLogger()
redis.SetLogger(stdLogger)

defaultRetry := fallbackRetryDelay(conf.Worker.CustomErrRetryInterval)
srv := asynq.NewServer(
queueRedis,
asynq.Config{
@@ -149,7 +150,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error {
var rlErr *dom.ErrRateLimitExceeded
return !errors.As(err, &rlErr)
},
RetryDelayFunc: retryDelay,
RetryDelayFunc: retryDelayFunc(defaultRetry),
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
retried, _ := asynq.GetRetryCount(ctx)
maxRetry, _ := asynq.GetMaxRetry(ctx)
@@ -173,6 +174,8 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error {
StrictPriority: true,
DynamicQueues: true,
DynamicQueueUpdateInterval: conf.Worker.QueueUpdateInterval,
TaskCheckInterval: conf.Worker.TaskCheckInterval,
DelayedTaskCheckInterval: conf.Worker.DelayedTaskCheckInterval,
},
)

@@ -302,13 +305,21 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error {
return server.Start(ctx)
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
var rlErr *dom.ErrRateLimitExceeded
if errors.As(err, &rlErr) {
return rlErr.RetryIn
func retryDelayFunc(fallback asynq.RetryDelayFunc) asynq.RetryDelayFunc {
return func(n int, err error, task *asynq.Task) time.Duration {
var rlErr *dom.ErrRateLimitExceeded
if errors.As(err, &rlErr) {
return rlErr.RetryIn
}
return fallback(n, err, task)
}
return ErrRetryDelayFunc(n, err, task)
}

// override in e2e test for short retry
var ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc
func fallbackRetryDelay(customInterval *time.Duration) asynq.RetryDelayFunc {
if customInterval == nil || *customInterval <= 0 {
return asynq.DefaultRetryDelayFunc
}
return func(_ int, _ error, _ *asynq.Task) time.Duration {
return *customInterval
}
}
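
A small self-contained sketch (stand-in types, not the chorus implementation) of how retryDelayFunc and fallbackRetryDelay compose: a rate-limit error dictates its own delay, everything else falls back to either asynq's default backoff or the fixed customErrRetryInterval:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hibiken/asynq"
)

// rateLimited stands in for dom.ErrRateLimitExceeded.
type rateLimited struct{ retryIn time.Duration }

func (e *rateLimited) Error() string { return "rate limit exceeded" }

// fixedDelay stands in for fallbackRetryDelay with a custom interval set.
func fixedDelay(d time.Duration) asynq.RetryDelayFunc {
	return func(_ int, _ error, _ *asynq.Task) time.Duration { return d }
}

// wrap stands in for retryDelayFunc: rate-limited tasks wait exactly as long
// as the limiter asked for, everything else uses the fallback.
func wrap(fallback asynq.RetryDelayFunc) asynq.RetryDelayFunc {
	return func(n int, err error, task *asynq.Task) time.Duration {
		if rl, ok := err.(*rateLimited); ok {
			return rl.retryIn
		}
		return fallback(n, err, task)
	}
}

func main() {
	delay := wrap(fixedDelay(100 * time.Millisecond))
	fmt.Println(delay(1, &rateLimited{retryIn: 2 * time.Second}, nil)) // 2s
	fmt.Println(delay(1, fmt.Errorf("some other failure"), nil))       // 100ms
}
```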
24 changes: 16 additions & 8 deletions test/app/chorus.go
@@ -25,12 +25,26 @@ import (
pb "github.com/clyso/chorus/proto/gen/go/chorus"
"github.com/clyso/chorus/service/proxy"
"github.com/clyso/chorus/service/worker"
"github.com/hibiken/asynq"
"github.com/clyso/chorus/service/worker/handler"
"github.com/rs/xid"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

var (
asynqRetry = 100 * time.Millisecond
)

var WorkerRetryConf = handler.Config{
SwiftRetryInterval: asynqRetry * 5,
PauseRetryInterval: asynqRetry * 2,
SwitchRetryInterval: asynqRetry * 2,
QueueUpdateInterval: asynqRetry * 2,
TaskCheckInterval: asynqRetry,
DelayedTaskCheckInterval: asynqRetry * 2,
CustomErrRetryInterval: &asynqRetry,
}

type Chorus struct {
PolicyClient pb.PolicyClient
ChorusClient pb.ChorusClient
@@ -56,20 +70,14 @@ func SetupChorus(t testing.TB, workerConf *worker.Config, proxyConf *proxy.Confi
if err != nil {
t.Fatal(err)
}
workerConf.Worker = &WorkerRetryConf
e := Chorus{
WaitShort: waitShort,
RetryShort: retryShort,
WaitLong: waitLong,
RetryLong: retryLong,
}

worker.ErrRetryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration {
return retryLong
}
t.Cleanup(func() {
worker.ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc
})

redisAddr := testutil.SetupRedisAddr(t)

proxyConf.Redis.Address = redisAddr
11 changes: 3 additions & 8 deletions test/app/embedded.go
@@ -59,8 +59,8 @@ import (
const (
user = "test"

retryShort = 100 * time.Millisecond
retryLong = 500 * time.Millisecond
retryShort = 20 * time.Millisecond
retryLong = 50 * time.Millisecond
)

var (
@@ -139,12 +139,7 @@ func SetupEmbedded(t testing.TB, workerConf *worker.Config, proxyConf *proxy.Con
WaitLong: waitLong,
RetryLong: retryLong,
}
worker.ErrRetryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration {
return retryLong
}
t.Cleanup(func() {
worker.ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc
})
workerConf.Worker = &WorkerRetryConf

redisAddr := testutil.SetupRedisAddr(t)

6 changes: 1 addition & 5 deletions test/diff/suite_test.go
@@ -24,7 +24,6 @@ import (
"github.com/clyso/chorus/pkg/trace"
pb "github.com/clyso/chorus/proto/gen/go/chorus"
"github.com/clyso/chorus/service/worker"
"github.com/clyso/chorus/service/worker/handler"
"github.com/clyso/chorus/test/app"
"github.com/clyso/chorus/test/env"
"github.com/clyso/chorus/test/gen"
@@ -432,10 +431,7 @@ func newWorkerConfig(objStoreConfig *objstore.Config, redisConfig *config.Redis,
Lock: &worker.Lock{
Overlap: time.Second,
},
Worker: &handler.Config{
SwitchRetryInterval: time.Millisecond * 500,
PauseRetryInterval: time.Millisecond * 500,
},
Worker: &app.WorkerRetryConf,
Storage: *objStoreConfig,
Api: &api.Config{
Enabled: true,