Bound CSI recover retries and deduplicate events on persistent mount failures #5662

mrhapile wants to merge 2 commits into fluid-cloudnative:master
Conversation
Summary of Changes

Hello @mrhapile, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the stability and observability of the CSI recovery loop by addressing unbounded retries and excessive event generation during persistent mount failures. By introducing a per-mount state tracker, the system now applies exponential backoff to failed recovery attempts and deduplicates Kubernetes events, emitting them only when a mount's recovery state genuinely changes. This reduces unnecessary load on the API server and etcd, provides clearer operational signals, and ensures more efficient resource utilization without altering behavior for successful recoveries.
Activity
Code Review
This pull request introduces a RecoverStateTracker to improve the handling of persistent CSI mount failures. By adding exponential backoff for recovery retries and deduplicating Kubernetes events, it effectively addresses the issue of excessive retries and event spam, which can overload the API server. The new logic is well-encapsulated and comes with comprehensive unit tests. The changes are a significant improvement for the stability and observability of the CSI recovery process. I have one minor suggestion to improve an assertion in a new test file.
```go
if err.Error() != "Failed to create: induced engine creation failure" && err.Error() != "induced engine creation failure" {
	t.Logf("Got expected error: %v", err)
}
```
The error assertion logic here is confusing. The if condition checks if the error is not one of the expected strings, and then t.Logf just logs a message without failing the test. This should be a proper assertion that fails the test if the error is not what's expected.
```go
expectedErr := "Failed to create: induced engine creation failure"
if err.Error() != expectedErr {
	t.Errorf("Expected error '%s', but got '%s'", expectedErr, err.Error())
}
```
Pull request overview
This PR addresses issue #5661 by introducing bounded retry behavior and event deduplication for the CSI recovery loop to prevent API server overload during persistent mount failures.
Changes:
- Adds `RecoverStateTracker` to manage per-mount recovery state with exponential backoff (capped at 5 minutes)
- Implements event deduplication to emit Kubernetes events only on state transitions
- Guards mount-dependent tests to run only on Linux platforms
- Includes comprehensive unit tests for the new state tracking functionality
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| pkg/csi/recover/state.go | New state tracker implementing exponential backoff and event deduplication logic |
| pkg/csi/recover/state_test.go | Comprehensive unit tests for state tracker functionality |
| pkg/csi/recover/recover.go | Integrates state tracker into recovery loop with backoff checks and conditional event emission |
| pkg/csi/recover/recover_test.go | Updates existing tests to initialize state tracker and adds platform guards |
| pkg/controllers/runtime_controller_test.go | New tests for runtime controller (unrelated to CSI recovery) |
```diff
 	if err := r.recoverBrokenMount(point); err != nil {
-		r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
+		// Record failure to increase backoff for next attempt
+		r.stateTracker.RecordFailure(point.MountPath)
+		// Only emit failure event on state change (first failure or reason change)
+		// This prevents flooding the event stream with identical failure events
+		if r.stateTracker.ShouldEmitEvent(point.MountPath, common.FuseRecoverFailed) {
+			r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
+		}
 		return
 	}
```
This error handling relies on recoverBrokenMount returning an error when the mount operation fails. However, there's a pre-existing bug in recoverBrokenMount (lines 174-186) where it shadows the named return value err with := assignment, causing it to always return nil even when Mount fails.
This means the backoff and event emission logic here will never be triggered for actual mount failures, undermining the primary purpose of this PR. The bug in recoverBrokenMount should be fixed by changing line 182 from if err := r.Mount(...) to if err = r.Mount(...) or by explicitly returning the error.
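For illustration, here is a minimal, runnable sketch of the shadowing pattern described above and the proposed fix. The helper signature and the `main` driver are invented for the example, not taken from the Fluid source:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// recoverBrokenMount reproduces the shadowing bug: `:=` declares a new local
// err inside the if statement, so the named return value is never assigned.
func recoverBrokenMount(mount func() error) (err error) {
	if err := mount(); err != nil {
		log.Printf("mount failed: %v", err) // only the shadowed local is set
	}
	return // named err is still nil; the caller never sees the failure
}

// recoverBrokenMountFixed assigns to the named return value with `=`,
// so the caller receives the mount error and the backoff/event logic can fire.
func recoverBrokenMountFixed(mount func() error) (err error) {
	if err = mount(); err != nil {
		log.Printf("mount failed: %v", err)
	}
	return
}

func main() {
	failingMount := func() error { return errors.New("transport endpoint is not connected") }
	fmt.Println(recoverBrokenMount(failingMount))      // <nil> -- failure is swallowed
	fmt.Println(recoverBrokenMountFixed(failingMount)) // transport endpoint is not connected
}
```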
```go
/*
Copyright 2023 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"
	"testing"

	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
	"github.com/fluid-cloudnative/fluid/pkg/common"
	"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
	cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// -- Mocks --

type mockEngine struct {
	id string
}

func (m *mockEngine) ID() string {
	return m.id
}
func (m *mockEngine) Shutdown() error {
	return nil
}
func (m *mockEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, err error) {
	return true, nil
}
func (m *mockEngine) CreateVolume() (err error) {
	return nil
}
func (m *mockEngine) DeleteVolume() (err error) {
	return nil
}
func (m *mockEngine) Sync(ctx cruntime.ReconcileRequestContext) error {
	return nil
}
func (m *mockEngine) Validate(ctx cruntime.ReconcileRequestContext) (err error) {
	return nil
}
func (m *mockEngine) Operate(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus, operation dataoperation.OperationInterface) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

type mockRuntimeReconciler struct {
	*RuntimeReconciler
	failEngineCreation bool
}

func (m *mockRuntimeReconciler) GetOrCreateEngine(ctx cruntime.ReconcileRequestContext) (base.Engine, error) {
	if m.failEngineCreation {
		return nil, fmt.Errorf("induced engine creation failure")
	}
	return &mockEngine{id: "test-engine"}, nil
}

func (m *mockRuntimeReconciler) RemoveEngine(ctx cruntime.ReconcileRequestContext) {
	// no-op
}

// -- Helpers --

func newTestReconciler(t *testing.T, objects ...client.Object) (*mockRuntimeReconciler, client.Client) {
	s := runtime.NewScheme()
	_ = scheme.AddToScheme(s)
	_ = datav1alpha1.AddToScheme(s)
	_ = corev1.AddToScheme(s)

	fakeClient := fake.NewClientBuilder().
		WithScheme(s).
		WithStatusSubresource(objects...).
		WithObjects(objects...).
		Build()

	// Use discard logger
	log := logr.Discard()
	recorder := record.NewFakeRecorder(10)

	mock := &mockRuntimeReconciler{}
	// Hook up the RuntimeReconciler to use 'mock' as the implementation
	baseReconciler := NewRuntimeReconciler(mock, fakeClient, log, recorder)
	mock.RuntimeReconciler = baseReconciler

	return mock, fakeClient
}

// -- Tests --

func TestReconcileInternal_AddOwnerReference(t *testing.T) {
	// Scenario: Runtime exists, Dataset exists, but OwnerReference is missing.
	// Expected: Reconciler should add OwnerReference to Runtime and Requeue.

	dataset := &datav1alpha1.Dataset{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Dataset",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			UID:       "dataset-uid-123",
		},
	}
	runtimeObj := &datav1alpha1.AlluxioRuntime{
		TypeMeta: metav1.TypeMeta{
			Kind:       "AlluxioRuntime",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			// No OwnerReferences
		},
	}

	reconciler, c := newTestReconciler(t, dataset, runtimeObj)

	ctx := cruntime.ReconcileRequestContext{
		Context:        context.TODO(),
		Log:            logr.Discard(),
		NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
		RuntimeType:    common.AlluxioRuntime,
		Runtime:        runtimeObj,
		Category:       common.AccelerateCategory,
		Client:         c,
	}

	// First pass
	result, err := reconciler.ReconcileInternal(ctx)
	if err != nil {
		t.Fatalf("ReconcileInternal failed: %v", err)
	}

	// Check if Requeue is true
	if !result.Requeue {
		t.Errorf("Expected Requeue to be true for OwnerReference update, got %v", result)
	}

	// Verify OwnerReference
	updatedRuntime := &datav1alpha1.AlluxioRuntime{}
	err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
	if err != nil {
		t.Fatalf("Failed to get updated runtime: %v", err)
	}

	if len(updatedRuntime.OwnerReferences) != 1 {
		t.Errorf("Expected 1 OwnerReference, got %d", len(updatedRuntime.OwnerReferences))
	} else {
		ref := updatedRuntime.OwnerReferences[0]
		if ref.UID != dataset.UID {
			t.Errorf("Expected OwnerReference UID %s, got %s", dataset.UID, ref.UID)
		}
	}
}

func TestReconcileInternal_AddFinalizer(t *testing.T) {
	// Scenario: Runtime has OwnerReference but no Finalizer.
	// Expected: Reconciler should add Finalizer and Requeue.

	dataset := &datav1alpha1.Dataset{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Dataset",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			UID:       "dataset-uid-123",
		},
	}
	runtimeObj := &datav1alpha1.AlluxioRuntime{
		TypeMeta: metav1.TypeMeta{
			Kind:       "AlluxioRuntime",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "data.fluid.io/v1alpha1",
					Kind:       "Dataset",
					Name:       "test-dataset",
					UID:        "dataset-uid-123",
					Controller: func() *bool { b := true; return &b }(),
				},
			},
			// No Finalizer
		},
	}

	reconciler, c := newTestReconciler(t, dataset, runtimeObj)

	ctx := cruntime.ReconcileRequestContext{
		Context:        context.TODO(),
		Log:            logr.Discard(),
		NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
		RuntimeType:    common.AlluxioRuntime,
		Runtime:        runtimeObj,
		Category:       common.AccelerateCategory,
		FinalizerName:  "fluid-alluxio-controller-finalizer",
		Client:         c,
	}

	// First pass
	result, err := reconciler.ReconcileInternal(ctx)
	if err != nil {
		t.Fatalf("ReconcileInternal failed: %v", err)
	}

	// Check result
	if !result.Requeue {
		t.Errorf("Expected Requeue to be true for Finalizer update, got %v", result)
	}

	// Verify Finalizer
	updatedRuntime := &datav1alpha1.AlluxioRuntime{}
	err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
	if err != nil {
		t.Fatalf("Failed to get updated runtime: %v", err)
	}

	if len(updatedRuntime.Finalizers) == 0 {
		t.Errorf("Expected Finalizer detection, got none")
	} else {
		found := false
		for _, f := range updatedRuntime.Finalizers {
			if f == "fluid-alluxio-controller-finalizer" {
				found = true
				break
			}
		}
		if !found {
			t.Errorf("Finalizer 'fluid-alluxio-controller-finalizer' not found in %v", updatedRuntime.Finalizers)
		}
	}
}

func TestReconcileInternal_ReconcileRuntime(t *testing.T) {
	// Scenario: fully set up Runtime (owners, finalizers correct).
	// Expected: Should proceed to ReconcileRuntime logic (Setup, Sync).
	// Since MockEngine returns success, it should return success (Check utils.NoRequeue semantics).

	dataset := &datav1alpha1.Dataset{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Dataset",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			UID:       "dataset-uid-123",
		},
		Status: datav1alpha1.DatasetStatus{
			Phase: datav1alpha1.BoundDatasetPhase,
		},
	}
	runtimeObj := &datav1alpha1.AlluxioRuntime{
		TypeMeta: metav1.TypeMeta{
			Kind:       "AlluxioRuntime",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "data.fluid.io/v1alpha1",
					Kind:       "Dataset",
					Name:       "test-dataset",
					UID:        "dataset-uid-123",
				},
			},
			Finalizers: []string{"fluid-alluxio-controller-finalizer"},
		},
	}

	reconciler, c := newTestReconciler(t, dataset, runtimeObj)

	ctx := cruntime.ReconcileRequestContext{
		Context:        context.TODO(),
		Log:            logr.Discard(),
		NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
		RuntimeType:    common.AlluxioRuntime,
		Runtime:        runtimeObj,
		Category:       common.AccelerateCategory,
		FinalizerName:  "fluid-alluxio-controller-finalizer",
		Dataset:        dataset,
		Client:         c,
	}

	// Reconcile
	result, err := reconciler.ReconcileInternal(ctx)
	if err != nil {
		t.Fatalf("ReconcileInternal failed: %v", err)
	}

	if result.Requeue && result.RequeueAfter == 0 {
		t.Errorf("Did not expect immediate Requeue for successful reconcile")
	}
}

func TestReconcileInternal_EngineError(t *testing.T) {
	// Scenario: GetOrCreateEngine fails.
	// Expected: ReconcileInternal returns error.

	dataset := &datav1alpha1.Dataset{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Dataset",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			UID:       "dataset-uid-123",
		},
	}
	runtimeObj := &datav1alpha1.AlluxioRuntime{
		TypeMeta: metav1.TypeMeta{
			Kind:       "AlluxioRuntime",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
		},
	}

	reconciler, c := newTestReconciler(t, dataset, runtimeObj)
	reconciler.failEngineCreation = true

	ctx := cruntime.ReconcileRequestContext{
		Context:        context.TODO(),
		Log:            logr.Discard(),
		NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
		RuntimeType:    common.AlluxioRuntime,
		Runtime:        runtimeObj,
		Category:       common.AccelerateCategory,
		Client:         c,
	}

	// Reconcile
	_, err := reconciler.ReconcileInternal(ctx)
	if err == nil {
		t.Fatalf("Expected error from ReconcileInternal due to engine failure, got nil")
	}
	if err.Error() != "Failed to create: induced engine creation failure" && err.Error() != "induced engine creation failure" {
		t.Logf("Got expected error: %v", err)
	}
}

func TestReconcileRuntimeDeletion(t *testing.T) {
	// Scenario: Runtime has DeletionTimestamp.
	// Expected: Clean up (DeleteVolume, Shutdown), Remove Finalizer.

	now := metav1.Now()
	dataset := &datav1alpha1.Dataset{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Dataset",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dataset",
			Namespace: "default",
			UID:       "dataset-uid-123",
		},
	}
	runtimeObj := &datav1alpha1.AlluxioRuntime{
		TypeMeta: metav1.TypeMeta{
			Kind:       "AlluxioRuntime",
			APIVersion: "data.fluid.io/v1alpha1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:              "test-dataset",
			Namespace:         "default",
			DeletionTimestamp: &now,
			Finalizers:        []string{"fluid-alluxio-controller-finalizer"},
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "data.fluid.io/v1alpha1",
					Kind:       "Dataset",
					Name:       "test-dataset",
					UID:        "dataset-uid-123",
				},
			},
		},
	}

	reconciler, c := newTestReconciler(t, dataset, runtimeObj)

	ctx := cruntime.ReconcileRequestContext{
		Context:        context.TODO(),
		Log:            logr.Discard(),
		NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
		RuntimeType:    common.AlluxioRuntime,
		Runtime:        runtimeObj,
		Category:       common.AccelerateCategory,
		FinalizerName:  "fluid-alluxio-controller-finalizer",
		Client:         c,
	}

	// Reconcile
	result, err := reconciler.ReconcileInternal(ctx)
	if err != nil {
		t.Fatalf("ReconcileInternal failed: %v", err)
	}

	// Should not requeue if deletion succeeds (Remove Finalizer calls Update, which triggers new event, so return NoRequeue)
	if result.Requeue {
		t.Errorf("Expected no requeue after successful deletion, got %v", result)
	}

	// Verify Finalizer is removed
	updatedRuntime := &datav1alpha1.AlluxioRuntime{}
	err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
	if errors.IsNotFound(err) {
		// Object deleted, success!
		return
	}
	if err != nil {
		t.Fatalf("Failed to get updated runtime: %v", err)
	}

	if len(updatedRuntime.Finalizers) != 0 {
		t.Errorf("Expected finalizers to be empty, got %v", updatedRuntime.Finalizers)
	}
}
```
This file appears to be unrelated to the CSI recovery retry and event deduplication changes described in the PR. It adds tests for the runtime controller which has no connection to the CSI recover functionality being modified in this PR.
Consider moving these tests to a separate PR focused on improving test coverage for the runtime controller, as mixing unrelated changes makes the PR harder to review and could complicate rollbacks if issues arise with either set of changes.
```go
// GetOrCreateState retrieves or initializes state for a mount point.
// Uses fine-grained locking to avoid blocking other mount operations.
func (t *RecoverStateTracker) GetOrCreateState(mountPath string) *MountState {
	t.mu.Lock()
	defer t.mu.Unlock()

	if state, exists := t.states[mountPath]; exists {
		return state
	}

	state := &MountState{
		CurrentBackoff: initialBackoff,
		IsHealthy:      true, // Assume healthy until proven otherwise
	}
	t.states[mountPath] = state
	return state
}
```
The GetOrCreateState method returns a pointer to the internal MountState, which is then accessed outside the lock. This creates a data race because the returned state can be modified by other goroutines (e.g., via RecordFailure, RecordSuccess, ShouldEmitEvent) while this code reads the IsHealthy field.
To fix this, consider either:
- Add a method like `IsHealthy(mountPath string) bool` that reads the field under lock protection, or
- Have `RecordSuccess` return a boolean indicating whether the state transitioned from unhealthy to healthy.
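A minimal sketch of the first option, reusing the tracker fields visible in the `GetOrCreateState` snippet above (how unknown mount paths are treated is an assumption):

```go
// IsHealthy reports the mount's health flag while holding the tracker's lock,
// so callers never read the shared MountState outside the mutex.
func (t *RecoverStateTracker) IsHealthy(mountPath string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	state, exists := t.states[mountPath]
	if !exists {
		// Unknown mounts are treated as healthy, matching GetOrCreateState's default.
		return true
	}
	return state.IsHealthy
}
```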
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master    #5662      +/-   ##
==========================================
+ Coverage   59.32%   59.43%   +0.10%     
==========================================
  Files         444      445       +1     
  Lines       30540    30664     +124     
==========================================
+ Hits        18119    18225     +106     
- Misses      10917    10932      +15     
- Partials     1504     1507       +3     
```

View full report in Codecov by Sentry.
Fixes #5661
This PR improves the stability and observability of the CSI recovery loop by
introducing bounded retry behavior and suppressing duplicate recovery events
under persistent mount failure scenarios.
Problem
The CSI recover loop currently retries failed mount recoveries at a fixed
interval and emits Kubernetes events on every iteration. When a mount cannot
be recovered for an extended period, this can result in:
- an unbounded stream of retry attempts with no backoff between them
- identical warning events emitted on every iteration, putting unnecessary load on the API server and etcd
What this PR does
- Introduces a `RecoverStateTracker` to track recovery state per mount point
- Applies exponential backoff to failed recovery attempts, capped at 5 minutes (sketched below)
- Emits recovery events only when a mount's recovery state changes, instead of on every iteration

Design highlights
- Successful recoveries behave exactly as before; only the persistent-failure path gains backoff and event deduplication
- State is tracked per mount path with fine-grained locking, so one broken mount does not block recovery of others
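The backoff bookkeeping could look roughly like the following minimal sketch. The `RecoverStateTracker` and `MountState` shapes follow the `GetOrCreateState` snippet quoted earlier in this thread; the `initialBackoff` value, the `LastEventReason` field, and the exact doubling logic are assumptions for illustration, not the PR's exact implementation:

```go
package recover

import (
	"sync"
	"time"
)

const (
	initialBackoff = 30 * time.Second // starting interval; concrete value is an assumption
	maxBackoff     = 5 * time.Minute  // cap described in this PR
)

// MountState mirrors the fields visible in the review snippet above,
// plus an assumed LastEventReason field used for event deduplication.
type MountState struct {
	CurrentBackoff  time.Duration
	IsHealthy       bool
	LastEventReason string
}

// RecoverStateTracker keeps per-mount recovery state behind a mutex.
type RecoverStateTracker struct {
	mu     sync.Mutex
	states map[string]*MountState
}

// RecordFailure marks the mount as unhealthy and doubles its backoff,
// capping it at maxBackoff so a persistently broken mount is still retried,
// just no more often than once every five minutes.
func (t *RecoverStateTracker) RecordFailure(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	state, exists := t.states[mountPath]
	if !exists {
		state = &MountState{CurrentBackoff: initialBackoff, IsHealthy: true}
		t.states[mountPath] = state
	}

	state.IsHealthy = false
	state.CurrentBackoff *= 2
	if state.CurrentBackoff > maxBackoff {
		state.CurrentBackoff = maxBackoff
	}
}
```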
Tests
- Adds unit tests for `RecoverStateTracker` covering the backoff and event-deduplication logic
- Guards mount-dependent tests to run only on Linux, as `k8s.io/utils/mount` is not supported on non-Linux platforms (e.g. macOS)

Verification

```bash
go test ./pkg/csi/recover/... -v
```
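For completeness, the event-deduplication side could look roughly like the following, continuing the same sketch as above; this is an assumption about how `ShouldEmitEvent` might work, keyed on the assumed `LastEventReason` field rather than the PR's actual state layout:

```go
// ShouldEmitEvent reports whether an event with the given reason should be
// emitted for mountPath. It returns true only when the reason differs from
// the last one recorded, so identical failure events are not re-sent on
// every recovery iteration.
func (t *RecoverStateTracker) ShouldEmitEvent(mountPath, reason string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	state, exists := t.states[mountPath]
	if !exists {
		state = &MountState{CurrentBackoff: initialBackoff, IsHealthy: true}
		t.states[mountPath] = state
	}

	if state.LastEventReason == reason {
		return false // same reason already reported; suppress the duplicate
	}
	state.LastEventReason = reason
	return true
}
```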