Skip to content

Commit 8c8262e

Browse files
authored
Merge pull request #8427 from atsareg/fix_jobagent
[8.0] JobAgent - do not fail already rescheduled job
2 parents a6d208b + 3f31d94 commit 8c8262e

File tree

3 files changed

+27
-18
lines changed

3 files changed

+27
-18
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
3535
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
3636
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
37-
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
3837
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
3938
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4039
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4140
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
41+
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import RESCHEDULED
4242

4343

4444
class JobAgent(AgentModule):
@@ -729,9 +729,9 @@ def _checkSubmittedJobs(self):
729729
self._rescheduleFailedJob(jobID, result["Message"])
730730
self.hostFailureCount += 1
731731

732-
# The payload failed (if result["Value"] is not 0)
733-
elif result["Value"]:
734-
# In order to avoid overriding perfectly valid states, the status is updated iff the job was running
732+
# The payload failed (if result["Value"] is not 0 and the job was not rescheduled)
733+
elif result["Value"] and result["Value"] != RESCHEDULED:
734+
# In order to avoid overriding perfectly valid states, the status is updated if the job was running
735735
res = JobMonitoringClient().getJobsStatus(jobID)
736736
if not res["OK"]:
737737
return res

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@
6060

6161
EXECUTION_RESULT = {}
6262

63+
SUBMISSION_FAILED = -1
64+
SUBMISSION_REPORT_FAILED = -2
65+
JOBWRAPPER_EXCEPTION = -3
66+
INITIALIZATION_FAILED = 1
67+
PAYLOAD_FAILED = 2
68+
FINALIZATION_FAILED = 3
69+
RESCHEDULED = 4
70+
6371

6472
class JobWrapper:
6573
"""The only user of the JobWrapper is the JobWrapperTemplate"""

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from DIRAC.Core.Utilities import DErrno
3232

3333
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob
34+
from DIRAC.WorkloadManagementSystem.JobWrapper import JobWrapper as JW
3435
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
3536
from DIRAC.WorkloadManagementSystem.Client import JobStatus
3637
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
@@ -101,7 +102,7 @@ def execute(arguments):
101102
else:
102103
gLogger.exception("JobWrapperTemplate could not create working directory")
103104
rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory")
104-
return 1
105+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
105106

106107
gJobReport = JobReport(jobID, "JobWrapper")
107108

@@ -114,7 +115,7 @@ def execute(arguments):
114115
jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport
115116
)
116117
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION)
117-
return 1
118+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
118119

119120
if "InputSandbox" in arguments["Job"]:
120121
gJobReport.commit()
@@ -129,14 +130,14 @@ def execute(arguments):
129130
jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
130131
)
131132
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
132-
return 1
133+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
133134
except Exception as exc: # pylint: disable=broad-except
134135
gLogger.exception("JobWrapper raised exception while downloading input sandbox", lException=exc)
135136
rescheduleResult = rescheduleFailedJob(
136137
jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
137138
)
138139
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
139-
return 1
140+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
140141
else:
141142
gLogger.verbose("Job has no InputSandbox requirement")
142143

@@ -155,14 +156,14 @@ def execute(arguments):
155156
jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
156157
)
157158
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
158-
return 1
159+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
159160
except Exception as exc: # pylint: disable=broad-except
160161
gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc)
161162
rescheduleResult = rescheduleFailedJob(
162163
jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
163164
)
164165
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
165-
return 1
166+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
166167
else:
167168
gLogger.verbose("Job has a null InputData requirement:")
168169
gLogger.verbose(arguments)
@@ -185,15 +186,15 @@ def execute(arguments):
185186
jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport
186187
)
187188
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
188-
return 1
189+
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.PAYLOAD_FAILED
189190
gLogger.exception("Job failed in execution phase")
190191
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
191192
gJobReport.setJobStatus(
192193
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
193194
)
194195
job.sendFailoverRequest()
195196
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
196-
return 1
197+
return JW.PAYLOAD_FAILED
197198
except Exception as exc: # pylint: disable=broad-except
198199
gLogger.exception("Job raised exception during execution phase", lException=exc)
199200
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
@@ -202,7 +203,7 @@ def execute(arguments):
202203
)
203204
job.sendFailoverRequest()
204205
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
205-
return 1
206+
return JW.PAYLOAD_FAILED
206207

207208
if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
208209
try:
@@ -219,7 +220,7 @@ def execute(arguments):
219220
job.sendFailoverRequest()
220221
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
221222

222-
return 2
223+
return JW.FINALIZATION_FAILED
223224
except Exception as exc: # pylint: disable=broad-except
224225
gLogger.exception("JobWrapper raised exception while processing output files", lException=exc)
225226
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
@@ -228,7 +229,7 @@ def execute(arguments):
228229
)
229230
job.sendFailoverRequest()
230231
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
231-
return 2
232+
return JW.FINALIZATION_FAILED
232233
else:
233234
gLogger.verbose("Job has no OutputData or OutputSandbox requirement")
234235

@@ -243,7 +244,7 @@ def execute(arguments):
243244
##########################################################
244245

245246

246-
ret = -3
247+
ret = JW.JOBWRAPPER_EXCEPTION
247248
try:
248249
jsonFileName = os.path.realpath(__file__) + ".json"
249250
with open(jsonFileName) as f:
@@ -259,9 +260,9 @@ def execute(arguments):
259260
gLogger.exception("JobWrapperTemplate exception", lException=exc)
260261
try:
261262
gJobReport.commit()
262-
ret = -1
263+
ret = JW.SUBMISSION_FAILED
263264
except Exception as exc: # pylint: disable=broad-except
264265
gLogger.exception("Could not commit the job report", lException=exc)
265-
ret = -2
266+
ret = JW.SUBMISSION_REPORT_FAILED
266267

267268
sys.exit(ret)

0 commit comments

Comments
 (0)