Skip to content

Commit 529efc9

Browse files
Merge pull request #523 from backend-developers-ltd/COM-485
Streaming support
2 parents 0586b52 + 06c11cd commit 529efc9

File tree

39 files changed

+812
-64
lines changed

39 files changed

+812
-64
lines changed

.github/workflows/integration_ci.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ jobs:
3131
run: |
3232
docker pull alpine
3333
docker pull backenddevelopersltd/compute-horde-executor:v1-latest
34+
docker pull backenddevelopersltd/compute-horde-streaming-job-test:v0-latest
3435
- name: Start all services
3536
run: local_stack/run_and_await_readiness.sh /tmp/integration_test_logs/
3637

compute_horde/compute_horde/fv_protocol/facilitator_requests.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from compute_horde_core.executor_class import ExecutorClass
77
from compute_horde_core.output_upload import MultiUpload, OutputUpload
88
from compute_horde_core.signature import SignedFields, SignedRequest
9+
from compute_horde_core.streaming import StreamingDetails
910
from compute_horde_core.volume import MultiVolume, Volume
1011
from pydantic import BaseModel, JsonValue
1112

@@ -62,6 +63,8 @@ class V2JobRequest(SignedRequest, BaseModel, extra="forbid"):
6263
download_time_limit: int
6364
execution_time_limit: int
6465
upload_time_limit: int
66+
streaming_start_time_limit: int
67+
streaming_details: StreamingDetails | None = None
6568
# !!! all fields above are included in the signed json payload
6669

6770
def get_args(self):
@@ -100,6 +103,8 @@ def get_signed_fields(self) -> SignedFields:
100103
download_time_limit=self.download_time_limit,
101104
execution_time_limit=self.execution_time_limit,
102105
upload_time_limit=self.upload_time_limit,
106+
streaming_start_time_limit=self.streaming_start_time_limit,
107+
streaming_details=self.streaming_details,
103108
)
104109
return signed_fields
105110

compute_horde/compute_horde/fv_protocol/validator_requests.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,9 +48,16 @@ class MinerResponse(BaseModel, extra="allow"):
4848
upload_results: dict[str, str] | None = None
4949

5050

51+
class StreamingServerDetails(BaseModel, extra="forbid"):
52+
streaming_server_cert: str | None = None
53+
streaming_server_address: str | None = None
54+
streaming_server_port: int | None = None
55+
56+
5157
class JobStatusMetadata(BaseModel, extra="allow"):
5258
comment: str
5359
miner_response: MinerResponse | None = None
60+
streaming_details: StreamingServerDetails | None = None
5461

5562

5663
class JobStatusUpdate(BaseModel, extra="forbid"):
@@ -67,6 +74,7 @@ class Status(StrEnum):
6774
COMPLETED = "completed"
6875
REJECTED = "rejected"
6976
FAILED = "failed"
77+
STREAMING_READY = "streaming_ready"
7078

7179
message_type: Literal["V0JobStatusUpdate"] = "V0JobStatusUpdate"
7280
uuid: str

compute_horde/compute_horde/miner_client/organic.py

Lines changed: 31 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
import bittensor
1111
from compute_horde_core.executor_class import ExecutorClass
1212
from compute_horde_core.output_upload import OutputUpload
13+
from compute_horde_core.streaming import StreamingDetails
1314
from compute_horde_core.volume import Volume
1415
from pydantic import TypeAdapter
1516

@@ -176,7 +177,10 @@ async def notify_volumes_ready(self, msg: V0VolumesReadyRequest) -> None:
176177
"""This method is called when miner sends executor ready message"""
177178

178179
async def notify_execution_done(self, msg: V0ExecutionDoneRequest) -> None:
179-
"""This method is called when miner sends executor ready message"""
180+
"""This method is called when miner sends execution done message"""
181+
182+
async def notify_streaming_readiness(self, msg: V0StreamingJobReadyRequest) -> None:
183+
"""This method is called when miner sends streaming ready message"""
180184

181185
async def handle_manifest_request(self, msg: V0ExecutorManifestRequest) -> None:
182186
try:
@@ -396,9 +400,11 @@ async def send_initial_job_request(
396400
download_time_limit=job_details.job_timing.download_time_limit,
397401
execution_time_limit=job_details.job_timing.execution_time_limit,
398402
upload_time_limit=job_details.job_timing.upload_time_limit,
403+
streaming_start_time_limit=job_details.job_timing.streaming_start_time_limit,
399404
)
400405
if job_details.job_timing is not None
401406
else None,
407+
streaming_details=job_details.streaming_details,
402408
),
403409
)
404410

