1+ import shlex
12import sys
3+ import uuid
24
35from asgiref .sync import async_to_sync
46from compute_horde .executor_class import DEFAULT_EXECUTOR_CLASS
7+ from compute_horde .fv_protocol .facilitator_requests import V2JobRequest
58from compute_horde .fv_protocol .validator_requests import JobStatusUpdate
9+ from compute_horde_core .executor_class import ExecutorClass
10+ from django .conf import settings
611from django .core .management .base import BaseCommand
7- from django .utils import timezone
812
13+ from compute_horde_validator .validator .allowance .default import allowance
914from compute_horde_validator .validator .models import (
10- AdminJobRequest ,
1115 Miner ,
1216 MinerBlacklist ,
1317 OrganicJob ,
1418)
15- from compute_horde_validator .validator .tasks import run_admin_job_request
19+ from compute_horde_validator .validator .organic_jobs .miner_client import MinerClient
20+ from compute_horde_validator .validator .organic_jobs .miner_driver import (
21+ drive_organic_job ,
22+ )
23+
24+
def get_keypair():
    """Return the hotkey obtained from the wallet configured in Django settings.

    Calls ``settings.BITTENSOR_WALLET()`` to obtain the wallet object and
    returns whatever its ``get_hotkey()`` method yields.
    """
    wallet = settings.BITTENSOR_WALLET()
    return wallet.get_hotkey()
