Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 117 additions & 32 deletions internal/controller/postgresuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

dbv1alpha1 "github.com/movetokube/postgres-operator/api/v1alpha1"
"github.com/movetokube/postgres-operator/pkg/config"
Expand Down Expand Up @@ -127,6 +132,13 @@ func (r *PostgresUserReconciler) Reconcile(ctx context.Context, req ctrl.Request
// We need to get the Postgres CR to get the group role name
database, err := r.getPostgresCR(ctx, instance)
if err != nil {
if errors.IsNotFound(err) {
// Referenced Postgres CR doesn't exist - log warning and don't requeue
// The user needs to either create the Postgres CR or delete this PostgresUser
reqLogger.Info("Referenced Postgres CR not found, skipping reconciliation",
"database", instance.Spec.Database)
return ctrl.Result{}, nil
}
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
// Create user role
Expand Down Expand Up @@ -211,44 +223,50 @@ func (r *PostgresUserReconciler) Reconcile(ctx context.Context, req ctrl.Request
// We need to get the Postgres CR to get the group role name
database, err := r.getPostgresCR(ctx, instance)
if err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
if !errors.IsNotFound(err) {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
// Referenced Postgres CR doesn't exist - log warning and skip privilege reconciliation
// The user needs to either create the Postgres CR or delete this PostgresUser
reqLogger.Info("Referenced Postgres CR not found, skipping privilege reconciliation",
"database", instance.Spec.Database)
} else {
// Determine desired group role
var desiredGroup string
switch instance.Spec.Privileges {
case "READ":
desiredGroup = database.Status.Roles.Reader
case "WRITE":
desiredGroup = database.Status.Roles.Writer
default:
desiredGroup = database.Status.Roles.Owner
}

// Determine desired group role
var desiredGroup string
switch instance.Spec.Privileges {
case "READ":
desiredGroup = database.Status.Roles.Reader
case "WRITE":
desiredGroup = database.Status.Roles.Writer
default:
desiredGroup = database.Status.Roles.Owner
}
// Ability user to be reassigned to another group role
currentGroup := instance.Status.PostgresGroup
if desiredGroup != "" && currentGroup != desiredGroup {

// Ability user to be reassigned to another group role
currentGroup := instance.Status.PostgresGroup
if desiredGroup != "" && currentGroup != desiredGroup {
// Remove the old group membership if present
if currentGroup != "" {
if err := r.pg.RevokeRole(currentGroup, role); err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
}

// Remove the old group membership if present
if currentGroup != "" {
if err := r.pg.RevokeRole(currentGroup, role); err != nil {
// Grant the new group role
if err := r.pg.GrantRole(desiredGroup, role); err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
}

// Grant the new group role
if err := r.pg.GrantRole(desiredGroup, role); err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}

// Ensure objects created by the user are owned by the new group
if err := r.pg.AlterDefaultLoginRole(role, desiredGroup); err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}
// Ensure objects created by the user are owned by the new group
if err := r.pg.AlterDefaultLoginRole(role, desiredGroup); err != nil {
return r.requeue(ctx, instance, errors.NewInternalError(err))
}

instance.Status.PostgresGroup = desiredGroup
if err := r.Status().Update(ctx, instance); err != nil {
return r.requeue(ctx, instance, err)
instance.Status.PostgresGroup = desiredGroup
if err := r.Status().Update(ctx, instance); err != nil {
return r.requeue(ctx, instance, err)
}
}
}
}
Expand Down Expand Up @@ -397,10 +415,16 @@ func (r *PostgresUserReconciler) addFinalizer(ctx context.Context, reqLogger log
return nil
}

func (r *PostgresUserReconciler) addOwnerRef(ctx context.Context, _ logr.Logger, instance *dbv1alpha1.PostgresUser) error {
func (r *PostgresUserReconciler) addOwnerRef(ctx context.Context, reqLogger logr.Logger, instance *dbv1alpha1.PostgresUser) error {
// Search postgres database CR
pg, err := r.getPostgresCR(ctx, instance)
if err != nil {
if errors.IsNotFound(err) {
// Referenced Postgres CR doesn't exist - skip setting owner reference
reqLogger.Info("Referenced Postgres CR not found, skipping owner reference",
"database", instance.Spec.Database)
return nil
}
return err
}
// Update owners
Expand Down Expand Up @@ -431,9 +455,70 @@ func (r *PostgresUserReconciler) finish(ctx context.Context, cr *dbv1alpha1.Post
return ctrl.Result{}, nil
}

// findPostgresUsersForPostgres returns reconcile requests for all PostgresUsers
// that reference the given Postgres CR by name within the same namespace.
// This enables eventual consistency when a Postgres CR is created after PostgresUsers
// that reference it.
func (r *PostgresUserReconciler) findPostgresUsersForPostgres(ctx context.Context, obj client.Object) []reconcile.Request {
postgres := obj.(*dbv1alpha1.Postgres)
logger := log.FromContext(ctx)

var userList dbv1alpha1.PostgresUserList
if err := r.List(ctx, &userList, client.InNamespace(postgres.Namespace)); err != nil {
logger.Error(err, "Failed to list PostgresUsers for Postgres CR", "postgres", postgres.Name)
return nil
}

var requests []reconcile.Request
for _, user := range userList.Items {
if user.Spec.Database == postgres.Name {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: user.Name,
Namespace: user.Namespace,
},
})
}
}

if len(requests) > 0 {
logger.Info("Enqueuing PostgresUsers for Postgres CR change",
"postgres", postgres.Name, "userCount", len(requests))
}
return requests
}

// SetupWithManager sets up the controller with the Manager.
// It watches PostgresUser CRs as the primary resource and also watches Postgres CRs
// to trigger reconciliation of PostgresUsers when their referenced Postgres CR changes.
// This ensures eventual consistency when PostgresUsers are created before their
// referenced Postgres CR exists.
func (r *PostgresUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dbv1alpha1.PostgresUser{}).
Watches(
&dbv1alpha1.Postgres{},
handler.EnqueueRequestsFromMapFunc(r.findPostgresUsersForPostgres),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// Trigger when Postgres CR is created and already succeeded
pg := e.Object.(*dbv1alpha1.Postgres)
return pg.Status.Succeeded
},
UpdateFunc: func(e event.UpdateEvent) bool {
// Trigger when Postgres CR transitions to succeeded state
oldPg := e.ObjectOld.(*dbv1alpha1.Postgres)
newPg := e.ObjectNew.(*dbv1alpha1.Postgres)
return !oldPg.Status.Succeeded && newPg.Status.Succeeded
},
DeleteFunc: func(e event.DeleteEvent) bool {
// Trigger on deletion to allow cleanup of dependent resources
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}),
).
Complete(r)
}
118 changes: 115 additions & 3 deletions internal/controller/postgresuser_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ var _ = Describe("PostgresUser Controller", func() {
Expect(foundSecret.Data).To(HaveKey("PORT"))
})

