diff --git a/.changelog/5139.txt b/.changelog/5139.txt new file mode 100644 index 0000000000..e51dd684cb --- /dev/null +++ b/.changelog/5139.txt @@ -0,0 +1,3 @@ +```release-note:bug +catalog-sync: Fix cross-node service deregistration when multiple K8s clusters sync to the same Consul datacenter. The syncer now only deregisters services on its own consul-node-name, preventing clusters from removing each other's service registrations. +``` diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 95a7b93dd1..b5655c53d1 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -369,6 +369,13 @@ func (s *ConsulSyncer) deregisterRemovedService(ctx context.Context, name, names defer s.lock.Unlock() for _, service := range services { + // Skip services registered on other nodes to avoid cross-node + // deregistration when multiple K8s clusters sync to the same + // Consul datacenter. + if service.Node != s.ConsulNodeName { + continue + } + // Make sure the namespace exists before we run checks against it if _, ok := s.serviceNames[namespace]; ok { // If the service is valid and its info isn't nil, we don't deregister it @@ -416,8 +423,14 @@ func (s *ConsulSyncer) scheduleReapServiceLocked(name, namespace string) error { return err } - // Create deregistrations for all of these + // Create deregistrations for all of these, but only for services + // on this syncer's node to avoid cross-node deregistration when + // multiple K8s clusters sync to the same Consul datacenter. 
for _, svc := range services { + if svc.Node != s.ConsulNodeName { + continue + } + s.deregs[svc.ServiceID] = &api.CatalogDeregistration{ Node: svc.Node, ServiceID: svc.ServiceID, diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index d6626a3195..8113f1b4c2 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -5,7 +5,6 @@ package catalog import ( "context" - "fmt" "net/http" "net/http/httptest" "net/url" @@ -68,62 +67,58 @@ func TestConsulSyncer_register(t *testing.T) { } // Test that the syncer reaps individual invalid service instances. +// The syncer only reaps services on its own ConsulNodeName to avoid +// cross-node interference in multi-cluster deployments. func TestConsulSyncer_reapServiceInstance(t *testing.T) { t.Parallel() - for _, node := range []string{ConsulSyncNodeName, "test-node"} { - name := fmt.Sprintf("consul node name: %s", node) - t.Run(name, func(t *testing.T) { - // Set up server, client, syncer - testClient := test.TestServerWithMockConnMgrWatcher(t, nil) - client := testClient.APIClient + // Set up server, client, syncer + testClient := test.TestServerWithMockConnMgrWatcher(t, nil) + client := testClient.APIClient - s, closer := testConsulSyncer(testClient) - defer closer() + s, closer := testConsulSyncer(testClient) + defer closer() - // Sync - s.Sync([]*api.CatalogRegistration{ - testRegistration(node, "bar", "default"), - }) + // Sync + s.Sync([]*api.CatalogRegistration{ + testRegistration(ConsulSyncNodeName, "bar", "default"), + }) - // Wait for the first service - retry.Run(t, func(r *retry.R) { - services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil) - if err != nil { - r.Fatalf("err: %s", err) - } - if len(services) != 1 { - r.Fatal("service not found or too many") - } - }) + // Wait for the first service + retry.Run(t, func(r *retry.R) { + services, _, err := client.Catalog().Service("bar", 
s.ConsulK8STag, nil) + if err != nil { + r.Fatalf("err: %s", err) + } + if len(services) != 1 { + r.Fatal("service not found or too many") + } + }) - // Create an invalid service directly in Consul - svc := testRegistration(node, "bar", "default") - svc.Service.ID = serviceID(node, "bar2") - fmt.Println("invalid service id", svc.Service.ID) - _, err := client.Catalog().Register(svc, nil) - require.NoError(t, err) - - // Valid service should exist - var service *api.CatalogService - retry.Run(t, func(r *retry.R) { - services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil) - if err != nil { - r.Fatalf("err: %s", err) - } - if len(services) != 1 { - r.Fatal("service not found or too many") - } - service = services[0] - }) + // Create an invalid service directly in Consul on the syncer's node. + svc := testRegistration(ConsulSyncNodeName, "bar", "default") + svc.Service.ID = serviceID(ConsulSyncNodeName, "bar2") + _, err := client.Catalog().Register(svc, nil) + require.NoError(t, err) - // Verify the settings - require.Equal(t, serviceID(node, "bar"), service.ServiceID) - require.Equal(t, node, service.Node) - require.Equal(t, "bar", service.ServiceName) - require.Equal(t, "127.0.0.1", service.Address) - }) - } + // Valid service should exist, invalid should be reaped. 
+ var service *api.CatalogService + retry.Run(t, func(r *retry.R) { + services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil) + if err != nil { + r.Fatalf("err: %s", err) + } + if len(services) != 1 { + r.Fatal("service not found or too many") + } + service = services[0] + }) + + // Verify the settings + require.Equal(t, serviceID(ConsulSyncNodeName, "bar"), service.ServiceID) + require.Equal(t, ConsulSyncNodeName, service.Node) + require.Equal(t, "bar", service.ServiceName) + require.Equal(t, "127.0.0.1", service.Address) } // Test that the syncer reaps services not registered by us that are tagged @@ -171,6 +166,60 @@ func TestConsulSyncer_reapService(t *testing.T) { } } +// Test that the syncer does not deregister services registered on a different +// Consul node. This prevents multiple K8s clusters syncing to the same Consul +// datacenter from fighting over shared service names. +func TestConsulSyncer_doesNotReapServiceOnDifferentNode(t *testing.T) { + t.Parallel() + + testClient := test.TestServerWithMockConnMgrWatcher(t, nil) + client := testClient.APIClient + + s, closer := testConsulSyncer(testClient) + defer closer() + + // Sync a service on this syncer's node. + s.Sync([]*api.CatalogRegistration{ + testRegistration(ConsulSyncNodeName, "bar", "default"), + }) + + // Register the same service on a different node, simulating another + // K8s cluster's syncer. We expect this to NOT be reaped. + otherNode := "k8s-sync-other-cluster" + otherSvc := testRegistration(otherNode, "bar", "default") + _, err := client.Catalog().Register(otherSvc, nil) + require.NoError(t, err) + + // Also register an unrelated service on the other node to verify + // it is not reaped by scheduleReapServiceLocked either. + otherOnlySvc := testRegistration(otherNode, "other-only", "default") + _, err = client.Catalog().Register(otherOnlySvc, nil) + require.NoError(t, err) + + // Wait until both "bar" instances (our node's and the other node's) are in the catalog.
+ retry.Run(t, func(r *retry.R) { + services, _, err := client.Catalog().Service("bar", TestConsulK8STag, nil) + require.NoError(r, err) + // We expect two instances: one on our node, one on the other. + require.Len(r, services, 2) + }) + + // Sleep to give the syncer extra cycles to potentially reap the + // other node's services (it shouldn't). + time.Sleep(500 * time.Millisecond) + + // The other node's "bar" instance should still exist. + barInstances, _, err := client.Catalog().Service("bar", TestConsulK8STag, nil) + require.NoError(t, err) + require.Len(t, barInstances, 2) + + // The other node's "other-only" service should still exist. + otherInstances, _, err := client.Catalog().Service("other-only", TestConsulK8STag, nil) + require.NoError(t, err) + require.Len(t, otherInstances, 1) + require.Equal(t, otherNode, otherInstances[0].Node) +} + // Test that the syncer doesn't reap any services until the initial sync has // been performed. func TestConsulSyncer_noReapingUntilInitialSync(t *testing.T) {