16 changes: 12 additions & 4 deletions docs/book/component-guide/orchestrators/skypilot-vm.md
@@ -4,7 +4,7 @@ description: Orchestrating your pipelines to run on VMs using SkyPilot.

# Skypilot VM Orchestrator

The SkyPilot VM Orchestrator is an integration provided by ZenML that allows you to provision and manage virtual machines (VMs) on any cloud provider supported by the [SkyPilot framework](https://skypilot.readthedocs.io/en/latest/index.html). This integration is designed to simplify the process of running machine learning workloads on the cloud, offering cost savings, high GPU availability, and managed execution, We recommend using the SkyPilot VM Orchestrator if you need access to GPUs for your workloads, but don't want to deal with the complexities of managing cloud infrastructure or expensive managed solutions.
The SkyPilot VM Orchestrator is an integration provided by ZenML that allows you to provision and manage virtual machines (VMs) on any cloud provider supported by the [SkyPilot framework](https://skypilot.readthedocs.io/en/latest/index.html). This integration is designed to simplify the process of running machine learning workloads on the cloud, offering cost savings, high GPU availability, and managed execution. We recommend using the SkyPilot VM Orchestrator if you need access to GPUs for your workloads, but don't want to deal with the complexities of managing cloud infrastructure or expensive managed solutions.

{% hint style="warning" %}
This component is only meant to be used within the context of a [remote ZenML deployment scenario](https://docs.zenml.io/getting-started/deploying-zenml/). Usage with a local ZenML deployment may lead to unexpected behavior!
@@ -35,6 +35,10 @@ The SkyPilot VM Orchestrator does not currently support the ability to [schedule
All ZenML pipeline runs are executed using Docker containers within the VMs provisioned by the orchestrator. For that reason, you may need to configure your pipeline settings with `docker_run_args=["--gpus=all"]` to enable GPU support in the Docker container.
{% endhint %}
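
For example, here is a minimal sketch of passing this flag through pipeline settings (the `orchestrator.vm_aws` settings key and the `accelerators` value assume the AWS flavor; adjust both for your cloud):

```python
from zenml import pipeline
from zenml.integrations.skypilot_aws.flavors.skypilot_orchestrator_aws_vm_flavor import (
    SkypilotAWSOrchestratorSettings,
)

# Request a GPU VM and pass --gpus=all to `docker run` so the step
# container can see the GPUs attached to the provisioned VM.
skypilot_settings = SkypilotAWSOrchestratorSettings(
    accelerators="T4:1",
    docker_run_args=["--gpus=all"],
)


@pipeline(settings={"orchestrator.vm_aws": skypilot_settings})
def gpu_pipeline() -> None:
    ...
```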

{% hint style="info" %}
SkyPilot also ships an optional API server with a web dashboard. You can start it with `sky api start`, then run `sky api info` and open the `/dashboard` path of the returned base URL. ZenML does not integrate with this dashboard, but you can use it alongside the ZenML dashboard.
{% endhint %}

## How to deploy it

You don't need to do anything special to deploy the SkyPilot VM Orchestrator. As the SkyPilot integration itself takes care of provisioning VMs, you can simply use the orchestrator as you would any other ZenML orchestrator. However, you will need the appropriate permissions to provision VMs on your cloud provider of choice, and you will need to configure your SkyPilot orchestrator accordingly using the [service connectors](https://docs.zenml.io/how-to/infrastructure-deployment/auth-management/service-connectors-guide) feature.
@@ -322,8 +326,11 @@ For additional configuration of the Skypilot orchestrator, you can pass `Setting
* `disk_tier`: The disk performance tier to use. If None, defaults to `'medium'`.
* `cluster_name`: Name of the cluster to create/reuse. If None, a name is auto-generated. SkyPilot uses the term `cluster` to refer to the VM, or group of VMs, provisioned to execute a task. The cluster name identifies the cluster and determines whether to reuse an existing cluster or create a new one.
* `retry_until_up`: Whether to retry launching the cluster until it is up.
* `idle_minutes_to_autostop`: Automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag is equivalent to running `sky.launch(..., detach_run=True, ...)` and then `sky.autostop(idle_minutes=<minutes>)`. If not set, the cluster will not be autostopped.
* `idle_minutes_to_autostop`: Automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag schedules an autostop after the launch completes. If not set, the cluster will not be autostopped.
* `down`: Tear down the cluster after all jobs finish (successfully or abnormally). If `idle_minutes_to_autostop` is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes.
* `infra`: SkyPilot shortcut string describing resources (e.g., `"K80:1,CPU-8,Mem-32"`). Mutually exclusive with explicit `instance_type`/`cpus`/`memory`/`accelerators`.
* `num_nodes`: Number of nodes for multi-node jobs.
* `network_tier`: Network performance tier where supported by the cloud provider.
* `stream_logs`: If True, show the logs in the terminal as they are generated while the cluster is running.
* `docker_run_args`: Additional arguments to pass to the `docker run` command. For example, `['--gpus=all']` to use all GPUs available on the VM.
* `ports`: Ports to expose. Could be an integer, a range, or a list of integers and ranges. All ports will be exposed to the public internet.
@@ -332,8 +339,8 @@ For additional configuration of the Skypilot orchestrator, you can pass `Setting
* `ordered`: List of candidate resources to try in the specified order.
* `workdir`: Working directory on the local machine to sync to the VM. This is synced to `~/sky_workdir` inside the VM.
* `task_name`: Human-readable task name shown in SkyPilot.
* `file_mounts`: File and storage mounts configuration to make local or cloud storage paths available inside the remote cluster.
* `envs`: Environment variables for the task. Accessible in the VMs that Skypilot launches, not in Docker continaers that the steps and pipeline is running on.
* `file_mounts`: File mounts configuration to make local or cloud storage paths available inside the remote cluster.
* `envs`: Environment variables for the task. These are accessible in the VMs that SkyPilot launches, but not in the Docker containers in which the steps and pipeline run.
* `task_settings`: Dictionary of arbitrary settings forwarded to `sky.Task()`. This allows passing future parameters added by SkyPilot without requiring updates to ZenML.
* `resources_settings`: Dictionary of arbitrary settings forwarded to `sky.Resources()`. This allows passing future parameters added by SkyPilot without requiring updates to ZenML.
* `launch_settings`: Dictionary of arbitrary settings forwarded to `sky.launch()`. This allows passing future parameters added by SkyPilot without requiring updates to ZenML. A combined sketch using several of these settings follows below.
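
A rough sketch combining several of the options above (values are illustrative; the AWS flavor is assumed, and the passthrough keys `use_spot` and `fast` depend on the SkyPilot version you have installed):

```python
from zenml.integrations.skypilot_aws.flavors.skypilot_orchestrator_aws_vm_flavor import (
    SkypilotAWSOrchestratorSettings,
)

skypilot_settings = SkypilotAWSOrchestratorSettings(
    cluster_name="zenml-skypilot",  # reuse this cluster across runs
    retry_until_up=True,
    idle_minutes_to_autostop=30,  # stop the cluster after 30 idle minutes
    down=True,  # tear it down entirely once jobs finish
    stream_logs=True,
    docker_run_args=["--gpus=all"],
    envs={"MY_ENV_VAR": "value"},  # set on the VM, not in the step container
    # Arbitrary passthroughs forwarded verbatim to SkyPilot:
    resources_settings={"use_spot": True},  # extra kwargs for sky.Resources()
    launch_settings={"fast": True},  # extra kwargs for sky.launch()
)
```
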
@@ -510,6 +517,7 @@ skypilot_settings = SkypilotKubernetesOrchestratorSettings(
{% endtab %}
{% endtabs %}


One of the key features of the SkyPilot VM Orchestrator is the ability to run each step of a pipeline on a separate VM with its own specific settings. This allows for fine-grained control over the resources allocated to each step, ensuring that each part of your pipeline has the necessary compute power while optimizing for cost and efficiency.
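
As a minimal sketch (the AWS flavor, step name, and accelerator value are assumptions):

```python
from zenml import step
from zenml.integrations.skypilot_aws.flavors.skypilot_orchestrator_aws_vm_flavor import (
    SkypilotAWSOrchestratorSettings,
)


# Only this step runs on a dedicated GPU VM; steps without their own
# settings fall back to the orchestrator-level configuration.
@step(
    settings={
        "orchestrator.vm_aws": SkypilotAWSOrchestratorSettings(
            accelerators="T4:1",
            docker_run_args=["--gpus=all"],
        )
    }
)
def train_model() -> None:
    ...
```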

## Configuring Step-Specific Resources
@@ -67,6 +67,12 @@ class SkypilotBaseOrchestratorSettings(BaseSettings):
disk_size: the size of the OS disk in GiB.
disk_tier: the disk performance tier to use. If None, defaults to
``'medium'``.
network_tier: the network performance tier to use where supported by
the cloud provider.
infra: SkyPilot shortcut string describing resources (e.g.,
``"K80:1,CPU-8,Mem-32"``). Mutually exclusive with explicit
instance type, CPU, memory, or accelerator settings.
num_nodes: number of nodes for multi-node jobs.
ports: Ports to expose. Could be an integer, a range, or a list of
integers and ranges. All ports will be exposed to the public internet.
labels: Labels to apply to instances as key-value pairs. These are
@@ -84,10 +90,8 @@ class SkypilotBaseOrchestratorSettings(BaseSettings):
many minutes of idleness, i.e., no running or pending jobs in the
cluster's job queue. Idleness gets reset whenever setting-up/
running/pending jobs are found in the job queue. Setting this
flag is equivalent to running
``sky.launch(..., detach_run=True, ...)`` and then
``sky.autostop(idle_minutes=<minutes>)``. If not set, the cluster
will not be autostopped.
flag schedules an autostop after the launch completes. If not set,
the cluster will not be autostopped.
down: Tear down the cluster after all jobs finish (successfully or
abnormally). If --idle-minutes-to-autostop is also set, the
cluster will be torn down after the specified idle time.
@@ -136,6 +140,9 @@ class SkypilotBaseOrchestratorSettings(BaseSettings):
disk_tier: Optional[Literal["high", "medium", "low", "ultra", "best"]] = (
None
)
network_tier: Optional[str] = None
infra: Optional[str] = None
num_nodes: Optional[int] = None

# Run settings
cluster_name: Optional[str] = None
@@ -15,11 +15,10 @@

import os
from abc import abstractmethod
from typing import TYPE_CHECKING, Dict, Optional, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast
from uuid import uuid4

import sky
from sky import StatusRefreshMode

from zenml.entrypoints import PipelineEntrypointConfiguration
from zenml.enums import StackComponentType
@@ -340,17 +339,15 @@ def submit_pipeline(
launch_new_cluster = True
if settings.cluster_name:
status_request_id = sky.status(
refresh=StatusRefreshMode.AUTO,
refresh=sky.StatusRefreshMode.AUTO,
cluster_names=[settings.cluster_name],
)
cluster_info = sky.stream_and_get(status_request_id)

if cluster_info:
cluster_status = sky.stream_and_get(status_request_id)
if cluster_status:
logger.info(
f"Found existing cluster {settings.cluster_name}. Reusing..."
)
launch_new_cluster = False

else:
logger.info(
f"Cluster {settings.cluster_name} not found. Launching a new one..."
@@ -374,48 +371,73 @@ logger.info(
logger.info(
f"Launching the task on a new cluster: {cluster_name}"
)
launch_job_id = sky.launch(
launch_request_id = sky.launch(
task,
cluster_name,
**launch_kwargs,
)
return sky_job_get(
launch_job_id, settings.stream_logs, cluster_name
launch_request_id, settings.stream_logs, cluster_name
)

else:
# Prepare exec parameters with additional launch settings
exec_kwargs = {
exec_kwargs: Dict[str, Any] = {
"down": down,
"backend": None,
**settings.launch_settings, # Can reuse same settings for exec
}

# Remove None values to avoid overriding SkyPilot defaults
exec_kwargs = {
k: v for k, v in exec_kwargs.items() if v is not None
}
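# Forward user-set extras only; skip launch-only keys (not accepted by sky.exec()).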
for key, value in settings.launch_settings.items():
if key in {
"stream_logs",
"detach_setup",
"detach_run",
"num_nodes",
}:
continue
if value is not None:
exec_kwargs[key] = value

# Make sure the cluster is up
if settings.cluster_name is None:
raise ValueError(
"Cluster name is required when reusing an existing cluster."
)
start_request_id = sky.start(
settings.cluster_name,
down=down,
idle_minutes_to_autostop=idle_minutes_to_autostop,
retry_until_up=settings.retry_until_up,
)
sky.stream_and_get(start_request_id)
start_result = sky.stream_and_get(start_request_id)
if not start_result:
logger.warning(
"SkyPilot start returned an empty response for cluster "
f"{settings.cluster_name}."
)

status_request_id = sky.status(
refresh=sky.StatusRefreshMode.AUTO,
cluster_names=[settings.cluster_name],
)
cluster_status = sky.stream_and_get(status_request_id)
if not cluster_status:
raise RuntimeError(
f"Cluster {settings.cluster_name} was not found after start."
)

logger.info(
f"Executing the task on the cluster: {settings.cluster_name}"
)
exec_job_id = sky.exec(
exec_request_id = sky.exec(
task,
cluster_name=settings.cluster_name,
**exec_kwargs,
)
assert settings.cluster_name is not None
return sky_job_get(
exec_job_id, settings.stream_logs, settings.cluster_name
exec_request_id,
settings.stream_logs,
settings.cluster_name,
)

except Exception as e:
@@ -246,10 +246,7 @@ def run_step_on_skypilot_vm(step_name: str) -> None:

task = task.set_resources(sky.Resources(**resources_kwargs))

# Prepare launch parameters
launch_kwargs = prepare_launch_kwargs(
settings=settings,
)
launch_kwargs = prepare_launch_kwargs(settings=settings)

# sky.launch now returns a request ID (async). Capture it so we can
# optionally stream logs and block until completion when desired.
@@ -258,7 +255,9 @@ def run_step_on_skypilot_vm(step_name: str) -> None:
cluster_name,
**launch_kwargs,
)
sky_job_get(launch_request_id, True, cluster_name)
result = sky_job_get(launch_request_id, True, cluster_name)
if result and result.wait_for_completion:
result.wait_for_completion()

# Pop the resource configuration for this step
unique_resource_configs.pop(step_name)
@@ -276,9 +275,15 @@ def run_step_on_skypilot_vm(step_name: str) -> None:
f"Resource configuration for cluster '{cluster_name}' "
"is not used by subsequent steps. deprovisioning the cluster."
)
down_request_id = sky.down(cluster_name)
# Wait for the cluster to be terminated
sky.stream_and_get(down_request_id)
try:
down_request_id = sky.down(cluster_name)
sky.stream_and_get(down_request_id)
except Exception as cleanup_error:
logger.warning(
"Failed to deprovision cluster "
f"'{cluster_name}': {cleanup_error}. "
"Resources may still be running."
)

logger.info(
f"Running step `{step_name}` on a VM is completed."
44 changes: 22 additions & 22 deletions src/zenml/integrations/skypilot/utils.py
@@ -1,7 +1,7 @@
"""Utility functions for Skypilot orchestrators."""

import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast

import sky

@@ -134,6 +134,7 @@ def prepare_task_kwargs(
"setup": setup,
"envs": merged_envs,
"name": settings.task_name or task_name,
"num_nodes": settings.num_nodes,
"workdir": settings.workdir,
"file_mounts_mapping": settings.file_mounts,
**settings.task_settings, # Add any arbitrary task settings
@@ -176,10 +177,12 @@ def prepare_resources_kwargs(
else settings.image_id,
"disk_size": settings.disk_size,
"disk_tier": settings.disk_tier,
"network_tier": settings.network_tier,
"ports": settings.ports,
"labels": settings.labels,
"any_of": settings.any_of,
"ordered": settings.ordered,
"infra": settings.infra,
**settings.resources_settings, # Add any arbitrary resource settings
}

@@ -210,11 +213,8 @@ def prepare_launch_kwargs(
else settings.idle_minutes_to_autostop
)

# The following parameters were removed from sky.launch in versions > 0.8.
# We therefore no longer include them in the kwargs passed to the call.
# • stream_logs – handled by explicitly calling sky.stream_and_get
# • detach_setup / detach_run – setup/run are now detached by default

# SkyPilot ≥0.11 made launch/exec async and removed stream/log flags.
# Keep only supported keys here; exec should use its own kwargs.
launch_kwargs = {
"retry_until_up": settings.retry_until_up,
"idle_minutes_to_autostop": idle_value,
@@ -223,7 +223,7 @@
**settings.launch_settings, # Keep user-provided extras
}

# Remove keys that are no longer supported by sky.launch.
# Remove keys no longer supported by sky.launch.
for _deprecated in (
"stream_logs",
"detach_setup",
@@ -254,24 +254,24 @@ def sky_job_get(
Returns:
Optional submission result.
"""
request_id_any = cast(Any, request_id)
if stream_logs:
# Stream logs and wait for completion
job_id, _ = sky.stream_and_get(request_id)
# Stream logs and wait for completion; returns (job_id, handle)
result = cast(Tuple[int, Any], sky.stream_and_get(request_id_any))
else:
# Just wait for completion without streaming logs
job_id, _ = sky.get(request_id)

_wait_for_completion = None
if stream_logs:
result = cast(Tuple[int, Any], sky.get(request_id_any))
job_id = result[0]

def _wait_for_completion() -> None:
status = 0 # 0=Successful, 100=Failed
status = sky.tail_logs(
cluster_name=cluster_name, job_id=job_id, follow=True
def _wait_for_completion() -> None:
status = sky.tail_logs(
cluster_name=cluster_name, job_id=job_id, follow=True
)
if status != 0:
raise Exception(
f"SkyPilot job {job_id} failed with status {status}"
)
if status != 0:
raise Exception(
f"SkyPilot job {job_id} failed with status {status}"
)

return SubmissionResult(wait_for_completion=_wait_for_completion)
return SubmissionResult(
wait_for_completion=_wait_for_completion if stream_logs else None
)
3 changes: 1 addition & 2 deletions src/zenml/integrations/skypilot_aws/__init__.py
@@ -31,8 +31,7 @@ class SkypilotAWSIntegration(Integration):
"""Definition of Skypilot AWS Integration for ZenML."""

NAME = SKYPILOT_AWS
# all 0.6.x versions of skypilot[aws] are compatible
REQUIREMENTS = ["skypilot[aws]==0.9.3"]
REQUIREMENTS = ["skypilot[aws]>=0.11.0,<0.12.0"]
APT_PACKAGES = ["openssh-client", "rsync"]

@classmethod