It("should fail if the database does not exist", func() {
It("should not error and skip reconciliation if the database does not exist", func() {
// Delete the postgres DB
Expect(cl.Delete(ctx, postgresDB)).To(Succeed())

Expand All @@ -358,13 +358,17 @@ var _ = Describe("PostgresUser Controller", func() {
},
}
_, err := rp.Reconcile(ctx, req)
Expect(err).To(HaveOccurred())
// Should not return error to avoid infinite reconciliation loop
Expect(err).NotTo(HaveOccurred())

// Check status
// Check status - user should still exist but not be processed
foundUser := &dbv1alpha1.PostgresUser{}
err = cl.Get(ctx, types.NamespacedName{Name: "nonexistent-user", Namespace: namespace}, foundUser)
Expect(err).NotTo(HaveOccurred())
// Status.Succeeded should still be false since we didn't actually create the role
Expect(foundUser.Status.Succeeded).To(BeFalse())
// PostgresRole should be empty since we skipped creation
Expect(foundUser.Status.PostgresRole).To(BeEmpty())
})
})

Expand Down Expand Up @@ -779,4 +783,112 @@ var _ = Describe("PostgresUser Controller", func() {
Expect(secret.Name).To(Equal("mysecret3"))
})
})

Describe("Cross-resource watching", func() {
var (
postgresDB *dbv1alpha1.Postgres
postgresUser1 *dbv1alpha1.PostgresUser
postgresUser2 *dbv1alpha1.PostgresUser
)

BeforeEach(func() {
postgresDB = &dbv1alpha1.Postgres{
ObjectMeta: metav1.ObjectMeta{
Name: databaseName,
Namespace: namespace,
},
Spec: dbv1alpha1.PostgresSpec{
Database: databaseName,
},
Status: dbv1alpha1.PostgresStatus{
Succeeded: true,
Roles: dbv1alpha1.PostgresRoles{
Owner: databaseName + "-group",
Reader: databaseName + "-reader",
Writer: databaseName + "-writer",
},
},
}

postgresUser1 = &dbv1alpha1.PostgresUser{
ObjectMeta: metav1.ObjectMeta{
Name: "user-for-db",
Namespace: namespace,
},
Spec: dbv1alpha1.PostgresUserSpec{
Database: databaseName,
SecretName: secretName,
Role: roleName,
Privileges: "WRITE",
},
}

postgresUser2 = &dbv1alpha1.PostgresUser{
ObjectMeta: metav1.ObjectMeta{
Name: "user-for-other-db",
Namespace: namespace,
},
Spec: dbv1alpha1.PostgresUserSpec{
Database: "other-db",
SecretName: secretName,
Role: roleName,
Privileges: "READ",
},
}
})

Context("findPostgresUsersForPostgres mapping function", func() {
It("should return reconcile requests for PostgresUsers referencing the Postgres CR", func() {
// Create users first (without the Postgres CR)
Expect(cl.Create(ctx, postgresUser1)).To(Succeed())
Expect(cl.Create(ctx, postgresUser2)).To(Succeed())

// Call the mapping function
requests := rp.findPostgresUsersForPostgres(ctx, postgresDB)

// Should only return the user that references our database
Expect(requests).To(HaveLen(1))
Expect(requests[0].Name).To(Equal("user-for-db"))
Expect(requests[0].Namespace).To(Equal(namespace))
})

It("should return empty list when no PostgresUsers reference the Postgres CR", func() {
// Create a user that references a different database
Expect(cl.Create(ctx, postgresUser2)).To(Succeed())

// Call the mapping function
requests := rp.findPostgresUsersForPostgres(ctx, postgresDB)

// Should return empty list
Expect(requests).To(BeEmpty())
})

It("should return multiple requests when multiple PostgresUsers reference the same Postgres CR", func() {
// Create two users that reference the same database
Expect(cl.Create(ctx, postgresUser1)).To(Succeed())

anotherUser := &dbv1alpha1.PostgresUser{
ObjectMeta: metav1.ObjectMeta{
Name: "another-user-for-db",
Namespace: namespace,
},
Spec: dbv1alpha1.PostgresUserSpec{
Database: databaseName,
SecretName: "another-secret",
Role: "another-role",
Privileges: "READ",
},
}
Expect(cl.Create(ctx, anotherUser)).To(Succeed())

// Call the mapping function
requests := rp.findPostgresUsersForPostgres(ctx, postgresDB)

// Should return both users
Expect(requests).To(HaveLen(2))
names := []string{requests[0].Name, requests[1].Name}
Expect(names).To(ContainElements("user-for-db", "another-user-for-db"))
})
})
})
})