Bound CSI recover retries and deduplicate events on persistent mount failures #5662

Open
mrhapile wants to merge 2 commits into fluid-cloudnative:master from mrhapile:csi-fix

Conversation


@mrhapile mrhapile commented Feb 3, 2026

Fixes #5661

This PR improves the stability and observability of the CSI recovery loop by
introducing bounded retry behavior and suppressing duplicate recovery events
under persistent mount failure scenarios.

Problem

The CSI recover loop currently retries failed mount recoveries at a fixed
interval and emits Kubernetes events on every iteration. When a mount cannot
be recovered for an extended period, this can result in:

  • Unbounded retry attempts
  • Excessive, duplicate Kubernetes events
  • Unnecessary API server and etcd load

What this PR does

  • Introduces a per-mount RecoverStateTracker to track recovery state (see the sketch after the design highlights)
  • Applies exponential backoff on consecutive recovery failures (capped)
  • Deduplicates recovery events, emitting only on state transitions
  • Resets backoff and failure state immediately on successful recovery

Design highlights

  • Per-mount state tracking (no global coupling)
  • Thread-safe and suitable for long-running CSI DaemonSets
  • No public API changes
  • No behavior change on successful recoveries
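
For reviewers, here is a minimal, self-contained sketch of the control flow described above. The tracker method names match the RecoverStateTracker introduced by this PR (as described later in this thread); the interface, the doRecoverOne helper, and the FuseRecoverSucceed reason are illustrative placeholders, not the literal diff.

package recoversketch

// recoverTracker lists just the calls the recovery loop needs; in this PR the
// concrete type is RecoverStateTracker in pkg/csi/recover/state.go.
type recoverTracker interface {
	ShouldAttemptRecovery(mountPath string) bool
	RecordFailure(mountPath string)
	RecordSuccess(mountPath string)
	ShouldEmitEvent(mountPath string, reason string) bool
}

// doRecoverOne sketches one pass over a single broken mount: honor the backoff
// window, record the outcome, and emit Kubernetes events only on state
// transitions so persistent failures cannot flood the API server.
func doRecoverOne(path string, tracker recoverTracker,
	recoverMount func(string) error, emitEvent func(path, reason string)) {
	if !tracker.ShouldAttemptRecovery(path) {
		return // still inside the backoff window; skip this tick
	}
	if err := recoverMount(path); err != nil {
		tracker.RecordFailure(path) // grows the backoff, capped at a maximum
		if tracker.ShouldEmitEvent(path, "FuseRecoverFailed") {
			emitEvent(path, "FuseRecoverFailed") // first failure or new reason only
		}
		return
	}
	// Success: reset failure count and backoff immediately (no behavior change otherwise).
	tracker.RecordSuccess(path)
	emitEvent(path, "FuseRecoverSucceed")
}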

Tests

  • Added unit tests for RecoverStateTracker covering:
    • Backoff progression and capping
    • Event deduplication behavior
    • State reset on success
  • Existing mount-dependent recovery tests are guarded to run only on Linux,
    as k8s.io/utils/mount is not supported on non-Linux platforms (e.g. macOS);
    see the guard sketch below
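
A sketch of the platform guard mentioned in the last bullet; the test name and skip message are illustrative, only the runtime.GOOS check pattern is the point.

package recoversketch

import (
	"runtime"
	"testing"
)

// Mount-dependent tests exercise k8s.io/utils/mount, which is only supported
// on Linux, so they are skipped on other platforms such as macOS.
func TestRecoverBrokenMountLinuxOnly(t *testing.T) {
	if runtime.GOOS != "linux" {
		t.Skipf("skipping mount-dependent test: k8s.io/utils/mount is unsupported on %s", runtime.GOOS)
	}
	// ... the original mount recovery assertions run only on Linux ...
}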

Verification

go test ./pkg/csi/recover/... -v

Signed-off-by: mrhapile <allinonegaming3456@gmail.com>
Copilot AI review requested due to automatic review settings February 3, 2026 14:09

fluid-e2e-bot bot commented Feb 3, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign yangyuliufeng for approval by writing /assign @yangyuliufeng in a comment. For more information see: The Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


fluid-e2e-bot bot commented Feb 3, 2026

Hi @mrhapile. Thanks for your PR.

I'm waiting for a fluid-cloudnative member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@gemini-code-assist
Contributor

Summary of Changes

Hello @mrhapile, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the stability and observability of the CSI recovery loop by addressing issues of unbounded retries and excessive event generation during persistent mount failures. By introducing a per-mount state tracker, the system now applies exponential backoff to failed recovery attempts and intelligently deduplicates Kubernetes events, only emitting them when a mount's recovery state genuinely changes. This reduces unnecessary load on the API server and etcd, provides clearer operational signals, and ensures more efficient resource utilization without altering behavior for successful recoveries.

Highlights

  • Bounded CSI Recovery Retries: Implemented an exponential backoff mechanism for CSI mount recovery attempts, capping retries to prevent unbounded loops and reduce resource consumption during persistent failures.
  • Deduplicated Kubernetes Events: Introduced logic to deduplicate Kubernetes events related to CSI mount recovery, ensuring events are only emitted on state transitions (e.g., from healthy to failed, or failed to recovered) rather than on every retry attempt. This reduces API server and etcd load.
  • Per-Mount State Tracking: A new RecoverStateTracker is used to manage the recovery state for each individual mount point, allowing for independent backoff and event deduplication without global coupling.
  • State Reset on Success: The recovery state, including consecutive failure counts and backoff, is immediately reset upon successful mount recovery, ensuring quick re-attempts if a new failure occurs.
  • New Unit Tests: Comprehensive unit tests have been added for the RecoverStateTracker to validate its backoff progression, capping, event deduplication, and state reset behaviors. Existing mount-dependent recovery tests are now guarded to run only on Linux.


Changelog
  • pkg/controllers/runtime_controller_test.go
    • Added new unit tests for the RuntimeReconciler to cover scenarios such as adding owner references, adding finalizers, handling engine creation errors, and managing runtime deletion. This file appears to be a general addition to the controller test suite and not directly related to the CSI recovery logic described in the PR title.
  • pkg/csi/recover/recover.go
    • Integrated the new RecoverStateTracker to manage recovery state for each mount point.
    • Modified the doRecover function to check ShouldAttemptRecovery before proceeding, implementing the exponential backoff.
    • Updated event emission logic to use ShouldEmitEvent for deduplication, preventing repeated events for the same failure reason.
    • Added calls to RecordFailure and RecordSuccess to update the mount's recovery state based on recovery outcomes.
    • Added RemoveState call when a mount point is cleaned up.
  • pkg/csi/recover/recover_test.go
    • Imported the runtime package.
    • Updated existing FuseRecover test setups to initialize the new stateTracker.
    • Added Skip directives to mount-dependent tests to ensure they only run on Linux, aligning with the k8s.io/utils/mount utility's platform support.
  • pkg/csi/recover/state.go
    • Introduced MountState struct to track individual mount recovery status, including LastFailureTime, ConsecutiveFailures, LastEventReason, CurrentBackoff, and IsHealthy.
    • Defined RecoverStateTracker with a thread-safe map to manage MountState for multiple mount paths.
    • Implemented NewRecoverStateTracker for initialization.
    • Added GetOrCreateState to retrieve or initialize state for a given mount path.
    • Implemented RemoveState to clean up state for unmounted paths.
    • Provided ShouldAttemptRecovery to determine if a recovery attempt should be made, respecting exponential backoff.
    • Developed RecordFailure to update state on failure, incrementing failures and exponentially increasing backoff up to a cap.
    • Created RecordSuccess to reset failure counts and backoff on successful recovery.
    • Implemented ShouldEmitEvent to deduplicate Kubernetes events by only allowing emission on state transitions.
    • Added GetBackoffInfo for observability of current backoff state.
    • Defined constants for initialBackoff, maxBackoff, and backoffMultiplier (a sketch of this state machine appears at the end of this summary)
  • pkg/csi/recover/state_test.go
    • Added a new unit test file for RecoverStateTracker.
    • Included tests for NewRecoverStateTracker, GetOrCreateState, RemoveState, ShouldAttemptRecovery, RecordFailure, RecordSuccess, and ShouldEmitEvent.
    • Verified exponential backoff progression and capping.
    • Confirmed event deduplication behavior and state resets.
Activity
  • The pull request introduces new unit tests for the RecoverStateTracker and guards existing mount-dependent recovery tests to run only on Linux, indicating thorough testing of the new functionality and platform compatibility considerations.
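
To make the changelog concrete, below is a hedged sketch of what such a tracker could look like. Method, field, and constant names mirror the changelog above; the initial backoff value, the multiplier, and the internal layout are assumptions (only the 5-minute cap is quoted elsewhere on this page), so treat this as an illustration rather than the PR's actual state.go.

package recoversketch

import (
	"sync"
	"time"
)

// Assumed values: only the five-minute cap is mentioned elsewhere on this page;
// the initial backoff and multiplier are illustrative.
const (
	initialBackoff    = 1 * time.Second
	maxBackoff        = 5 * time.Minute
	backoffMultiplier = 2
)

// MountState mirrors the fields listed in the changelog for a single mount path.
type MountState struct {
	LastFailureTime     time.Time
	ConsecutiveFailures int
	LastEventReason     string
	CurrentBackoff      time.Duration
	IsHealthy           bool
}

// RecoverStateTracker keeps per-mount state behind a mutex so the long-running
// recovery loop in a CSI daemonset can use it from multiple goroutines.
type RecoverStateTracker struct {
	mu     sync.Mutex
	states map[string]*MountState
}

func NewRecoverStateTracker() *RecoverStateTracker {
	return &RecoverStateTracker{states: map[string]*MountState{}}
}

// ShouldAttemptRecovery reports whether the backoff window for mountPath has elapsed.
func (t *RecoverStateTracker) ShouldAttemptRecovery(mountPath string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	s, ok := t.states[mountPath]
	if !ok || s.IsHealthy {
		return true
	}
	return time.Since(s.LastFailureTime) >= s.CurrentBackoff
}

// RecordFailure increments the failure count and doubles the backoff up to maxBackoff.
func (t *RecoverStateTracker) RecordFailure(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	s := t.getOrCreateLocked(mountPath)
	s.ConsecutiveFailures++
	s.LastFailureTime = time.Now()
	s.IsHealthy = false
	s.CurrentBackoff *= backoffMultiplier
	if s.CurrentBackoff > maxBackoff {
		s.CurrentBackoff = maxBackoff
	}
}

// ShouldEmitEvent deduplicates events: emit only when the reason changes.
func (t *RecoverStateTracker) ShouldEmitEvent(mountPath, reason string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	s := t.getOrCreateLocked(mountPath)
	if s.LastEventReason == reason {
		return false
	}
	s.LastEventReason = reason
	return true
}

// RecordSuccess resets failure bookkeeping so a new failure starts from initialBackoff.
func (t *RecoverStateTracker) RecordSuccess(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.states[mountPath] = &MountState{CurrentBackoff: initialBackoff, IsHealthy: true}
}

// getOrCreateLocked must be called with t.mu held.
func (t *RecoverStateTracker) getOrCreateLocked(mountPath string) *MountState {
	if s, ok := t.states[mountPath]; ok {
		return s
	}
	s := &MountState{CurrentBackoff: initialBackoff, IsHealthy: true}
	t.states[mountPath] = s
	return s
}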


sonarqubecloud bot commented Feb 3, 2026

gemini-code-assist bot left a comment (Contributor)

Code Review

This pull request introduces a RecoverStateTracker to improve the handling of persistent CSI mount failures. By adding exponential backoff for recovery retries and deduplicating Kubernetes events, it effectively addresses the issue of excessive retries and event spam, which can overload the API server. The new logic is well-encapsulated and comes with comprehensive unit tests. The changes are a significant improvement for the stability and observability of the CSI recovery process. I have one minor suggestion to improve an assertion in a new test file.

Comment on lines +375 to +377
if err.Error() != "Failed to create: induced engine creation failure" && err.Error() != "induced engine creation failure" {
	t.Logf("Got expected error: %v", err)
}
Severity: medium

The error assertion logic here is confusing. The if condition checks if the error is not one of the expected strings, and then t.Logf just logs a message without failing the test. This should be a proper assertion that fails the test if the error is not what's expected.

expectedErr := "Failed to create: induced engine creation failure"
if err.Error() != expectedErr {
	t.Errorf("Expected error '%s', but got '%s'", expectedErr, err.Error())
}

Copilot AI left a comment (Contributor)

Pull request overview

This PR addresses issue #5661 by introducing bounded retry behavior and event deduplication for the CSI recovery loop to prevent API server overload during persistent mount failures.

Changes:

  • Adds RecoverStateTracker to manage per-mount recovery state with exponential backoff (capped at 5 minutes)
  • Implements event deduplication to emit Kubernetes events only on state transitions
  • Guards mount-dependent tests to run only on Linux platforms
  • Includes comprehensive unit tests for the new state tracking functionality

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Summary per file:
  • pkg/csi/recover/state.go: New state tracker implementing exponential backoff and event deduplication logic
  • pkg/csi/recover/state_test.go: Comprehensive unit tests for state tracker functionality
  • pkg/csi/recover/recover.go: Integrates state tracker into recovery loop with backoff checks and conditional event emission
  • pkg/csi/recover/recover_test.go: Updates existing tests to initialize state tracker and adds platform guards
  • pkg/controllers/runtime_controller_test.go: New tests for runtime controller (unrelated to CSI recovery)


Comment on lines 291 to 300
if err := r.recoverBrokenMount(point); err != nil {
	r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
	// Record failure to increase backoff for next attempt
	r.stateTracker.RecordFailure(point.MountPath)
	// Only emit failure event on state change (first failure or reason change)
	// This prevents flooding the event stream with identical failure events
	if r.stateTracker.ShouldEmitEvent(point.MountPath, common.FuseRecoverFailed) {
		r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
	}
	return
}
Copilot AI Feb 3, 2026

This error handling relies on recoverBrokenMount returning an error when the mount operation fails. However, there's a pre-existing bug in recoverBrokenMount (lines 174-186) where it shadows the named return value err with := assignment, causing it to always return nil even when Mount fails.

This means the backoff and event emission logic here will never be triggered for actual mount failures, undermining the primary purpose of this PR. The bug in recoverBrokenMount should be fixed by changing line 182 from if err := r.Mount(...) to if err = r.Mount(...) or by explicitly returning the error.
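
For readers unfamiliar with the shadowing pitfall described here, a minimal standalone illustration (not Fluid code; the function names are hypothetical):

package main

import (
	"errors"
	"fmt"
)

func mount() error { return errors.New("mount failed") }

// Shadowed: `:=` declares a new err scoped to the if statement, so the named
// return value is never assigned and the function always returns nil.
func recoverShadowed() (err error) {
	if err := mount(); err != nil {
		fmt.Println("saw error:", err)
	}
	return // returns nil
}

// Fixed: `=` assigns to the named return value, so the caller sees the error.
func recoverFixed() (err error) {
	if err = mount(); err != nil {
		fmt.Println("saw error:", err)
	}
	return // returns the mount error
}

func main() {
	fmt.Println("shadowed returns:", recoverShadowed())
	fmt.Println("fixed returns:", recoverFixed())
}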

Comment on lines +1 to +455
/*
Copyright 2023 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"
"testing"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// -- Mocks --

type mockEngine struct {
id string
}

func (m *mockEngine) ID() string {
return m.id
}
func (m *mockEngine) Shutdown() error {
return nil
}
func (m *mockEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, err error) {
return true, nil
}
func (m *mockEngine) CreateVolume() (err error) {
return nil
}
func (m *mockEngine) DeleteVolume() (err error) {
return nil
}
func (m *mockEngine) Sync(ctx cruntime.ReconcileRequestContext) error {
return nil
}
func (m *mockEngine) Validate(ctx cruntime.ReconcileRequestContext) (err error) {
return nil
}
func (m *mockEngine) Operate(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus, operation dataoperation.OperationInterface) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

type mockRuntimeReconciler struct {
*RuntimeReconciler
failEngineCreation bool
}

func (m *mockRuntimeReconciler) GetOrCreateEngine(ctx cruntime.ReconcileRequestContext) (base.Engine, error) {
if m.failEngineCreation {
return nil, fmt.Errorf("induced engine creation failure")
}
return &mockEngine{id: "test-engine"}, nil
}

func (m *mockRuntimeReconciler) RemoveEngine(ctx cruntime.ReconcileRequestContext) {
// no-op
}

// -- Helpers --

func newTestReconciler(t *testing.T, objects ...client.Object) (*mockRuntimeReconciler, client.Client) {
s := runtime.NewScheme()
_ = scheme.AddToScheme(s)
_ = datav1alpha1.AddToScheme(s)
_ = corev1.AddToScheme(s)

fakeClient := fake.NewClientBuilder().
WithScheme(s).
WithStatusSubresource(objects...).
WithObjects(objects...).
Build()

// Use discard logger
log := logr.Discard()
recorder := record.NewFakeRecorder(10)

mock := &mockRuntimeReconciler{}
// Hook up the RuntimeReconciler to use 'mock' as the implementation
baseReconciler := NewRuntimeReconciler(mock, fakeClient, log, recorder)
mock.RuntimeReconciler = baseReconciler

return mock, fakeClient
}

// -- Tests --

func TestReconcileInternal_AddOwnerReference(t *testing.T) {
// Scenario: Runtime exists, Dataset exists, but OwnerReference is missing.
// Expected: Reconciler should add OwnerReference to Runtime and Requeue.

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: "Dataset",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
UID: "dataset-uid-123",
},
}
runtimeObj := &datav1alpha1.AlluxioRuntime{
TypeMeta: metav1.TypeMeta{
Kind: "AlluxioRuntime",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
// No OwnerReferences
},
}

reconciler, c := newTestReconciler(t, dataset, runtimeObj)

ctx := cruntime.ReconcileRequestContext{
Context: context.TODO(),
Log: logr.Discard(),
NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
RuntimeType: common.AlluxioRuntime,
Runtime: runtimeObj,
Category: common.AccelerateCategory,
Client: c,
}

// First pass
result, err := reconciler.ReconcileInternal(ctx)
if err != nil {
t.Fatalf("ReconcileInternal failed: %v", err)
}

// Check if Requeue is true
if !result.Requeue {
t.Errorf("Expected Requeue to be true for OwnerReference update, got %v", result)
}

// Verify OwnerReference
updatedRuntime := &datav1alpha1.AlluxioRuntime{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
if err != nil {
t.Fatalf("Failed to get updated runtime: %v", err)
}

if len(updatedRuntime.OwnerReferences) != 1 {
t.Errorf("Expected 1 OwnerReference, got %d", len(updatedRuntime.OwnerReferences))
} else {
ref := updatedRuntime.OwnerReferences[0]
if ref.UID != dataset.UID {
t.Errorf("Expected OwnerReference UID %s, got %s", dataset.UID, ref.UID)
}
}
}

func TestReconcileInternal_AddFinalizer(t *testing.T) {
// Scenario: Runtime has OwnerReference but no Finalizer.
// Expected: Reconciler should add Finalizer and Requeue.

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: "Dataset",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
UID: "dataset-uid-123",
},
}
runtimeObj := &datav1alpha1.AlluxioRuntime{
TypeMeta: metav1.TypeMeta{
Kind: "AlluxioRuntime",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "data.fluid.io/v1alpha1",
Kind: "Dataset",
Name: "test-dataset",
UID: "dataset-uid-123",
Controller: func() *bool { b := true; return &b }(),
},
},
// No Finalizer
},
}

reconciler, c := newTestReconciler(t, dataset, runtimeObj)

ctx := cruntime.ReconcileRequestContext{
Context: context.TODO(),
Log: logr.Discard(),
NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
RuntimeType: common.AlluxioRuntime,
Runtime: runtimeObj,
Category: common.AccelerateCategory,
FinalizerName: "fluid-alluxio-controller-finalizer",
Client: c,
}

// First pass
result, err := reconciler.ReconcileInternal(ctx)
if err != nil {
t.Fatalf("ReconcileInternal failed: %v", err)
}

// Check result
if !result.Requeue {
t.Errorf("Expected Requeue to be true for Finalizer update, got %v", result)
}

// Verify Finalizer
updatedRuntime := &datav1alpha1.AlluxioRuntime{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
if err != nil {
t.Fatalf("Failed to get updated runtime: %v", err)
}

if len(updatedRuntime.Finalizers) == 0 {
t.Errorf("Expected Finalizer detection, got none")
} else {
found := false
for _, f := range updatedRuntime.Finalizers {
if f == "fluid-alluxio-controller-finalizer" {
found = true
break
}
}
if !found {
t.Errorf("Finalizer 'fluid-alluxio-controller-finalizer' not found in %v", updatedRuntime.Finalizers)
}
}
}

func TestReconcileInternal_ReconcileRuntime(t *testing.T) {
// Scenario: fully set up Runtime (owners, finalizers correct).
// Expected: Should proceed to ReconcileRuntime logic (Setup, Sync).
// Since MockEngine returns success, it should return success (Check utils.NoRequeue semantics).

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: "Dataset",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
UID: "dataset-uid-123",
},
Status: datav1alpha1.DatasetStatus{
Phase: datav1alpha1.BoundDatasetPhase,
},
}
runtimeObj := &datav1alpha1.AlluxioRuntime{
TypeMeta: metav1.TypeMeta{
Kind: "AlluxioRuntime",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "data.fluid.io/v1alpha1",
Kind: "Dataset",
Name: "test-dataset",
UID: "dataset-uid-123",
},
},
Finalizers: []string{"fluid-alluxio-controller-finalizer"},
},
}

reconciler, c := newTestReconciler(t, dataset, runtimeObj)

ctx := cruntime.ReconcileRequestContext{
Context: context.TODO(),
Log: logr.Discard(),
NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
RuntimeType: common.AlluxioRuntime,
Runtime: runtimeObj,
Category: common.AccelerateCategory,
FinalizerName: "fluid-alluxio-controller-finalizer",
Dataset: dataset,
Client: c,
}

// Reconcile
result, err := reconciler.ReconcileInternal(ctx)
if err != nil {
t.Fatalf("ReconcileInternal failed: %v", err)
}

if result.Requeue && result.RequeueAfter == 0 {
t.Errorf("Did not expect immediate Requeue for successful reconcile")
}
}

func TestReconcileInternal_EngineError(t *testing.T) {
// Scenario: GetOrCreateEngine fails.
// Expected: ReconcileInternal returns error.

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: "Dataset",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
UID: "dataset-uid-123",
},
}
runtimeObj := &datav1alpha1.AlluxioRuntime{
TypeMeta: metav1.TypeMeta{
Kind: "AlluxioRuntime",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
},
}

reconciler, c := newTestReconciler(t, dataset, runtimeObj)
reconciler.failEngineCreation = true

ctx := cruntime.ReconcileRequestContext{
Context: context.TODO(),
Log: logr.Discard(),
NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
RuntimeType: common.AlluxioRuntime,
Runtime: runtimeObj,
Category: common.AccelerateCategory,
Client: c,
}

// Reconcile
_, err := reconciler.ReconcileInternal(ctx)
if err == nil {
t.Fatalf("Expected error from ReconcileInternal due to engine failure, got nil")
}
if err.Error() != "Failed to create: induced engine creation failure" && err.Error() != "induced engine creation failure" {
t.Logf("Got expected error: %v", err)
}
}

func TestReconcileRuntimeDeletion(t *testing.T) {
// Scenario: Runtime has DeletionTimestamp.
// Expected: Clean up (DeleteVolume, Shutdown), Remove Finalizer.

now := metav1.Now()
dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: "Dataset",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
UID: "dataset-uid-123",
},
}
runtimeObj := &datav1alpha1.AlluxioRuntime{
TypeMeta: metav1.TypeMeta{
Kind: "AlluxioRuntime",
APIVersion: "data.fluid.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "default",
DeletionTimestamp: &now,
Finalizers: []string{"fluid-alluxio-controller-finalizer"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "data.fluid.io/v1alpha1",
Kind: "Dataset",
Name: "test-dataset",
UID: "dataset-uid-123",
},
},
},
}

reconciler, c := newTestReconciler(t, dataset, runtimeObj)

ctx := cruntime.ReconcileRequestContext{
Context: context.TODO(),
Log: logr.Discard(),
NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"},
RuntimeType: common.AlluxioRuntime,
Runtime: runtimeObj,
Category: common.AccelerateCategory,
FinalizerName: "fluid-alluxio-controller-finalizer",
Client: c,
}

// Reconcile
result, err := reconciler.ReconcileInternal(ctx)
if err != nil {
t.Fatalf("ReconcileInternal failed: %v", err)
}

// Should not requeue if deletion succeeds (Remove Finalizer calls Update, which triggers new event, so return NoRequeue)
if result.Requeue {
t.Errorf("Expected no requeue after successful deletion, got %v", result)
}

// Verify Finalizer is removed
updatedRuntime := &datav1alpha1.AlluxioRuntime{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime)
if errors.IsNotFound(err) {
// Object deleted, success!
return
}
if err != nil {
t.Fatalf("Failed to get updated runtime: %v", err)
}

if len(updatedRuntime.Finalizers) != 0 {
t.Errorf("Expected finalizers to be empty, got %v", updatedRuntime.Finalizers)
}
}
Copilot AI Feb 3, 2026

This file appears to be unrelated to the CSI recovery retry and event deduplication changes described in the PR. It adds tests for the runtime controller which has no connection to the CSI recover functionality being modified in this PR.

Consider moving these tests to a separate PR focused on improving test coverage for the runtime controller, as mixing unrelated changes makes the PR harder to review and could complicate rollbacks if issues arise with either set of changes.

Comment on lines +82 to +98
// GetOrCreateState retrieves or initializes state for a mount point.
// Uses fine-grained locking to avoid blocking other mount operations.
func (t *RecoverStateTracker) GetOrCreateState(mountPath string) *MountState {
	t.mu.Lock()
	defer t.mu.Unlock()

	if state, exists := t.states[mountPath]; exists {
		return state
	}

	state := &MountState{
		CurrentBackoff: initialBackoff,
		IsHealthy:      true, // Assume healthy until proven otherwise
	}
	t.states[mountPath] = state
	return state
}
Copilot AI Feb 3, 2026

The GetOrCreateState method returns a pointer to the internal MountState, which is then accessed outside the lock. This creates a data race because the returned state can be modified by other goroutines (e.g., via RecordFailure, RecordSuccess, ShouldEmitEvent) while this code reads the IsHealthy field.

To fix this, consider either of the following (the first option is sketched after this list):

  1. Add a method like IsHealthy(mountPath string) bool that reads the field under lock protection, or
  2. Have RecordSuccess return a boolean indicating whether the state transitioned from unhealthy to healthy.
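
A hedged sketch of the first option, assuming the RecoverStateTracker and MountState definitions quoted above; this is an illustration, not the PR's actual fix:

// IsHealthy reports the current health flag for mountPath while holding the
// tracker's mutex, so callers never read MountState fields outside the lock.
// A missing entry is treated as healthy, matching GetOrCreateState's default.
func (t *RecoverStateTracker) IsHealthy(mountPath string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if state, exists := t.states[mountPath]; exists {
		return state.IsHealthy
	}
	return true
}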

@cheyang cheyang requested a review from TrafalgarZZZ February 4, 2026 02:15

codecov bot commented Feb 4, 2026

Codecov Report

❌ Patch coverage is 84.49612% with 20 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.43%. Comparing base (ff0e62d) to head (50399d2).
⚠️ Report is 2 commits behind head on master.

Files with missing lines (patch coverage):
  • pkg/csi/recover/recover.go: 54.28%, 14 lines missing and 2 partials ⚠️
  • pkg/csi/recover/state.go: 95.74%, 3 lines missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5662      +/-   ##
==========================================
+ Coverage   59.32%   59.43%   +0.10%     
==========================================
  Files         444      445       +1     
  Lines       30540    30664     +124     
==========================================
+ Hits        18119    18225     +106     
- Misses      10917    10932      +15     
- Partials     1504     1507       +3     

☔ View full report in Codecov by Sentry.


Development

Successfully merging this pull request may close these issues.

CSI recover loop may spam events and overload API server on persistent mount failures
