Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions paasta_tools/kubernetes/application/controller_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from paasta_tools.kubernetes_tools import create_deployment
from paasta_tools.kubernetes_tools import create_job
from paasta_tools.kubernetes_tools import create_pod_disruption_budget
from paasta_tools.kubernetes_tools import create_service
from paasta_tools.kubernetes_tools import create_stateful_set
from paasta_tools.kubernetes_tools import delete_service
from paasta_tools.kubernetes_tools import ensure_service_account
from paasta_tools.kubernetes_tools import HpaOverride
from paasta_tools.kubernetes_tools import KubeClient
Expand All @@ -26,6 +28,7 @@
from paasta_tools.kubernetes_tools import paasta_prefixed
from paasta_tools.kubernetes_tools import pod_disruption_budget_for_service_instance
from paasta_tools.kubernetes_tools import update_deployment
from paasta_tools.kubernetes_tools import update_service
from paasta_tools.kubernetes_tools import update_stateful_set
from paasta_tools.utils import load_system_paasta_config

Expand Down Expand Up @@ -297,6 +300,7 @@ def create(self, kube_client: KubeClient) -> None:
)
self.ensure_pod_disruption_budget(kube_client, self.soa_config.get_namespace())
self.sync_horizontal_pod_autoscaler(kube_client)
self.sync_service(kube_client)

def update(self, kube_client: KubeClient) -> None:
# If HPA is enabled, do not update replicas.
Expand All @@ -310,6 +314,7 @@ def update(self, kube_client: KubeClient) -> None:
def update_related_api_objects(self, kube_client: KubeClient) -> None:
super().update_related_api_objects(kube_client)
self.sync_horizontal_pod_autoscaler(kube_client)
self.sync_service(kube_client)

def sync_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
"""
Expand Down Expand Up @@ -401,6 +406,47 @@ def delete_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
)
)

def sync_service(self, kube_client: KubeClient) -> None:
"""Create or update the Kubernetes Service if configured."""
formatted_service = self.soa_config.format_kubernetes_service()
service_name = self.soa_config.get_kubernetes_service_name()
namespace = self.soa_config.get_namespace()

service_exists = self.exists_service(kube_client, service_name, namespace)

if formatted_service is None:
# If config removed, but service exists, delete it
if service_exists:
self.logging.info(
f"Deleting Service for {service_name}/name in {namespace}/namespace"
)
delete_service(kube_client, service_name, namespace)
return

if not service_exists:
self.logging.info(
f"Creating Service for {service_name}/name in {namespace}/namespace"
)
create_service(kube_client, formatted_service, namespace)
else:
self.logging.info(
f"Updating Service for {service_name}/name in {namespace}/namespace"
)
update_service(kube_client, formatted_service, namespace)

def exists_service(
self, kube_client: KubeClient, service_name: str, namespace: str
) -> bool:
try:
kube_client.core.read_namespaced_service(
name=service_name, namespace=namespace
)
return True
except ApiException as e:
if e.status == 404:
return False
raise


class StatefulSetWrapper(Application):
def deep_delete(self, kube_client: KubeClient) -> None:
Expand Down
101 changes: 101 additions & 0 deletions paasta_tools/kubernetes_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@
from kubernetes.client import V1SecretKeySelector
from kubernetes.client import V1SecretVolumeSource
from kubernetes.client import V1SecurityContext
from kubernetes.client import V1Service
from kubernetes.client import V1ServiceAccount
from kubernetes.client import V1ServiceAccountTokenProjection
from kubernetes.client import V1ServicePort
from kubernetes.client import V1ServiceSpec
from kubernetes.client import V1StatefulSet
from kubernetes.client import V1StatefulSetSpec
from kubernetes.client import V1TCPSocketAction
Expand Down Expand Up @@ -429,6 +432,12 @@ class NodeSelectorsPreferredConfigDict(TypedDict):
preferences: Dict[str, NodeSelectorConfig]


class KubernetesServiceConfigDict(TypedDict, total=False):
headless: bool # Shortcut for cluster_ip: None
annotations: Dict[str, str]
port: int # Optional - defaults to 8888


class KubernetesDeploymentConfigDict(LongRunningServiceConfigDict, total=False):
bounce_method: str
bounce_health_params: Dict[str, Any]
Expand All @@ -452,6 +461,7 @@ class KubernetesDeploymentConfigDict(LongRunningServiceConfigDict, total=False):
datastore_credentials: DatastoreCredentialsConfig
topology_spread_constraints: List[TopologySpreadConstraintDict]
enable_aws_lb_readiness_gate: bool
k8s_service: KubernetesServiceConfigDict


def load_kubernetes_service_config_no_cache(
Expand Down Expand Up @@ -2749,6 +2759,63 @@ def get_projected_sa_volumes(self) -> List[ProjectedSAVolume]:
soa_dir=self.soa_dir,
)

def get_kubernetes_service_config(self) -> Optional[KubernetesServiceConfigDict]:
"""Returns the k8s_service config block, or None if not configured."""
return self.config_dict.get("k8s_service")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def get_kubernetes_service_name(self) -> str:
"""Returns the name for the Kubernetes Service.

Currently matches the deployment name, but can be extended later
to support custom service naming if needed.
"""
return self.get_sanitised_deployment_name()

def format_kubernetes_service(self) -> Optional[V1Service]:
"""Create a Kubernetes Service configuration from this config.