1627
1728
1829async def notify_job_status_update (msg : JobStatusUpdate ):
@@ -40,44 +51,36 @@ class Command(BaseCommand):
4051 """
4152
4253 def add_arguments (self , parser ):
43- parser .add_argument ("--miner_hotkey" , default = None , type = str , help = "Miner Hotkey" )
44- # TODO: mock miner with address, port, ip_type
45- # parser.add_argument("--miner_address", default=None, type=str, help="Miner IPv4 address")
46- # parser.add_argument("--miner_port", default=None, type=int, help="Miner port")
47- parser .add_argument (
48- "--executor_class" , type = str , help = "Executor class" , default = DEFAULT_EXECUTOR_CLASS
49- )
50- parser .add_argument ("--timeout" , type = int , help = "Timeout value" , required = True )
54+ parser .add_argument ("--miner_hotkey" , default = None , help = "Miner Hotkey" )
55+ parser .add_argument ("--miner_address" , default = None , help = "Miner IP address" )
56+ parser .add_argument ("--miner_ip_version" , default = 4 , help = "Miner IP version" )
57+ parser .add_argument ("--miner_port" , default = None , type = int , help = "Miner port" )
5158 parser .add_argument (
52- "--docker_image" , type = str , help = "docker image for job execution" , required = True
59+ "--executor_class" ,
60+ type = ExecutorClass ,
61+ help = "Executor class" ,
62+ default = DEFAULT_EXECUTOR_CLASS ,
5363 )
64+ parser .add_argument ("--docker_image" , help = "docker image for job execution" , required = True )
5465 parser .add_argument (
55- "--cmd_args" ,
56- type = str ,
57- default = "" ,
58- help = "arguments passed to the script or docker image" ,
66+ "--cmd_args" , default = "" , help = "arguments passed to the script or docker image"
5967 )
68+ parser .add_argument ("--use_gpu" , action = "store_true" , help = "use gpu for job execution" )
69+
6070 parser .add_argument (
61- "--use_gpu" ,
62- action = "store_true" ,
63- help = "use gpu for job execution" ,
71+ "--download_time_limit" , default = 10 , type = int , help = "download time limit in seconds"
6472 )
6573 parser .add_argument (
66- "--input_url" ,
67- type = str ,
68- default = "" ,
69- help = "input url for job execution" ,
74+ "--execution_time_limit" , default = 100 , type = int , help = "execution time limit in seconds"
7075 )
7176 parser .add_argument (
72- "--output_url" ,
73- type = str ,
74- default = "" ,
75- help = "output url for job execution" ,
77+ "--upload_time_limit" , default = 10 , type = int , help = "upload time limit in seconds"
7678 )
7779 parser .add_argument (
78- "--nonzero_if_not_complete" ,
79- action = "store_true" ,
80- help = "if job completes with PENDING or FAILED state, exit with non-zero status code" ,
80+ "--streaming_start_time_limit" ,
81+ default = 10 ,
82+ type = int ,
83+ help = "streaming start time limit in seconds" ,
8184 )
8285
8386 def handle (self , * args , ** options ):
@@ -102,32 +105,64 @@ def handle(self, *args, **options):
102105
103106 print (f"\n Picked miner: { miner } to run the job" )
104107
105- job_request = AdminJobRequest .objects .create (
106- miner = miner ,
107- timeout = options ["timeout" ],
108+ miner_address = miner .address
109+ miner_ip_version = miner .ip_version
110+ miner_port = miner .port
111+ if options ["miner_address" ]:
112+ miner_address = options ["miner_address" ]
113+ miner_ip_version = options ["miner_ip_version" ]
114+ if options ["miner_port" ]:
115+ miner_port = options ["miner_port" ]
116+
117+ job_request = V2JobRequest (
118+ uuid = str (uuid .uuid4 ()),
108119 executor_class = options ["executor_class" ],
109120 docker_image = options ["docker_image" ],
110- args = options ["cmd_args" ],
121+ args = shlex .split (options ["cmd_args" ]),
122+ env = {},
111123 use_gpu = options ["use_gpu" ],
112- input_url = options ["input_url" ],
113- output_url = options ["output_url" ],
114- created_at = timezone .now (),
124+ download_time_limit = options ["download_time_limit" ],
125+ execution_time_limit = options ["execution_time_limit" ],
126+ upload_time_limit = options ["upload_time_limit" ],
127+ streaming_start_time_limit = options ["streaming_start_time_limit" ],
128+ )
129+
130+ job = OrganicJob .objects .create (
131+ job_uuid = str (job_request .uuid ),
132+ miner = miner ,
133+ miner_address = miner_address ,
134+ miner_address_ip_version = miner_ip_version ,
135+ miner_port = miner_port ,
136+ namespace = job_request .job_namespace or job_request .docker_image or None ,
137+ executor_class = job_request .executor_class ,
138+ job_description = "User job from facilitator" ,
139+ block = allowance ().get_current_block (),
115140 )
116141
142+ async def _run_job ():
143+ keypair = get_keypair ()
144+ miner_client = MinerClient (
145+ miner_hotkey = miner .hotkey ,
146+ miner_address = miner_address ,
147+ miner_port = miner_port ,
148+ job_uuid = str (job .job_uuid ),
149+ my_keypair = keypair ,
150+ )
151+ await drive_organic_job (
152+ miner_client ,
153+ job ,
154+ job_request ,
155+ notify_callback = notify_job_status_update ,
156+ )
157+
117158 try :
118- async_to_sync (run_admin_job_request )(job_request .pk , callback = notify_job_status_update )
159+ async_to_sync (_run_job )()
160+ except Exception as e :
161+ print (f"Failed to run job { job .job_uuid } : { e } " )
162+ sys .exit (1 )
119163 except KeyboardInterrupt :
120164 print ("Interrupted by user" )
121165 sys .exit (1 )
122166
123- try :
124- job_request .refresh_from_db ()
125- job = OrganicJob .objects .get (job_uuid = job_request .uuid )
126- print (f"\n Job { job .job_uuid } done processing" )
127- except OrganicJob .DoesNotExist :
128- print (f"\n Job { job_request .uuid } not found" )
129- sys .exit (1 )
130-
131- if options ["nonzero_if_not_complete" ] and job .status != OrganicJob .Status .COMPLETED :
132- print (f"\n Job { job_request .uuid } was unsuccessful, status = { job .status } " )
133- sys .exit (1 )
167+ job .refresh_from_db ()
168+ print (f"\n Job { job .job_uuid } done processing with status: { job .status } " )
0 commit comments