Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/5139.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
catalog-sync: Fix cross-node service deregistration when multiple K8s clusters sync to the same Consul datacenter. The syncer now only deregisters services on its own consul-node-name, preventing clusters from removing each other's service registrations.
```
15 changes: 14 additions & 1 deletion control-plane/catalog/to-consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ func (s *ConsulSyncer) deregisterRemovedService(ctx context.Context, name, names
defer s.lock.Unlock()

for _, service := range services {
// Skip services registered on other nodes to avoid cross-node
// deregistration when multiple K8s clusters sync to the same
// Consul datacenter.
if service.Node != s.ConsulNodeName {
continue
}

// Make sure the namespace exists before we run checks against it
if _, ok := s.serviceNames[namespace]; ok {
// If the service is valid and its info isn't nil, we don't deregister it
Expand Down Expand Up @@ -416,8 +423,14 @@ func (s *ConsulSyncer) scheduleReapServiceLocked(name, namespace string) error {
return err
}

// Create deregistrations for all of these
// Create deregistrations for all of these, but only for services
// on this syncer's node to avoid cross-node deregistration when
// multiple K8s clusters sync to the same Consul datacenter.
for _, svc := range services {
if svc.Node != s.ConsulNodeName {
continue
}

s.deregs[svc.ServiceID] = &api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Expand Down
147 changes: 98 additions & 49 deletions control-plane/catalog/to-consul/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package catalog

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -68,62 +67,58 @@ func TestConsulSyncer_register(t *testing.T) {
}

// Test that the syncer reaps individual invalid service instances.
// The syncer only reaps services on its own ConsulNodeName to avoid
// cross-node interference in multi-cluster deployments.
func TestConsulSyncer_reapServiceInstance(t *testing.T) {
t.Parallel()

for _, node := range []string{ConsulSyncNodeName, "test-node"} {
name := fmt.Sprintf("consul node name: %s", node)
t.Run(name, func(t *testing.T) {
// Set up server, client, syncer
testClient := test.TestServerWithMockConnMgrWatcher(t, nil)
client := testClient.APIClient
// Set up server, client, syncer
testClient := test.TestServerWithMockConnMgrWatcher(t, nil)
client := testClient.APIClient

s, closer := testConsulSyncer(testClient)
defer closer()
s, closer := testConsulSyncer(testClient)
defer closer()

// Sync
s.Sync([]*api.CatalogRegistration{
testRegistration(node, "bar", "default"),
})
// Sync
s.Sync([]*api.CatalogRegistration{
testRegistration(ConsulSyncNodeName, "bar", "default"),
})

// Wait for the first service
retry.Run(t, func(r *retry.R) {
services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil)
if err != nil {
r.Fatalf("err: %s", err)
}
if len(services) != 1 {
r.Fatal("service not found or too many")
}
})
// Wait for the first service
retry.Run(t, func(r *retry.R) {
services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil)
if err != nil {
r.Fatalf("err: %s", err)
}
if len(services) != 1 {
r.Fatal("service not found or too many")
}
})

// Create an invalid service directly in Consul
svc := testRegistration(node, "bar", "default")
svc.Service.ID = serviceID(node, "bar2")
fmt.Println("invalid service id", svc.Service.ID)
_, err := client.Catalog().Register(svc, nil)
require.NoError(t, err)

// Valid service should exist
var service *api.CatalogService
retry.Run(t, func(r *retry.R) {
services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil)
if err != nil {
r.Fatalf("err: %s", err)
}
if len(services) != 1 {
r.Fatal("service not found or too many")
}
service = services[0]
})
// Create an invalid service directly in Consul on the syncer's node.
svc := testRegistration(ConsulSyncNodeName, "bar", "default")
svc.Service.ID = serviceID(ConsulSyncNodeName, "bar2")
_, err := client.Catalog().Register(svc, nil)
require.NoError(t, err)

// Verify the settings
require.Equal(t, serviceID(node, "bar"), service.ServiceID)
require.Equal(t, node, service.Node)
require.Equal(t, "bar", service.ServiceName)
require.Equal(t, "127.0.0.1", service.Address)
})
}
// Valid service should exist, invalid should be reaped.
var service *api.CatalogService
retry.Run(t, func(r *retry.R) {
services, _, err := client.Catalog().Service("bar", s.ConsulK8STag, nil)
if err != nil {
r.Fatalf("err: %s", err)
}
if len(services) != 1 {
r.Fatal("service not found or too many")
}
service = services[0]
})

// Verify the settings
require.Equal(t, serviceID(ConsulSyncNodeName, "bar"), service.ServiceID)
require.Equal(t, ConsulSyncNodeName, service.Node)
require.Equal(t, "bar", service.ServiceName)
require.Equal(t, "127.0.0.1", service.Address)
}

// Test that the syncer reaps services not registered by us that are tagged
Expand Down Expand Up @@ -171,6 +166,60 @@ func TestConsulSyncer_reapService(t *testing.T) {
}
}

// Test that the syncer does not deregister services registered on a different
// Consul node. This prevents multiple K8s clusters syncing to the same Consul
// datacenter from fighting over shared service names.
func TestConsulSyncer_doesNotReapServiceOnDifferentNode(t *testing.T) {
t.Parallel()

testClient := test.TestServerWithMockConnMgrWatcher(t, nil)
client := testClient.APIClient

s, closer := testConsulSyncer(testClient)
defer closer()

// Sync a service on this syncer's node.
s.Sync([]*api.CatalogRegistration{
testRegistration(ConsulSyncNodeName, "bar", "default"),
})

// Register the same service on a different node, simulating another
// K8s cluster's syncer. We expect this to NOT be reaped.
otherNode := "k8s-sync-other-cluster"
otherSvc := testRegistration(otherNode, "bar", "default")
_, err := client.Catalog().Register(otherSvc, nil)
require.NoError(t, err)

// Also register an unrelated service on the other node to verify
// it is not reaped by scheduleReapServiceLocked either.
otherOnlySvc := testRegistration(otherNode, "other-only", "default")
_, err = client.Catalog().Register(otherOnlySvc, nil)
require.NoError(t, err)

// Wait for the syncer to have run at least one full cycle.
retry.Run(t, func(r *retry.R) {
services, _, err := client.Catalog().Service("bar", TestConsulK8STag, nil)
require.NoError(r, err)
// We expect two instances: one on our node, one on the other.
require.Len(r, services, 2)
})

// Sleep to give the syncer extra cycles to potentially reap the
// other node's services (it shouldn't).
time.Sleep(500 * time.Millisecond)

// The other node's "bar" instance should still exist.
barInstances, _, err := client.Catalog().Service("bar", TestConsulK8STag, nil)
require.NoError(t, err)
require.Len(t, barInstances, 2)

// The other node's "other-only" service should still exist.
otherInstances, _, err := client.Catalog().Service("other-only", TestConsulK8STag, nil)
require.NoError(t, err)
require.Len(t, otherInstances, 1)
require.Equal(t, otherNode, otherInstances[0].Node)
}

// Test that the syncer doesn't reap any services until the initial sync has
// been performed.
func TestConsulSyncer_noReapingUntilInitialSync(t *testing.T) {
Expand Down
Loading