@@ -428,6 +434,7 @@ class FailureReason(enum.Enum):
428434
EXECUTOR_FAILED = enum.auto()
429435
STREAMING_JOB_READY_TIMED_OUT = enum.auto()
430436
JOB_FAILED = enum.auto()
437+
STREAMING_FAILED = enum.auto()
431438

432439

433440
class OrganicJobError(Exception):
@@ -458,6 +465,7 @@ class TimingDetails:
458465
download_time_limit: int
459466
execution_time_limit: int
460467
upload_time_limit: int
468+
streaming_start_time_limit: int
461469

462470
@property
463471
def total(self):
@@ -473,6 +481,7 @@ def total(self):
473481
volume: Volume | None = None
474482
output: OutputUpload | None = None
475483
artifacts_dir: str | None = None
484+
streaming_details: StreamingDetails | None = None
476485

477486

478487
async def execute_organic_job_on_miner(
@@ -483,7 +492,6 @@ async def execute_organic_job_on_miner(
483492
) -> tuple[str, str, dict[str, str], dict[str, str]]: # stdout, stderr, artifacts, upload_results
484493
"""
485494
Run an organic job. This is a simpler way to use OrganicMinerClient.
486-
487495
:param client: the organic miner client
488496
:param job_details: details specific to the job that needs to be run
489497
:param reservation_time_limit: time for the miner to report reservation success (or decline the job)
@@ -514,7 +522,6 @@ async def execute_organic_job_on_miner(
514522
executor_class=job_details.executor_class,
515523
ttl=reservation_time_limit,
516524
)
517-
518525
await client.send_initial_job_request(job_details, receipt_payload, receipt_signature)
519526
logger.debug("Sent initial job request")
520527
await JobStartedReceipt.from_payload(
@@ -583,7 +590,7 @@ async def execute_organic_job_on_miner(
583590
)
584591
)
585592

586-
## STAGE: volume download
593+
## STAGE: Volume download
587594
try:
588595
if executor_timing:
589596
logger.debug(
@@ -604,6 +611,26 @@ async def execute_organic_job_on_miner(
604611
raise OrganicJobError(FailureReason.VOLUMES_TIMED_OUT) from exc
605612
await client.notify_volumes_ready(volumes_ready_response)
606613

614+
## STAGE: Start streaming
615+
if job_details.streaming_details:
616+
try:
617+
if executor_timing:
618+
logger.debug(
619+
f"Extending deadline by streaming_start_time_limit: +{executor_timing.streaming_start_time_limit}s"
620+
)
621+
deadline.extend_timeout(executor_timing.streaming_start_time_limit)
622+
logger.debug(f"Waiting for streaming (time left: {deadline.time_left():.2f}s)")
623+
streaming_response = await asyncio.wait_for(
624+
client.streaming_job_ready_or_not_future,
625+
timeout=deadline.time_left(),
626+
)
627+
except TimeoutError as exc:
628+
raise OrganicJobError(FailureReason.STREAMING_JOB_READY_TIMED_OUT) from exc
629+
if isinstance(streaming_response, V0StreamingJobNotReadyRequest):
630+
raise OrganicJobError(FailureReason.STREAMING_FAILED, streaming_response)
631+
632+
await client.notify_streaming_readiness(streaming_response)
633+
607634
## STAGE: execution
608635
try:
609636
if executor_timing:

compute_horde/compute_horde/protocol_messages.py

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33

44
from compute_horde_core.executor_class import ExecutorClass
55
from compute_horde_core.output_upload import OutputUpload
6+
from compute_horde_core.streaming import StreamingDetails
67
from compute_horde_core.volume import Volume
78
from pydantic import BaseModel, Field
89

@@ -62,15 +63,12 @@ def total_count(self) -> int:
6263

6364
# validator -> miner.vc -> miner.ec -> executor
6465
class V0InitialJobRequest(BaseModel):
65-
class StreamingDetails(BaseModel):
66-
public_key: str
67-
executor_ip: str | None = None # set by miner before sending to executor
68-
6966
class ExecutorTimingDetails(BaseModel):
7067
allowed_leeway: int
7168
download_time_limit: int
7269
execution_time_limit: int
7370
upload_time_limit: int
71+
streaming_start_time_limit: int
7472

7573
message_type: Literal["V0InitialJobRequest"] = "V0InitialJobRequest"
7674
job_uuid: str

compute_horde/tests/test_certificate.py

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,17 +6,16 @@
66

77
import pytest
88
import requests
9-
from cryptography import x509
10-
from cryptography.hazmat.primitives import serialization
11-
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
12-
13-
from compute_horde.certificate import (
9+
from compute_horde_core.certificate import (
1410
generate_certificate,
1511
generate_certificate_at,
1612
save_public_key,
1713
start_nginx,
1814
write_certificate,
1915
)
16+
from cryptography import x509
17+
from cryptography.hazmat.primitives import serialization
18+
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
2019

2120
NGINX_PORT = 8443
2221
NGINX_URI = f"https://localhost:{NGINX_PORT}"

compute_horde/tests/test_job_request.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ def test_signed_job_roundtrip(signature_wallet):
2626
download_time_limit=1,
2727
execution_time_limit=1,
2828
upload_time_limit=1,
29+
streaming_start_time_limit=1,
2930
)
3031

3132
signer = BittensorWalletSigner(signature_wallet)

compute_horde/tests/test_run_organic_job.py

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import pytest
22
from compute_horde_core.executor_class import ExecutorClass
3+
from compute_horde_core.streaming import StreamingDetails
34
from pydantic import TypeAdapter
45

56
from compute_horde.miner_client.organic import (
@@ -16,6 +17,7 @@
1617
V0JobFinishedReceiptRequest,
1718
V0JobFinishedRequest,
1819
V0JobRequest,
20+
V0StreamingJobReadyRequest,
1921
V0VolumesReadyRequest,
2022
ValidatorAuthForMiner,
2123
ValidatorToMinerMessage,
@@ -83,6 +85,58 @@ async def test_run_organic_job__success(keypair):
8385
]
8486

8587

88+
@pytest.mark.asyncio
89+
@pytest.mark.django_db(transaction=True)
90+
async def test_run_organic_job_streaming__success(keypair):
91+
mock_transport = MinerStubTransport(
92+
"mock",
93+
[
94+
V0AcceptJobRequest(job_uuid=JOB_UUID).model_dump_json(),
95+
V0ExecutorReadyRequest(job_uuid=JOB_UUID).model_dump_json(),
96+
V0VolumesReadyRequest(job_uuid=JOB_UUID).model_dump_json(),
97+
V0StreamingJobReadyRequest(
98+
job_uuid=JOB_UUID,
99+
public_key="dummy-cert",
100+
port=12345,
101+
).model_dump_json(),
102+
V0ExecutionDoneRequest(job_uuid=JOB_UUID).model_dump_json(),
103+
V0JobFinishedRequest(
104+
job_uuid=JOB_UUID,
105+
docker_process_stdout="streaming-stdout",
106+
docker_process_stderr="streaming-stderr",
107+
artifacts={},
108+
).model_dump_json(),
109+
],
110+
)
111+
client = OrganicMinerClient(
112+
miner_hotkey="mock",
113+
miner_address="0.0.0.0",
114+
miner_port=1234,
115+
job_uuid=JOB_UUID,
116+
my_keypair=keypair,
117+
transport=mock_transport,
118+
)
119+
job_details = OrganicJobDetails(
120+
job_uuid=JOB_UUID,
121+
executor_class=ExecutorClass.always_on__llm__a6000,
122+
docker_image="mock",
123+
streaming_details=StreamingDetails(public_key="dummy-cert"),
124+
)
125+
await execute_organic_job_on_miner(
126+
client, job_details, reservation_time_limit=2, executor_startup_time_limit=2
127+
)
128+
129+
# Find the initial job request sent by the client
130+
initial_job_request = next(
131+
(m for m in mock_transport.sent_models if isinstance(m, V0InitialJobRequest)), None
132+
)
133+
assert initial_job_request is not None, "Initial job request was not sent"
134+
assert initial_job_request.streaming_details is not None, (
135+
"Streaming details not sent in initial job request"
136+
)
137+
assert initial_job_request.streaming_details.public_key == "dummy-cert"
138+
139+
86140
# TODO:
87141
# - unhappy path
88142
# - connection error

compute_horde/tests/test_signature.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,7 @@ def test_signed_fields__missing_fields():
128128
"download_time_limit": 1,
129129
"execution_time_limit": 1,
130130
"upload_time_limit": 1,
131+
"streaming_start_time_limit": 1,
131132
}
132133
facilitator_signed_fields = SignedFields.from_facilitator_sdk_json(facilitator_request_json)
133134

@@ -143,6 +144,7 @@ def test_signed_fields__missing_fields():
143144
download_time_limit=1,
144145
execution_time_limit=1,
145146
upload_time_limit=1,
147+
streaming_start_time_limit=1,
146148
)
147149
assert v2_job_request.get_signed_fields() == facilitator_signed_fields
148150

@@ -181,6 +183,7 @@ def test_signed_fields__volumes_uploads():
181183
"download_time_limit": 1,
182184
"execution_time_limit": 1,
183185
"upload_time_limit": 1,
186+
"streaming_start_time_limit": 1,
184187
}
185188
facilitator_signed_fields = SignedFields.from_facilitator_sdk_json(facilitator_request_json)
186189

@@ -198,6 +201,7 @@ def test_signed_fields__volumes_uploads():
198201
download_time_limit=1,
199202
execution_time_limit=1,
200203
upload_time_limit=1,
204+
streaming_start_time_limit=1,
201205
)
202206
assert v2_job_request.get_signed_fields() == facilitator_signed_fields
203207

compute_horde/compute_horde/certificate.py renamed to compute_horde_sdk/src/compute_horde_core/certificate.py

Lines changed: 6 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ async def get_docker_container_ip(container_name: str, bridge_network: bool = Fa
3939
raise Exception(f"Failed to get IP of {container_name}")
4040

4141

42-
async def check_endpoint(url, timeout) -> bool:
42+
async def check_endpoint(url: str, timeout: int) -> bool:
4343
"""
4444
Pings endpoint every second until it returns 200 or timeout is reached.
4545
"""
@@ -62,7 +62,7 @@ async def start_nginx(
6262
job_network: str,
6363
container_name: str = "job-nginx",
6464
timeout: int = 10,
65-
):
65+
) -> None:
6666
nginx_conf_file = dir_path / "nginx.conf"
6767
nginx_conf_file.write_text(nginx_conf)
6868

@@ -89,9 +89,7 @@ async def start_nginx(
8989
ip = await get_docker_container_ip(container_name)
9090

9191
# connect to internal network for job communication
92-
process = await asyncio.create_subprocess_exec(
93-
"docker", "network", "connect", job_network, container_name
94-
)
92+
process = await asyncio.create_subprocess_exec("docker", "network", "connect", job_network, container_name)
9593
await process.wait()
9694

9795
# wait for nginx to start
@@ -108,17 +106,13 @@ def generate_certificate(alternative_name: str) -> tuple[Certificate, RSAPrivate
108106
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
109107

110108
builder = x509.CertificateBuilder()
111-
builder = builder.subject_name(
112-
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
113-
)
109+
builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")]))
114110
builder = builder.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")]))
115111
builder = builder.not_valid_before(datetime.now(UTC))
116112
builder = builder.not_valid_after(datetime.now(UTC) + timedelta(days=365))
117113
builder = builder.serial_number(x509.random_serial_number())
118114
builder = builder.public_key(private_key.public_key())
119-
builder = builder.add_extension(
120-
x509.BasicConstraints(ca=False, path_length=None), critical=True
121-
)
115+
builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
122116

123117
alt_name: x509.GeneralName
124118
try:
@@ -128,9 +122,7 @@ def generate_certificate(alternative_name: str) -> tuple[Certificate, RSAPrivate
128122
else:
129123
alt_name = x509.IPAddress(_ip)
130124

131-
builder = builder.add_extension(
132-
x509.SubjectAlternativeName([x509.DNSName("localhost"), alt_name]), critical=False
133-
)
125+
builder = builder.add_extension(x509.SubjectAlternativeName([x509.DNSName("localhost"), alt_name]), critical=False)
134126

135127
certificate = builder.sign(private_key=private_key, algorithm=hashes.SHA256())
136128

0 commit comments

Comments
 (0)