Returns None if k8s_service is not configured.
"""
k8s_service_config = self.get_kubernetes_service_config()
if k8s_service_config is None:
return None

port = k8s_service_config.get("port", 8888) # Default to the PaaSTA port
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could probably default to self.get_container_port() to handle the couple of services that don't set this to DEFAULT_CONTAINER_PORT (otherwise, probably worth replacing the hardcoded 8888 here with

DEFAULT_CONTAINER_PORT = 8888

annotations = k8s_service_config.get("annotations", {})
annotations[paasta_prefixed("managed")] = "true"
headless = k8s_service_config.get("headless", True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there ever a situation where we wouldn't want this to be true? I don't think we'd ever want a client to be connecting to a paasta service/instance using a k8s cluster ip - we'd only ever want to be using a loadbalancer/cloudmap/something else. I'd probably just remove this config option

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I agree, I think we can remove this option for now. Thanks!


service_port = V1ServicePort(
name="http", port=port, target_port=port, protocol="TCP"
)

service_spec = V1ServiceSpec(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super premature but this coudl be a great stepping stone to support service NLBs in paasta. According to https://kubernetes-sigs.github.io/aws-load-balancer-controller/latest/guide/service/nlb/ the only additions we would need are:

  • in the spec:
    • type should be settable to LoadBalancer
    • a new directive LoadBalancerClass would need to be set to service.k8s.aws/nlb

there is a third requirement which is to set an annotation of service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip but the current annotation support added in this PR would be enough

an alternative would otherwise to have a top level yelpsoa-config directive enable_nlb: True taht would set all of this in one go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this idea, and it sounds like a small change to support it. I will wait to see what other reviewers think before making any changes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is really great idea!

ports=[service_port],
selector={
paasta_prefixed("service"): self.get_service(),
paasta_prefixed("instance"): self.get_instance(),
},
)
if headless:
service_spec.cluster_ip = "None"

service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(
name=self.get_kubernetes_service_name(),
namespace=self.get_namespace(),
labels={
paasta_prefixed("service"): self.get_service(),
paasta_prefixed("instance"): self.get_instance(),
paasta_prefixed("managed"): "true",
},
annotations=annotations,
),
spec=service_spec,
)
return service


def get_kubernetes_secret_hashes(
environment_variables: Mapping[str, str], service: str, namespace: str
Expand Down Expand Up @@ -3866,6 +3933,40 @@ def create_job(
)


def create_service(
kube_client: KubeClient,
formatted_service: V1Service,
namespace: str,
) -> None:
return kube_client.core.create_namespaced_service(
namespace=namespace,
body=formatted_service,
)


def update_service(
kube_client: KubeClient,
formatted_service: V1Service,
namespace: str,
) -> None:
return kube_client.core.replace_namespaced_service(
name=formatted_service.metadata.name,
namespace=namespace,
body=formatted_service,
)


def delete_service(
kube_client: KubeClient,
service_name: str,
namespace: str,
) -> None:
return kube_client.core.delete_namespaced_service(
name=service_name,
namespace=namespace,
)


def get_event_timestamp(event: CoreV1Event) -> Optional[float]:
# Cycle through timestamp attributes in order of preference
for ts_attr in ["last_timestamp", "event_time", "first_timestamp"]:
Expand Down
64 changes: 64 additions & 0 deletions tests/kubernetes/application/test_controller_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,67 @@ def test_job_wrapper_deep_delete():
"mock_namespace",
body=V1DeleteOptions(propagation_policy="Foreground"),
)


def test_sync_service_creates_service_when_configured():
"""Test sync_service creates a Service when k8s_service is configured."""
mock_client = mock.MagicMock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was gonna say that we should be using a mock that's autospec'd on the kube client...but it seems like this is a pre-existing pattern already in this file so nevermind :p

config_dict = {
"k8s_service": {
"headless": True,
"port": 8888,
"annotations": {
"external-dns.alpha.kubernetes.io/hostname": "test1.example.com"
},
}
}
app = setup_app(config_dict, True)
app.exists_service = mock.Mock(return_value=False)
app.sync_service(kube_client=mock_client)
mock_client.core.create_namespaced_service.assert_called_once()
assert mock_client.core.replace_namespaced_service.call_count == 0
assert mock_client.core.delete_namespaced_service.call_count == 0


def test_sync_service_updates_existing_service():
"""Test sync_service updates a Service when it already exists."""
mock_client = mock.MagicMock()
config_dict = {
"k8s_service": {
"headless": True,
"port": 8889,
"annotations": {
"external-dns.alpha.kubernetes.io/hostname": "test2.example.com"
},
}
}
app = setup_app(config_dict, True)
app.exists_service = mock.Mock(return_value=True)
app.sync_service(kube_client=mock_client)
mock_client.core.replace_namespaced_service.assert_called_once()
assert mock_client.core.create_namespaced_service.call_count == 0
assert mock_client.core.delete_namespaced_service.call_count == 0


def test_sync_service_deletes_when_config_removed():
"""Test sync_service deletes Service when k8s_service config is removed."""
mock_client = mock.MagicMock()
config_dict = {}
app = setup_app(config_dict, True)
app.exists_service = mock.Mock(return_value=True)
app.sync_service(kube_client=mock_client)
mock_client.core.delete_namespaced_service.assert_called_once()
assert mock_client.core.create_namespaced_service.call_count == 0
assert mock_client.core.replace_namespaced_service.call_count == 0


def test_sync_service_noop_when_not_configured_and_doesnt_exist():
"""Test sync_service does nothing when not configured and doesn't exist."""
mock_client = mock.MagicMock()
config_dict = {}
app = setup_app(config_dict, True)
app.exists_service = mock.Mock(return_value=False)
app.sync_service(kube_client=mock_client)
assert mock_client.core.create_namespaced_service.call_count == 0
assert mock_client.core.replace_namespaced_service.call_count == 0
assert mock_client.core.delete_namespaced_service.call_count == 0
Loading