Add SkyPilot Kubernetes launcher backend for Tangle #223
Adds SkyPilotKubernetesLauncher, a new ContainerTaskLauncher implementation that submits Tangle pipeline tasks as SkyPilot managed jobs. SkyPilot then handles container scheduling, multi-cluster / multi-cloud placement, multi-node coordination, preemption recovery, log streaming, and cancellation.
Capabilities exercised that the existing kubernetes_launchers cannot do:
- num_nodes > 16 (the Tangle K8s launcher caps at 16)
- Cross-cloud spot with auto-recovery (Tangle has GKE-only spot, no recovery)
- Multi-cloud / multi-cluster placement (infra=None for any-cloud)
- Warm-pool reuse via SkyPilot Pool (no Tangle equivalent)
- s3://, https://, abfs:// file mounts (Tangle: HostPath + gcsfuse only)
- First-class priority_class for Kueue (no annotation API in Tangle today)
The new launcher is selected via the TANGLE_LAUNCHER=skypilot env var; existing Kubernetes deployments are unaffected. skypilot is added as an optional dependency group ([skypilot]) so the install footprint is opt-in.
Resource annotations are kept identical to kubernetes_launchers (cpu, memory, accelerators including JSON-dict and SkyPilot-string forms, ephemeral_storage, multi_node/number_of_nodes) so the same ComponentSpec runs against either backend. Multi-node dynamic-data inputs are bridged to bash env vars set from SKYPILOT_NUM_NODES / SKYPILOT_NODE_RANK / SKYPILOT_NODE_IPS.
20 tests pass against real cloud_pipelines_backend imports (the sky.jobs SDK is stubbed to keep tests offline). The end-to-end pipeline dry-run example at examples/skypilot_launcher_dryrun.py exercises the full launch -> refresh -> log -> persist -> terminate lifecycle on a deliberately Tangle-impossible component spec (32 nodes, JSON-format accelerators, spot, Pool, S3 + GCS mixed file_mounts).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
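The multi-node env-var bridging mentioned above might look like the following sketch. The TANGLE_MULTI_NODE_* names and the head -n1 peer extraction are illustrative assumptions (the message does not spell out the exported names); SKYPILOT_NUM_NODES, SKYPILOT_NODE_RANK, and SKYPILOT_NODE_IPS are SkyPilot's real runtime variables.

```python
# Hedged sketch: build a bash prelude that bridges SkyPilot's runtime
# variables into names the container command can read. The TANGLE_MULTI_NODE_*
# names are hypothetical placeholders, not the launcher's exact ones.
def build_multi_node_prelude() -> str:
    """Return bash lines that bridge SkyPilot runtime vars into the container."""
    lines = [
        'export TANGLE_MULTI_NODE_COUNT="${SKYPILOT_NUM_NODES}"',
        'export TANGLE_MULTI_NODE_RANK="${SKYPILOT_NODE_RANK}"',
        # SKYPILOT_NODE_IPS is newline-separated; the first entry is node 0.
        'export TANGLE_MULTI_NODE_PEER0="$(echo "${SKYPILOT_NODE_IPS}" | head -n1)"',
    ]
    return "\n".join(lines) + "\n"
```

The prelude string would be prepended to the task's run command before submission.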
The launcher relies on sky.jobs.queue records returning start_at / end_at fields and on Resources(priority_class=...) which are stable in 0.12.1+. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reverts incidental whitespace/comment cleanup that crept into the previous commit. Keeps the launcher-selection change (the new _build_launcher() function and its single-line invocation in main()) and nothing else. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
E2E testing surfaced two cases where the launcher passed local-filesystem
paths (Tangle's LocalStorageProvider) directly to sky.Task.file_mounts,
which then fails sky's "must exist locally" validation with a generic error.
Fix:
* Inputs: raise LauncherError with an actionable message pointing the
user at configuring a cloud StorageProvider, instead of letting sky's
validation fire from inside file_mounts.
* Outputs: log a warning and skip the mount entirely when the URI is
a relative local path. The container can still write to /tmp/outputs/
inside the pod; the writes won't be persisted to Tangle's local
storage, but the job still runs and emits logs through sky.jobs.
Tested against a kind cluster with sky.jobs.launch dispatching a Hello
World pipeline — full lifecycle (submit -> RUNNING -> SUCCEEDED) verified
end-to-end. See examples/skypilot_launcher_dryrun.py for the offline
counterpart.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two new tests covering the behavior introduced by the previous fix commit (58b4983), surfaced during end-to-end testing on a real Kubernetes cluster with Tangle's LocalStorageProvider:
* test_input_local_uri_raises_actionable_error — a non-cloud input URI raises LauncherError with a message pointing at cloud StorageProvider config, instead of letting sky.Task validation fire a generic error.
* test_output_local_uri_skipped_no_mount — a non-cloud output URI is skipped with a warning. The container still runs (writing to its own /tmp/outputs/); the launcher just doesn't sync writes back to Tangle's local storage.
22/22 tests pass against real cloud_pipelines_backend imports. End-to-end runs verified on a kind cluster:
Hello (smoke): SUCCEEDED
Failing (exit 7): FAILED
Annotated: SUCCEEDED (priority_class, JSON accelerators)
Cancellation: CANCELLED (sky.jobs.cancel triggered, status flowed back)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
End-to-end testing on a kind cluster surfaced an architectural mismatch
between Tangle's LocalStorageProvider and SkyPilot's file_mounts model:
Tangle's local provider uses HostPath volumes (bidirectional), but
file_mounts is one-way upload + cloud-bucket mount, so a SkyPilot pod
cannot write back to the orchestrator's local filesystem.
Practical effect: single-component pipelines run fine on any provider
(stdout is captured by SkyPilot regardless), but multi-component graph
pipelines need a cloud StorageProvider so output->input artifact URIs
are gs://, s3://, abfs:// etc. that SkyPilot can mount on both sides.
Adds:
* Module docstring section explicitly documenting the requirement and
showing the GoogleCloudStorageProvider configuration recipe.
* test_multistep_with_cloud_uris_passes_through — verifies the launcher
correctly mounts both upstream output and downstream input as cloud
URIs through SkyPilot file_mounts.
23/23 tests pass.
Verified end-to-end on kind cluster with single-step pipelines:
Hello (smoke): SUCCEEDED
Failing (exit 7): FAILED (status flowed back via SkyPilotLaunchedJob)
Annotated: SUCCEEDED (priority_class, JSON accelerators)
Cancellation: CANCELLED (sky.jobs.cancel triggered)
Multi-step on LocalStorageProvider fails as expected (step 1 SUCCEEDED in
sky but the local output URI is skipped, so step 2 has no upstream input).
Live verification of multi-step with GCS pending bucket access.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Surfaced while running Tangle's two-step pipeline against a real GKE cluster with the API server deployed via the SkyPilot helm chart in consolidation mode:
1. sky.jobs.launch return shape: nightly returns tuple[Optional[int], Handle]; older versions return tuple[List[int], Handle]. Handle both shapes when extracting the job id.
2. Cloud-URI file_mounts under consolidation mode: constructing sky.Task() directly with cloud-URI file_mounts goes through translate_local_file_mounts_to_two_hop, which rejects them with NotSupportedError when no managed-jobs bucket is configured. Build a YAML-shaped dict and use sky.Task.from_yaml_config(); the YAML parser auto-promotes cloud-URI entries to sky.Storage MOUNT mounts, which is the path that works in consolidation mode.
3. SkyPilot MOUNT mode rejects sub-paths: gcsfuse mounts a bucket root, not an individual object. Mount each unique bucket once at /mnt/skypilot/<scheme>/<bucket> and reference sub-paths within. This also dedupes mounts when several inputs/outputs live in the same bucket.
4. sky.jobs.queue return shape and filtering: nightly returns tuple[list[dict], ...] and ignores job_ids=. Unwrap the tuple and filter records by job_id locally.
Tests updated to mirror the new internal shape. Verified end-to-end: jobs 11 (generate) and 12 (shout) SUCCEEDED on GKE with output flowing through gs://tangle-skypilot-test-zhwu.
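Point 1's version-tolerant job-id extraction could be sketched like this (function name and handling of an empty list are illustrative assumptions):

```python
from typing import Any, Optional

def extract_job_id(result: Any) -> Optional[int]:
    """Normalize sky.jobs.launch results across SkyPilot versions.

    Older SkyPilot returns (List[int], handle); nightly returns
    (Optional[int], handle). Accept both and yield a single job id.
    """
    job_id, _handle = result
    if isinstance(job_id, list):
        # Older shape: take the first submitted job id, if any.
        job_id = job_id[0] if job_id else None
    return job_id
```

Normalizing at the single extraction point keeps the rest of the launcher agnostic to the installed SkyPilot version.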
A live test that exercises the launcher's multi-node path:
- One Tangle ComponentSpec, TaskSpec annotated with
tangleml.com/launchers/kubernetes/multi_node/number_of_nodes: 2.
- The launcher submits as a 2-pod SkyPilot managed job; the
TANGLE_MULTI_NODE_* env-var prelude bridges SkyPilot's runtime
values (SKYPILOT_NODE_RANK / NODE_IPS / NUM_NODES) into the
container.
- Rank 1 opens a TCP connection to rank 0 (using the peer-0 address
from the prelude) to prove peer addressing is reachable, not just
exported.
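The rank-1 reachability probe can be as small as this sketch (function name and error handling are illustrative; the actual test script in the component may differ):

```python
import socket

def check_peer_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Rank-1 side of the peer check: open a TCP connection to rank 0.

    Returns True only if a real connection is established, proving the
    peer-0 address from the env-var prelude is reachable, not just exported.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```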
Verified on GKE in consolidation mode (sky job 16, 17s job duration):
both pods came up, rank 1 connected to rank 0:12321, and rank 0 wrote
the output report to GCS.
Annotations live on TaskSpec (not ComponentSpec.metadata) because
orchestrator_sql.py forwards task_spec.annotations only.
… show them
Without this, Tangle's /api/executions/{id}/container_log endpoint
reads from container_execution.log_uri (a cloud-storage URI written
by upload_log()) and gets nothing, because the previous upload_log()
was a no-op. The UI's "Logs" tab is therefore empty for SkyPilot-
launched containers — even though `sky jobs logs <id>` works fine
out-of-band.
Two pieces:
1. Wire a `storage_provider` arg through the launcher and
SkyPilotLaunchedJob (mirrors how kubernetes_launchers takes one).
Defaults to None — without it, upload_log() stays a no-op (sky
logs still work via the CLI/SDK; only the UI mirror is skipped).
2. get_log() retries with exponential backoff (6 attempts, 2-8s
spacing) when sky.jobs.tail_logs returns just the
"Job N is already in terminal state ..." hint. Sky returns this
message when called immediately after job termination, before
the controller has finalized the user-job log file. Once the file
is visible (~5-30s later), retry succeeds and the actual
stdout/stderr is uploaded to log_uri.
Verified end-to-end (sky job 21, multinode test): the Tangle UI's
container_log endpoint now returns the full sky log — both ranks'
output, peer IPs, and the rank-1 -> rank-0 socket exchange — instead
of a one-line terminal-state hint.
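The retry loop in piece 2 might be sketched as follows; the sleep callable is injectable for testing, names are illustrative, and the real launcher wraps sky.jobs.tail_logs rather than taking a plain callable:

```python
import time

_SKY_TERMINAL_STATE_HINT = "is already in terminal state"

def fetch_log_with_retry(tail_logs, attempts: int = 6, sleep=time.sleep) -> str:
    """Retry the log fetch while sky only returns the terminal-state hint.

    `tail_logs` is a zero-arg callable returning the current log text.
    Backoff doubles from 2s and is capped at 8s, matching the 2-8s spacing
    described above.
    """
    delay = 2.0
    text = ""
    for attempt in range(attempts):
        text = tail_logs()
        if _SKY_TERMINAL_STATE_HINT not in text:
            return text  # real log content arrived
        if attempt < attempts - 1:
            sleep(min(delay, 8.0))
            delay *= 2
    return text  # exhausted retries; return whatever sky last gave us
```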
…ot-"
Encodes both the orchestrator (Tangle) and the launcher (SkyPilot) in the managed-job name. Surfaces in:
- the SkyPilot dashboard / `sky jobs queue` (NAME column)
- the Tangle UI's container_state debug pane (`skypilot.job_name`)
Makes it obvious at a glance which launcher ran a given task without having to inspect debug_info keys.
…-skypilot-" Keeping launcher code minimal — naming surfaces are demonstrated via the example pipeline name itself, not by changing the default prefix. This reverts commit e094684e87123ed5c3c5d4cd02c0ab9a3afd2128.
…ot-' Makes 'skypilot' visible in the Tangle UI's task name and the SkyPilot dashboard's job name without touching launcher defaults.
…lot-' too Surfaces 'skypilot' in the Tangle Pipelines list (run names) in addition to the inner task name. Makes which launcher ran the pipeline obvious from the index page without drilling into the run detail.
…angle library Run once after starting Tangle to make 'SkyPilot: GPU Sanity Check' and 'SkyPilot: Multi-node Peer Check' visible in the UI's component picker when building a pipeline. The 'SkyPilot:' name prefix surfaces the launcher in the component browser without changing any Tangle defaults.
…orch DDP
Both the example pipeline and the published-component swap the
synthetic socket roundtrip for a real multi-node PyTorch
DistributedDataParallel training run on a small MLP with synthetic
regression data.
Two pods × 1 H100 each (annotation: H100:1, num_nodes=2). NCCL
backend; gradients all-reduced across ranks every step. Drops the
storage_mount/output declarations so worker pods don't need
storage-side credentials — the launcher's log_uri pipeline already
captures stdout and serves it via Tangle's /container_log endpoint.
Verified on CoreWeave's sky-dev cluster (sky job 29):
loss 3.46 -> 0.08 -> 0.057 -> 0.025 -> 0.026 over 5 epochs
total ~14s after image pull
device=cuda gpu="NVIDIA H100 80GB HBM3" on both ranks, identical
all-reduced losses (DDP sync confirmed)
Falls back to gloo (CPU) automatically when no GPU is present, so the
same component runs against either accelerator profile.
… to GCS
Restores the checkpoint + training_log outputs on the multinode example
now that worker pods have GCS auth via SkyPilot's gcpCredentials helm
option (mounts a GCP service-account key + GOOGLE_APPLICATION_CREDENTIALS
into every job pod). gcsfuse on workers can now write to gs:// paths.
Verified on CoreWeave sky-dev (sky job 30):
- 2x[H100:1], NCCL, loss 3.46 -> 0.026 over 5 epochs, ~18s
- rank 0 wrote 1.51 MiB checkpoint + training log to
gs://tangle-skypilot-test-zhwu/artifacts/by_execution/<exec_id>/outputs/
The published "SkyPilot: Multi-node PyTorch DDP" component spec also
declares the outputs and writes them, so any pipeline built around it
in the Tangle UI persists artifacts the same way.
Pipeline graph:
prepare (CPU) -> infer_h100 (H100:1) + infer_h200 (H200:1) -> compare (CPU)
The two inference tasks declare different accelerator constraints, so
SkyPilot's optimizer dispatches them to *different* K8s contexts in the
same allowed_contexts list — proving cross-cluster placement under one
Tangle pipeline. Both run an identical gpt2 generation script over the
same 3 prompts; the final task fans both result JSON files together
into a side-by-side report.
Verified on CoreWeave (allowed_contexts=[sky-dev, h200cluster],
launcher infra=None so the optimizer picks per task):
- sky job 32 (infer_h100) -> NVIDIA H100 80GB HBM3 on sky-dev
- sky job 33 (infer_h200) -> NVIDIA H200 on h200cluster
- identical gpt2 completions on both, ~10-15% latency advantage on H200
- artifact handoff via gs://tangle-skypilot-test-zhwu (storage_mounts
work on both clusters via the GCP SA key mounted by the helm chart's
gcpCredentials option)
Tangle's stock kubernetes_launchers can only target a single api_client
context at a time, so this graph is not expressible there without
running two separate orchestrators.
…ebius-h100 and switch model to Qwen2.5-0.5B-Instruct
The pipeline now reads as a multi-cloud demo: each inference task name
encodes the target cloud + accelerator (gke-l4, nebius-h100) so the
placement is obvious in the Tangle UI and SkyPilot dashboard. The
underlying GPU asks (H100, H200) keep the example runnable in the
current allowed_contexts (sky-dev + h200cluster); comments mark the
spots to swap to L4:1 / H100:1 once a real GKE-L4 and Nebius-H100
context are added.
Switched the model from gpt2 to Qwen/Qwen2.5-0.5B-Instruct — same small
footprint (~1GB), but uses the chat template so completions read as
real assistant responses instead of the gpt2 loop-y output. Output
schema also includes the model id per result.
Verified on CoreWeave (sky jobs 41-43, run 019ddade7817bada7422):
prompt: 'The capital of France is'
gke-l4 (NVIDIA H100, 436ms): 'Paris.'
nebius-h100 (NVIDIA H200, 401ms): 'Paris.'
prompt: 'A haiku about distributed computing:'
both: 'In parallel threads of thought,\\nTogether we solve problems
vast and small,\\nEfficiency in every task achieved.'
Same prompt, identical greedy completions on both clusters; ~7-15%
latency advantage on H200. Cross-cluster placement is sky's optimizer
working off the per-task accelerator constraint, not any per-task
infra pinning in the launcher.
…lot launcher + UI
Mirrors upstream start_local.py but swaps DockerContainerLauncher for
SkyPilotKubernetesLauncher and uses GoogleCloudStorageProvider when
TANGLE_STORAGE_BUCKET is set (LocalStorageProvider otherwise). Serves
the same Tangle frontend on /, so users get the full UI + the
SkyPilot launcher in one entry point.
Run:
TANGLE_STORAGE_BUCKET=gs://my-bucket python -m uvicorn \
start_local_skypilot:app --host 0.0.0.0 --port 9091
Thank you a lot, Zhanghao! I'll review this PR soon.
    from cloud_pipelines.orchestration.storage_providers import google_cloud_storage
    storage_provider = google_cloud_storage.GoogleCloudStorageProvider()
    bucket_uri = storage_bucket.rstrip("/")
    if not bucket_uri.startswith("gs://"):
We should probably require the URI schema so that we can deduce the storage kind.
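The suggested scheme-based deduction might look like this sketch; only GoogleCloudStorageProvider appears in the diff, so the other provider names here are hypothetical placeholders:

```python
from urllib.parse import urlparse

# Only GoogleCloudStorageProvider appears in the PR; the other class names
# are hypothetical placeholders for providers Tangle might grow.
_SCHEME_TO_PROVIDER = {
    "gs": "GoogleCloudStorageProvider",
    "s3": "S3StorageProvider",            # hypothetical
    "abfs": "AzureBlobStorageProvider",   # hypothetical
}

def deduce_storage_kind(bucket_uri: str) -> str:
    """Require a URI scheme and map it to a storage-provider name."""
    scheme = urlparse(bucket_uri).scheme
    if scheme not in _SCHEME_TO_PROVIDER:
        raise ValueError(
            "Bucket URI must start with a supported scheme "
            f"({', '.join(s + '://' for s in sorted(_SCHEME_TO_PROVIDER))}); "
            f"got {bucket_uri!r}"
        )
    return _SCHEME_TO_PROVIDER[scheme]
```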
    default_image: Optional[str] = None,
    default_labels: Optional[dict[str, str]] = None,
    default_envs: Optional[dict[str, str]] = None,
    annotation_to_label_keys: Optional[list[str]] = None,
Probably better to have it as a map from Tangle annotations to labels.
Speaking of which, you might want a way to map to Kubernetes annotations too.
    specify one.
    default_labels: Labels applied to every Sky resource (propagated to
        K8s pod labels under the kubernetes infra).
    default_envs: Env vars injected into every container.
I do not have a strong opinion here. But this might create compatibility issues: The components that rely on extra env variables that are not specified in component or container are not portable.
    for s in ("gs://", "s3://", "abfs://", "https://", "http://", "r2://")
    )

    # SkyPilot's MOUNT mode requires the source to be a bucket root (not a
SkyPilot's MOUNT mode requires the source to be a bucket root
One problem with this is that it allows faulty user component code to modify or delete all data written by other component task executions.
    ``r2://``, ``https://``) directly into the container but cannot represent the
    relative-local-path artifact URIs produced by Tangle's
    ``LocalStorageProvider``.
It's OK for a launcher to only support a subset of storage providers/URIs and raise an exception if unsupported URI is encountered. It's understandable that most launchers won't support the LocalStorageProvider.
P.S. Technically, it's possible to implement (via upload/download data passing instead of mounting). Some clouds that lack good native storage capabilities (e.g. vast.ai) might need that.
    # The YAML parser auto-promotes cloud-URI entries in file_mounts into
    # sky.Storage MOUNT mounts (sky/task.py:660-688), which is the path that
    # works under consolidation mode. Constructing sky.Task() directly with
    # cloud-URI file_mounts goes through translate_local_file_mounts_to_two_hop
    # which rejects them.
    def exit_code(self) -> Optional[int]:
        if not self.has_ended:
            return None
        return 0 if self.has_succeeded else 1
Are there plans to make the real exit code available in the future?
    job_name = task.name or self._job_name_prefix + "task"

    # Submit. sky.jobs.launch returns a RequestId; await it via sky.get().
    # Result shape changed across sky versions: older returns (List[int], Handle),
Is the shape controlled by the SkyPilot Python package version or the version of SkyPilot used in the cluster?
    finally:
        finished.set()

    thread = threading.Thread(target=_run, daemon=True)
I'm a bit concerned about starting new threads in response to the stream_log_lines call.
Is it possible to use sky_jobs.tail_logs(..., preload_content=True)? Or is there some issue making that infeasible?
Let's group the examples under a skypilot subdirectory.
Ark-kun left a comment:
Thank you a lot for this very impressive work!
Apart from a couple of small requests (grouping the examples into skypilot dir; taking a second look at the log streaming), this PR is good to go.
@Michaelvll
yuechao-qin left a comment:
Summary of Tests
- `uv run pytest tests/test_skypilot_launchers.py -v` — 23/23 passed
- `uv run python examples/publish_skypilot_components.py` — published 2 components successfully
- Verified published components via `GET /api/published_components/` — returned both digests
- Verified full component spec via `GET /api/components/{digest}` — YAML matches source
- `uv run python examples/skypilot_launcher_dryrun.py` — failed (missing `from_yaml_config` on stub)
    _SKY_TERMINAL_STATE_HINT = "is already in terminal state"

    def get_log(self) -> str:
        import time as _time
Curious, why a lazy import of time?
    try:
        return max(1, int(float(s)))
    except ValueError:
        return 8
Should there be a warning log to let users know it defaulted to 8 GiB?
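A minimal sketch of the suggested fallback-with-warning, assuming the 8 GiB default from the snippet above; the function and logger names are illustrative:

```python
import logging

logger = logging.getLogger("skypilot_launchers")
_DEFAULT_MEMORY_GIB = 8

def parse_memory_gib(s: str) -> int:
    """Parse a memory annotation, warning when falling back to the default."""
    try:
        return max(1, int(float(s)))
    except ValueError:
        logger.warning(
            "Could not parse memory annotation %r; defaulting to %d GiB",
            s, _DEFAULT_MEMORY_GIB,
        )
        return _DEFAULT_MEMORY_GIB
```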
    ):
        """Launches Tangle container tasks via SkyPilot managed jobs.

        Designed for Kubernetes-only deployments (the Shopify use case) but works
Should "Shopify" be/(need to be) mentioned?
    if self._pool is not None:
        launch_kwargs["pool"] = self._pool
    request_id = sky_jobs.launch(task, name=job_name, **launch_kwargs)
    result = sky.get(request_id)
The K8s launchers have timeouts per request (here); is this possible with sky.get too?
    Run:
        /home/sky/.venv/bin/python examples/run_pipeline_dryrun.py
Not sure if I'm doing something wrong here.
Dry-run example broken — _FakeTask missing from_yaml_config.
_build_task() calls sky.Task.from_yaml_config() (skypilot_launchers.py:572), but the stub in examples/skypilot_launcher_dryrun.py:35 doesn't implement it. The test stub (tests/test_skypilot_launchers.py:40) has it.
Repro:
uv sync --group dev --group skypilot
uv pip install -e .
uv run python examples/skypilot_launcher_dryrun.py
    # Mount the prebuilt frontend (cloned from TangleML/tangle-ui to ./ui_build).
    this_dir = pathlib.Path(__file__).parent
    web_app_search_dirs = [
        this_dir / "ui_build",
        this_dir / ".." / "ui_build",
    ]
Only checks ./ui_build and ../ui_build. Missing ../tangle-ui/dist which start_local.py (line 291) includes.
Result: Frontend build files not found; UI will not be available.
Fix: add this_dir / ".." / "tangle-ui" / "dist" to the search list.
    Run:
        /home/sky/.venv/bin/uvicorn start_local_skypilot:app --host 0.0.0.0 --port 8000

    Or:
        /home/sky/.venv/bin/python -m uvicorn start_local_skypilot:app --host 0.0.0.0 --port 8000

    The Tangle frontend (cloned to ./ui_build) is served at http://localhost:8000/.
    Pipelines submitted from the UI will be dispatched through SkyPilot — they need
    a configured infra (e.g. a Kubernetes context) to actually execute, but the UI
    itself is fully browsable without one.
All example scripts hardcode BASE = "http://localhost:9091" but the docstring says port 8000. Running the server on 8000 means the examples get Connection refused.
Affected files:
- examples/publish_skypilot_components.py:18
- examples/multinode_pipeline_e2e.py:18
- examples/multicluster_inference_e2e.py:24
Suggestion: use os.environ.get("TANGLE_API_URL", "http://localhost:8000") so the port is configurable and the default matches the docstring.
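The suggested change is a one-liner; a hedged sketch (the TANGLE_API_URL variable name comes from the suggestion above, the helper name is illustrative):

```python
import os

def get_api_base() -> str:
    """Resolve the Tangle API base URL; the default matches the docstring's port."""
    return os.environ.get("TANGLE_API_URL", "http://localhost:8000")
```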
A new ContainerTaskLauncher that submits Tangle tasks as SkyPilot managed jobs. SkyPilot then handles container scheduling, multi-cluster placement, multi-node coordination, log mirroring, and cancellation.
Why
Adding SkyPilot as a launcher backend brings these capabilities to Tangle
pipelines without changing the ComponentSpec authoring model:
- kubernetes.allowed_contexts; sky's optimizer picks per task based on accelerator/region. Lets a single Tangle pipeline fan out across clusters and clouds.
- Pool reuses pre-warmed pods instead of creating a fresh one per task, cutting cold-start for batch inference jobs from minutes to seconds.
- Storage mounts any of these in addition to GCS — no custom storage-provider code needed in Tangle.
- sky.jobs.tail_logs and sky.jobs.cancel, mirrored back into the Tangle UI's /container_log.
Opt in:
Existing installs are unaffected; skypilot is a new optional dependency group in pyproject.toml. Headless / orchestrator_main.py users can opt in with TANGLE_LAUNCHER=skypilot.
Examples
23 unit tests pass with sky stubbed (pytest tests/test_skypilot_launchers.py).
Live runs targeted Nebius Kubernetes contexts + GCP access with the SkyPilot API server. After starting start_local_skypilot.py, the runs below are one `python examples/<name>.py` away.
1. Multi-node PyTorch DDP
examples/multinode_pipeline_e2e.py. One ComponentSpec annotated with number_of_nodes: "2" + H100:1 runs as a 2-pod NCCL job; rank 0 saves a checkpoint to GCS. SUCCEEDED on 2× NVIDIA H100; loss converged from 3.46 → 0.026 over 5 epochs with identical all-reduced losses across ranks.
Walkthrough:
tangle-skypilot-multi-node-training.mp4
2. Inference across multiple clusters
examples/multicluster_inference_e2e.py. Four-task graph; one inference task asks for H100, the other for H200. Sky's optimizer dispatches them to
different K8s contexts in the same pipeline. Same Qwen2.5-0.5B-Instruct
script runs on both, identical greedy completions, a final compare task
fans them together.
Walkthrough:
tangle-skypilot-cross-cluster-inference.mp4