2222import subprocess
2323import shutil
2424from ConfigParser import SafeConfigParser
25+ from jsonpath_rw import parse
26+ from bson .objectid import ObjectId
2527
2628import cumulus .taskflow
2729from cumulus .tasks .job import download_job_input_folders , submit_job
@@ -64,18 +66,32 @@ class PyFrTaskFlow(cumulus.taskflow.TaskFlow):
6466 }
6567 }
6668 """
67- def start ( self , * args , ** kwargs ):
69+ PYFR_AMI = 'ami-7def1b1d'
6870
69- # Load the cluster
70- model = ModelImporter .model ('cluster' , 'cumulus' )
71+ def start (self , * args , ** kwargs ):
7172 user = getCurrentUser ()
72- cluster = model .load (kwargs ['cluster' ]['_id' ],
73- user = user , level = AccessType .ADMIN )
74- cluster = model .filter (cluster , user , passphrase = False )
75- kwargs ['cluster' ] = cluster
73+ # Load the cluster
74+ cluster_id = parse ('cluster._id' ).find (kwargs )
75+ if cluster_id :
76+ cluster_id = cluster_id [0 ].value
77+ model = ModelImporter .model ('cluster' , 'cumulus' )
78+ cluster = model .load (cluster_id , user = user , level = AccessType .ADMIN )
79+ cluster = model .filter (cluster , user , passphrase = False )
80+ kwargs ['cluster' ] = cluster
81+
82+ profile_id = parse ('cluster.profileId' ).find (kwargs )
83+ if profile_id :
84+ profile_id = profile_id [0 ].value
85+ model = ModelImporter .model ('aws' , 'cumulus' )
86+ profile = model .load (profile_id , user = user , level = AccessType .ADMIN )
87+ kwargs ['profile' ] = profile
88+
89+ kwargs ['next' ] = setup_input .s ()
90+ kwargs ['ami' ] = self .PYFR_AMI
7691
7792 super (PyFrTaskFlow , self ).start (
78- setup_input .s (self ,* args , ** kwargs ))
93+ setup_cluster .s (
94+ self , * args , ** kwargs ))
7995
8096 def terminate (self ):
8197 self .run_task (pyfr_terminate .s ())
@@ -167,7 +183,8 @@ def update_config_file(task, client, *args, **kwargs):
167183 task .logger .info ('%s removed.' % section )
168184
169185 backend_section = 'backend-%s' % kwargs ['backend' ]['type' ]
170- task .logger .info ('Adding backend configuration for %s' )
186+ task .logger .info ('Adding backend configuration for %s'
187+ % kwargs ['backend' ]['type' ] )
171188 # Filter out options with no value
172189 options = {k : v for k , v in kwargs ['backend' ].iteritems () if v }
173190 options .pop ('type' , None )
@@ -191,8 +208,6 @@ def update_config_file(task, client, *args, **kwargs):
191208
192209@cumulus .taskflow .task
193210def setup_input (task , * args , ** kwargs ):
194- task .logger .info ('Input parameters: %s' % kwargs )
195-
196211 input_folder_id = kwargs ['input' ]['folder' ]['id' ]
197212 mesh_file_id = kwargs ['input' ]['meshFile' ]['id' ]
198213 kwargs ['meshFileId' ] = mesh_file_id
@@ -201,6 +216,14 @@ def setup_input(task, *args, **kwargs):
201216 if not number_of_procs :
202217 number_of_procs = kwargs .get ('numberOfNodes' )
203218
219+ if not number_of_procs :
220+ size = parse ('cluster.config.launch.params.node_instance_count' ).find (kwargs )
221+ if size :
222+ number_of_procs = size [0 ].value + 1
223+ else :
224+ raise Exception ('Unable to extract number of nodes in cluster' )
225+
226+
204227 if not number_of_procs :
205228 raise Exception ('Unable to determine number of mpi processes to run.' )
206229
@@ -270,6 +293,24 @@ def setup_input(task, *args, **kwargs):
270293 if os .path .exists (output_dir ):
271294 shutil .rmtree (output_dir )
272295
296+ # If we are running in the cloud determine backend to use
297+ if kwargs ['cluster' ]['type' ] == 'ec2' :
298+ machine_spec = kwargs .get ('machine' )
299+ # If we have GPUs use cuda
300+ if int (machine_spec ['gpu' ]) == 1 :
301+ backend = {
302+ 'type' : 'cuda' ,
303+ 'device-id' : 'round-robin'
304+ }
305+ # Use OpenMP
306+ else :
307+ backend = {
308+ 'type' : 'openmp' ,
309+ 'cblas' : '/usr/lib/openblas-base/libblas.so'
310+ }
311+
312+ kwargs ['backend' ] = backend
313+
273314 update_config_file (task , client , * args , ** kwargs )
274315
275316 ini_file_id = kwargs ['input' ]['iniFile' ]['id' ]
@@ -302,7 +343,10 @@ def create_job(task, *args, **kwargs):
302343 'path' : 'input'
303344 }
304345 ],
305- 'output' : []
346+ 'output' : [],
347+ 'params' : {
348+ 'numberOfSlots' : kwargs ['numberOfProcs' ]
349+ }
306350 }
307351
308352 client = _create_girder_client (
@@ -334,7 +378,8 @@ def submit_pyfr_job(task, cluster, job, *args, **kwargs):
334378 task .logger .info ('Submitting job %s to cluster.' % job ['_id' ])
335379 girder_token = task .taskflow .girder_token
336380
337- job ['params' ] = kwargs
381+ job ['params' ].update (kwargs )
382+
338383 submit_job (cluster , job , log_write_url = None ,
339384 girder_token = girder_token , monitor = False )
340385
@@ -391,7 +436,10 @@ def create_export_job(task, job_name, files, job_dir, mesh_filename):
391436 'name' : job_name ,
392437 'commands' : commands ,
393438 'input' : [],
394- 'output' : []
439+ 'output' : [],
440+ 'params' : {
441+ 'numberOfSlots' : 1
442+ }
395443 }
396444
397445 client = _create_girder_client (
0 commit comments