diff --git a/pkg/replication/s3.go b/pkg/replication/s3.go index a6e30fa9..98061b81 100644 --- a/pkg/replication/s3.go +++ b/pkg/replication/s3.go @@ -58,7 +58,7 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli zerolog.Ctx(ctx).Debug().Msg("creating replication task") // 1. find repl rule(-s) - incVersions, replications, err := s.getDestinations(ctx, routedTo) + replications, skipTasks, err := s.getDestinations(ctx, routedTo) if err != nil { if errors.Is(err, dom.ErrPolicy) { zerolog.Ctx(ctx).Info().Err(err).Msg("skip Replicate: replication is not configured") @@ -66,16 +66,9 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli } return err } - if !incVersions { - if len(replications) == 0 { - zerolog.Ctx(ctx).Info().Err(err).Msg("skip Replicate: replication is not configured") - return nil - } else { - // if we replicate, we have to increment source versions - // should never happen: sanity check for logic error - return fmt.Errorf("%w: replication destinations found but not incrementing source versions", dom.ErrInternal) - } - + if len(replications) == 0 { + zerolog.Ctx(ctx).Info().Msg("skip Replicate: replication is not configured") + return nil } destination := meta.Destination{ Storage: routedTo, @@ -87,7 +80,13 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli case *tasks.BucketCreatePayload: // no version increment needed case *tasks.BucketDeletePayload: - err = s.versionSvc.DeleteBucketAll(ctx, replID, t.Bucket) + // During zero-downtime switch use Increment instead of DeleteAll + // to keep version non-empty so old events see From <= To and skip. + if skipTasks { + _, err = s.versionSvc.IncrementBucket(ctx, replID, t.Bucket, destination) + } else { + err = s.versionSvc.DeleteBucketAll(ctx, replID, t.Bucket) + } if err != nil { return err } @@ -102,7 +101,9 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli return err } case *tasks.ObjectSyncPayload: - if t.Deleted { + // During zero-downtime switch use Increment even for deletes + // to keep version non-empty so old events see From <= To and skip. + if t.Deleted && !skipTasks { err = s.versionSvc.DeleteObjAll(ctx, replID, t.Object) } else { _, err = s.versionSvc.IncrementObj(ctx, replID, t.Object, destination) @@ -124,32 +125,31 @@ func (s *s3Svc) Replicate(ctx context.Context, routedTo string, task tasks.Repli return fmt.Errorf("%w: unsupported replication task type %T", dom.ErrInternal, task) } // 3. fan out tasks for each destination - task.SetReplicationID(replID) - err := s.queueSvc.EnqueueTask(ctx, task) - if err != nil { - return fmt.Errorf("unable to fan out replication task to %+v: %w", replID, err) + if !skipTasks { + task.SetReplicationID(replID) + err := s.queueSvc.EnqueueTask(ctx, task) + if err != nil { + return fmt.Errorf("unable to fan out replication task to %+v: %w", replID, err) + } } } return nil } -func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (incSouceVersions bool, replicateTo []entity.UniversalReplicationID, err error) { +// getDestinations returns replication targets for the current request. +// skipTasks=true means: increment versions but do not enqueue replication tasks. +// This is used during zero-downtime switch to prevent old pre-switch events from +// overwriting post-switch data. 
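+// During the switch, the archived replication ID is still returned in replicateTo
+// so that source versions keep incrementing; only the task fan-out is skipped.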
+func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (replicateTo []entity.UniversalReplicationID, skipTasks bool, err error) { destinations := xctx.GetReplications(ctx) if len(destinations) != 0 { - // normal flow increment source version and replicate to all followers - return true, destinations, nil + return destinations, false, nil } - // no destinations means only 3 things: - // 1. bucket replication is not configured and user replication is not configured - no need to do anything - // 2. bucket replication is archived because zero-downtime switch is in progress - // - increment source version only if request routed to main storage - // - replicate only for dangling multipart uploads to old main storage inProgressZeroDowntimeSwitch := xctx.GetInProgressZeroDowntime(ctx) if inProgressZeroDowntimeSwitch == nil { - // do nothing - replication was not configured - return false, nil, nil + return nil, false, nil } originalReplID := inProgressZeroDowntimeSwitch.ReplicationID() @@ -177,12 +177,10 @@ func (s *s3Svc) getDestinations(ctx context.Context, routedTo string) (incSouceV // PROBLEM: // - because replication policy is archived we dont create replication tasks // - but we need to finish old multipart upload on A and replicate completed object to B - return true, []entity.UniversalReplicationID{ - originalReplID, - }, nil + return []entity.UniversalReplicationID{originalReplID}, false, nil } // zero-downtime switch in progress. // increment version in routed storage and skip creating replication tasks - return true, nil, nil + return []entity.UniversalReplicationID{originalReplID}, true, nil } diff --git a/service/standalone/config.yaml b/service/standalone/config.yaml index abd0d8b4..e4474c2b 100644 --- a/service/standalone/config.yaml +++ b/service/standalone/config.yaml @@ -29,5 +29,9 @@ proxy: # - accessKeyID: # secretAccessKey: worker: - pauseRetryInterval: 5s - switchRetryInterval: 5s + queueUpdateInterval: 3s + swiftRetryInterval: 1m # used for swift task retries caused by swift inconsistent state + pauseRetryInterval: 30s + switchRetryInterval: 30s + taskCheckInterval: 1s + delayedTaskCheckInterval: 3s diff --git a/service/worker/config.go b/service/worker/config.go index 422be5e6..c2a3d891 100644 --- a/service/worker/config.go +++ b/service/worker/config.go @@ -62,6 +62,12 @@ func (c *Config) Validate() error { if err := c.Storage.Validate(); err != nil { return err } + if c.Worker == nil { + return fmt.Errorf("worker config: empty Worker config") + } + if err := c.Worker.Validate(); err != nil { + return fmt.Errorf("worker config: invalid Worker config: %w", err) + } if c.Concurrency <= 0 { return fmt.Errorf("worker config: concurency config must be positive: %d", c.Concurrency) } diff --git a/service/worker/config.yaml b/service/worker/config.yaml index db1c1e9a..aa9210cd 100644 --- a/service/worker/config.yaml +++ b/service/worker/config.yaml @@ -1,10 +1,12 @@ concurrency: 10 # max number of task that can be processed in parallel shutdownTimeout: 5s worker: - queueUpdateInterval: 5s + queueUpdateInterval: 3s swiftRetryInterval: 1m # used for swift task retries caused by swift inconsistent state pauseRetryInterval: 30s switchRetryInterval: 30s + taskCheckInterval: 1s + delayedTaskCheckInterval: 3s lock: overlap: 2s api: diff --git a/service/worker/handler/service.go b/service/worker/handler/service.go index 83c04f0a..4f354fe1 100644 --- a/service/worker/handler/service.go +++ b/service/worker/handler/service.go @@ -35,10 +35,35 @@ import ( ) type Config 
struct { - SwiftRetryInterval time.Duration `yaml:"swiftRetryInterval"` - PauseRetryInterval time.Duration `yaml:"pauseRetryInterval"` - SwitchRetryInterval time.Duration `yaml:"switchRetryInterval"` - QueueUpdateInterval time.Duration `yaml:"queueUpdateInterval"` + SwiftRetryInterval time.Duration `yaml:"swiftRetryInterval"` + PauseRetryInterval time.Duration `yaml:"pauseRetryInterval"` + SwitchRetryInterval time.Duration `yaml:"switchRetryInterval"` + QueueUpdateInterval time.Duration `yaml:"queueUpdateInterval"` + TaskCheckInterval time.Duration `yaml:"taskCheckInterval"` + DelayedTaskCheckInterval time.Duration `yaml:"delayedTaskCheckInterval"` + CustomErrRetryInterval *time.Duration `yaml:"customErrRetryInterval,omitempty"` +} + +func (c *Config) Validate() error { + if c.SwiftRetryInterval <= 0 { + return fmt.Errorf("swiftRetryInterval must be greater than 0") + } + if c.PauseRetryInterval <= 0 { + return fmt.Errorf("pauseRetryInterval must be greater than 0") + } + if c.SwitchRetryInterval <= 0 { + return fmt.Errorf("switchRetryInterval must be greater than 0") + } + if c.QueueUpdateInterval <= 0 { + return fmt.Errorf("queueUpdateInterval must be greater than 0") + } + if c.TaskCheckInterval <= 0 { + return fmt.Errorf("taskCheckInterval must be greater than 0") + } + if c.DelayedTaskCheckInterval <= 0 { + return fmt.Errorf("delayedTaskCheckInterval must be greater than 0") + } + return nil } type svc struct { diff --git a/service/worker/server.go b/service/worker/server.go index 66ad4bef..1792a8b5 100644 --- a/service/worker/server.go +++ b/service/worker/server.go @@ -140,6 +140,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { stdLogger := log.NewStdLogger() redis.SetLogger(stdLogger) + defaultRetry := fallbackRetryDelay(conf.Worker.CustomErrRetryInterval) srv := asynq.NewServer( queueRedis, asynq.Config{ @@ -149,7 +150,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { var rlErr *dom.ErrRateLimitExceeded return !errors.As(err, &rlErr) }, - RetryDelayFunc: retryDelay, + RetryDelayFunc: retryDelayFunc(defaultRetry), ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { retried, _ := asynq.GetRetryCount(ctx) maxRetry, _ := asynq.GetMaxRetry(ctx) @@ -173,6 +174,8 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { StrictPriority: true, DynamicQueues: true, DynamicQueueUpdateInterval: conf.Worker.QueueUpdateInterval, + TaskCheckInterval: conf.Worker.TaskCheckInterval, + DelayedTaskCheckInterval: conf.Worker.DelayedTaskCheckInterval, }, ) @@ -302,13 +305,21 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { return server.Start(ctx) } -func retryDelay(n int, err error, task *asynq.Task) time.Duration { - var rlErr *dom.ErrRateLimitExceeded - if errors.As(err, &rlErr) { - return rlErr.RetryIn +func retryDelayFunc(fallback asynq.RetryDelayFunc) asynq.RetryDelayFunc { + return func(n int, err error, task *asynq.Task) time.Duration { + var rlErr *dom.ErrRateLimitExceeded + if errors.As(err, &rlErr) { + return rlErr.RetryIn + } + return fallback(n, err, task) } - return ErrRetryDelayFunc(n, err, task) } -// override in e2e test for short retry -var ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc +func fallbackRetryDelay(customInterval *time.Duration) asynq.RetryDelayFunc { + if customInterval == nil || *customInterval <= 0 { + return asynq.DefaultRetryDelayFunc + } + return func(_ int, _ error, _ *asynq.Task) time.Duration { + return *customInterval + } +} 
diff --git a/test/app/chorus.go b/test/app/chorus.go index 49cdbd19..98535e7a 100644 --- a/test/app/chorus.go +++ b/test/app/chorus.go @@ -25,12 +25,26 @@ import ( pb "github.com/clyso/chorus/proto/gen/go/chorus" "github.com/clyso/chorus/service/proxy" "github.com/clyso/chorus/service/worker" - "github.com/hibiken/asynq" + "github.com/clyso/chorus/service/worker/handler" "github.com/rs/xid" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) +var ( + asynqRetry = 100 * time.Millisecond +) + +var WorkerRetryConf = handler.Config{ + SwiftRetryInterval: asynqRetry * 5, + PauseRetryInterval: asynqRetry * 2, + SwitchRetryInterval: asynqRetry * 2, + QueueUpdateInterval: asynqRetry * 2, + TaskCheckInterval: asynqRetry, + DelayedTaskCheckInterval: asynqRetry * 2, + CustomErrRetryInterval: &asynqRetry, +} + type Chorus struct { PolicyClient pb.PolicyClient ChorusClient pb.ChorusClient @@ -56,6 +70,7 @@ func SetupChorus(t testing.TB, workerConf *worker.Config, proxyConf *proxy.Confi if err != nil { t.Fatal(err) } + workerConf.Worker = &WorkerRetryConf e := Chorus{ WaitShort: waitShort, RetryShort: retryShort, @@ -63,13 +78,6 @@ func SetupChorus(t testing.TB, workerConf *worker.Config, proxyConf *proxy.Confi RetryLong: retryLong, } - worker.ErrRetryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration { - return retryLong - } - t.Cleanup(func() { - worker.ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc - }) - redisAddr := testutil.SetupRedisAddr(t) proxyConf.Redis.Address = redisAddr diff --git a/test/app/embedded.go b/test/app/embedded.go index f900ceaa..b60e7193 100644 --- a/test/app/embedded.go +++ b/test/app/embedded.go @@ -59,8 +59,8 @@ import ( const ( user = "test" - retryShort = 100 * time.Millisecond - retryLong = 500 * time.Millisecond + retryShort = 20 * time.Millisecond + retryLong = 50 * time.Millisecond ) var ( @@ -139,12 +139,7 @@ func SetupEmbedded(t testing.TB, workerConf *worker.Config, proxyConf *proxy.Con WaitLong: waitLong, RetryLong: retryLong, } - worker.ErrRetryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration { - return retryLong - } - t.Cleanup(func() { - worker.ErrRetryDelayFunc = asynq.DefaultRetryDelayFunc - }) + workerConf.Worker = &WorkerRetryConf redisAddr := testutil.SetupRedisAddr(t) diff --git a/test/diff/suite_test.go b/test/diff/suite_test.go index 1b967e73..b1a2ec68 100644 --- a/test/diff/suite_test.go +++ b/test/diff/suite_test.go @@ -24,7 +24,6 @@ import ( "github.com/clyso/chorus/pkg/trace" pb "github.com/clyso/chorus/proto/gen/go/chorus" "github.com/clyso/chorus/service/worker" - "github.com/clyso/chorus/service/worker/handler" "github.com/clyso/chorus/test/app" "github.com/clyso/chorus/test/env" "github.com/clyso/chorus/test/gen" @@ -432,10 +431,7 @@ func newWorkerConfig(objStoreConfig *objstore.Config, redisConfig *config.Redis, Lock: &worker.Lock{ Overlap: time.Second, }, - Worker: &handler.Config{ - SwitchRetryInterval: time.Millisecond * 500, - PauseRetryInterval: time.Millisecond * 500, - }, + Worker: &app.WorkerRetryConf, Storage: *objStoreConfig, Api: &api.Config{ Enabled: true, diff --git a/test/env/components.go b/test/env/components.go index 9557613d..81bb59c2 100644 --- a/test/env/components.go +++ b/test/env/components.go @@ -21,10 +21,12 @@ import ( "errors" "fmt" "io" + "net/http" "net/http/httptest" "net/url" "runtime" "strconv" + "strings" "text/template" "time" @@ -47,6 +49,15 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) +func init() { + // Disable HTTP keep-alive for the default transport. 
+ // Keystone's uwsgi http-socket mode closes connections between requests, + // causing EOF errors when Go's HTTP client reuses pooled connections. + if t, ok := http.DefaultTransport.(*http.Transport); ok { + t.DisableKeepAlives = true + } +} + const ( CRedisImage = "redis:8.4.0-alpine" CRedisPort = 6379 @@ -678,23 +689,29 @@ func startSwiftInstance(ctx context.Context, env *TestEnvironment, componentName return fmt.Errorf("unable to get swift api forwarded port: %w", err) } - _, err = endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ - Name: CKeystoneSwiftEndpointName, - Availability: gophercloud.AvailabilityInternal, - URL: fmt.Sprintf(CKeystoneSwiftEndpointURLTemplate, containerHost, forwardedPort.Int()), - ServiceID: swiftService.ID, - }).Extract() - if err != nil { + closeIdleConns() + + if err := retryOnTransient(ctx, 3, time.Second, func() error { + _, err := endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ + Name: CKeystoneSwiftEndpointName, + Availability: gophercloud.AvailabilityInternal, + URL: fmt.Sprintf(CKeystoneSwiftEndpointURLTemplate, containerHost, forwardedPort.Int()), + ServiceID: swiftService.ID, + }).Extract() + return err + }); err != nil { return fmt.Errorf("unable to create internal swift endpoint: %w", err) } - _, err = endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ - Name: CKeystoneSwiftEndpointName, - Availability: gophercloud.AvailabilityPublic, - URL: fmt.Sprintf(CKeystoneSwiftEndpointURLTemplate, containerHost, forwardedPort.Int()), - ServiceID: swiftService.ID, - }).Extract() - if err != nil { + if err := retryOnTransient(ctx, 3, time.Second, func() error { + _, err := endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ + Name: CKeystoneSwiftEndpointName, + Availability: gophercloud.AvailabilityPublic, + URL: fmt.Sprintf(CKeystoneSwiftEndpointURLTemplate, containerHost, forwardedPort.Int()), + ServiceID: swiftService.ID, + }).Extract() + return err + }); err != nil { return fmt.Errorf("unable to create public swift endpoint: %w", err) } @@ -847,9 +864,12 @@ func startRedisInstance(ctx context.Context, env *TestEnvironment, componentName natPortString := fmt.Sprintf(CNATPortTemplate, CRedisPort) natPort := nat.Port(natPortString) req := testcontainers.ContainerRequest{ - Image: CRedisImage, - Cmd: []string{"redis-server", "--save", "\"\"", "--appendonly", "no", "--requirepass", CRedisPassword}, - WaitingFor: wait.ForExec([]string{"redis-cli", "ping"}), + Image: CRedisImage, + Cmd: []string{"redis-server", "--save", "\"\"", "--appendonly", "no", "--requirepass", CRedisPassword}, + WaitingFor: wait.ForAll( + wait.ForExec([]string{"redis-cli", "-a", CRedisPassword, "ping"}), + wait.ForListeningPort(natPort), + ), HostConfigModifier: func(hc *container.HostConfig) { hc.AutoRemove = true }, @@ -1225,33 +1245,43 @@ func startCephInstanceWithKeystone(ctx context.Context, env *TestEnvironment, co return fmt.Errorf("unable to get ceph api forwarded port: %w", err) } - cephService, err := services.Create(ctx, identityClient, services.CreateOpts{ - Type: CKeystoneObjectStoreServiceType, - Extra: map[string]any{ - "name": CKeystoneCephServiceName, - }, - }).Extract() - if err != nil { + closeIdleConns() + + var cephService *services.Service + if err := retryOnTransient(ctx, 3, time.Second, func() error { + var err error + cephService, err = services.Create(ctx, identityClient, services.CreateOpts{ + Type: CKeystoneObjectStoreServiceType, + Extra: map[string]any{ + "name": CKeystoneCephServiceName, + }, + }).Extract() + return 
err + }); err != nil { return fmt.Errorf("unable to create ceph service: %w", err) } - _, err = endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ - Name: CKeystoneCephEndpointName, - Availability: gophercloud.AvailabilityInternal, - URL: fmt.Sprintf(CKeystoneCephEndpointURLTemplate, containerHost, apiForwardedPort.Int()), - ServiceID: cephService.ID, - }).Extract() - if err != nil { + if err := retryOnTransient(ctx, 3, time.Second, func() error { + _, err := endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ + Name: CKeystoneCephEndpointName, + Availability: gophercloud.AvailabilityInternal, + URL: fmt.Sprintf(CKeystoneCephEndpointURLTemplate, containerHost, apiForwardedPort.Int()), + ServiceID: cephService.ID, + }).Extract() + return err + }); err != nil { return fmt.Errorf("unable to create internal ceph endpoint: %w", err) } - _, err = endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ - Name: CKeystoneCephEndpointName, - Availability: gophercloud.AvailabilityPublic, - URL: fmt.Sprintf(CKeystoneCephEndpointURLTemplate, containerHost, apiForwardedPort.Int()), - ServiceID: cephService.ID, - }).Extract() - if err != nil { + if err := retryOnTransient(ctx, 3, time.Second, func() error { + _, err := endpoints.Create(ctx, identityClient, endpoints.CreateOpts{ + Name: CKeystoneCephEndpointName, + Availability: gophercloud.AvailabilityPublic, + URL: fmt.Sprintf(CKeystoneCephEndpointURLTemplate, containerHost, apiForwardedPort.Int()), + ServiceID: cephService.ID, + }).Extract() + return err + }); err != nil { return fmt.Errorf("unable to create public ceph endpoint: %w", err) } @@ -1391,3 +1421,46 @@ func stopContainer(ctx context.Context, container testcontainers.Container) erro } return nil } + +// closeIdleConns closes idle HTTP connections in the default transport. +// This prevents EOF errors caused by reusing connections that were closed +// server-side (e.g., by uwsgi) during long container startup delays. 
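+// It is a no-op if http.DefaultTransport has been replaced with a custom RoundTripper.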
+func closeIdleConns() { + if t, ok := http.DefaultTransport.(*http.Transport); ok { + t.CloseIdleConnections() + } +} + +func retryOnTransient(ctx context.Context, maxAttempts int, delay time.Duration, fn func() error) error { + var lastErr error + for i := 0; i < maxAttempts; i++ { + lastErr = fn() + if lastErr == nil { + return nil + } + if !isTransientErr(lastErr) { + return lastErr + } + if i < maxAttempts-1 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + } + return lastErr +} + +func isTransientErr(err error) bool { + if err == nil { + return false + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + s := err.Error() + return strings.Contains(s, "EOF") || + strings.Contains(s, "connection reset") || + strings.Contains(s, "connection refused") +} diff --git a/test/init_test.go b/test/init_test.go index 2e4e267d..7a313071 100644 --- a/test/init_test.go +++ b/test/init_test.go @@ -17,8 +17,6 @@ package test import ( - "time" - "github.com/clyso/chorus/service/proxy" "github.com/clyso/chorus/service/worker" ) @@ -34,7 +32,6 @@ func init() { if err != nil { panic(err) } - workerConf.Worker.QueueUpdateInterval = 500 * time.Millisecond workerConf.Features.ACL = false workerConf.Features.Tagging = false workerConf.Log.Level = "warn" diff --git a/test/migration/init_test.go b/test/migration/init_test.go index 475dc03f..544788e9 100644 --- a/test/migration/init_test.go +++ b/test/migration/init_test.go @@ -17,8 +17,6 @@ package migration import ( - "time" - "github.com/clyso/chorus/service/proxy" "github.com/clyso/chorus/service/worker" ) @@ -37,9 +35,6 @@ func init() { workerConf.Features.ACL = false workerConf.Features.Tagging = false workerConf.Log.Level = "warn" - workerConf.Worker.SwitchRetryInterval = time.Millisecond * 500 - workerConf.Worker.PauseRetryInterval = time.Millisecond * 500 - workerConf.Worker.QueueUpdateInterval = 500 * time.Millisecond proxyConf, err = proxy.GetConfig() if err != nil { diff --git a/test/migration/restart_repl_test.go b/test/migration/restart_repl_test.go index 58bd8f19..a0b84710 100644 --- a/test/migration/restart_repl_test.go +++ b/test/migration/restart_repl_test.go @@ -189,7 +189,7 @@ func Test_Restart_User_Replication(t *testing.T) { return false } return reps.Replications[0].InitObjDone > 0 - }, e.WaitShort, e.RetryShort) + }, e.WaitLong, e.RetryShort) // add live event for _, bucket := range buckets { @@ -208,7 +208,7 @@ func Test_Restart_User_Replication(t *testing.T) { return false } return reps.Replications[0].IsInitDone && reps.Replications[0].Events > 0 && reps.Replications[0].Events == reps.Replications[0].EventsDone - }, e.WaitShort, e.RetryShort) + }, e.WaitLong, e.RetryShort) for _, bucket := range buckets { buckID := proto.Clone(id).(*pb.ReplicationID) @@ -263,7 +263,7 @@ func Test_Restart_User_Replication(t *testing.T) { } stat := reps.Replications[0] return stat.IsInitDone && stat.Events == stat.EventsDone - }, e.WaitShort, e.RetryShort) + }, e.WaitLong, e.RetryShort) // check that sync was correct for _, bucket := range buckets { diff --git a/test/swift/e2e_test.go b/test/swift/e2e_test.go index defab7a0..b006efd7 100644 --- a/test/swift/e2e_test.go +++ b/test/swift/e2e_test.go @@ -4,7 +4,6 @@ import ( "net/http" "strings" "testing" - "time" "github.com/clyso/chorus/pkg/swift" pb "github.com/clyso/chorus/proto/gen/go/chorus" @@ -42,9 +41,6 @@ func Test_e2e(t *testing.T) { workerConf, err := worker.GetConfig() r.NoError(err, "failed to get worker config") - 
workerConf.Worker.QueueUpdateInterval = 500 * time.Millisecond - workerConf.Worker.SwitchRetryInterval = time.Millisecond * 500 - workerConf.Worker.PauseRetryInterval = time.Millisecond * 500 workerConf.Log.Level = "warn" workerConf.Storage = swiftConf diff --git a/test/versioned/suite_test.go b/test/versioned/suite_test.go index ac61fe1e..afed3f2d 100644 --- a/test/versioned/suite_test.go +++ b/test/versioned/suite_test.go @@ -51,7 +51,6 @@ import ( pb "github.com/clyso/chorus/proto/gen/go/chorus" "github.com/clyso/chorus/service/worker" "github.com/clyso/chorus/service/worker/copy" - "github.com/clyso/chorus/service/worker/handler" "github.com/clyso/chorus/test/app" "github.com/clyso/chorus/test/env" "github.com/clyso/chorus/test/gen" @@ -259,10 +258,7 @@ var _ = Describe("Minio versioned migration", func() { Lock: &worker.Lock{ Overlap: time.Second, }, - Worker: &handler.Config{ - SwitchRetryInterval: time.Millisecond * 500, - PauseRetryInterval: time.Millisecond * 500, - }, + Worker: &app.WorkerRetryConf, Storage: workerStorages, Api: &api.Config{ Enabled: true, @@ -565,10 +561,7 @@ var _ = Describe("Ceph keystone versioned migration", func() { Lock: &worker.Lock{ Overlap: time.Second, }, - Worker: &handler.Config{ - SwitchRetryInterval: time.Millisecond * 500, - PauseRetryInterval: time.Millisecond * 500, - }, + Worker: &app.WorkerRetryConf, Storage: workerStorages, Api: &api.Config{ Enabled: true, @@ -795,10 +788,7 @@ var _ = Describe("Ceph system user versioned migration", func() { Lock: &worker.Lock{ Overlap: time.Second, }, - Worker: &handler.Config{ - SwitchRetryInterval: time.Millisecond * 500, - PauseRetryInterval: time.Millisecond * 500, - }, + Worker: &app.WorkerRetryConf, Storage: objstore.Config{ Main: CCephSrcInstance, Storages: map[string]objstore.Storage{