[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job#4049
arjun4084346 wants to merge 3 commits into apache:master
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4049      +/-   ##
============================================
+ Coverage     38.79%   41.11%   +2.31%
- Complexity     1599     2201     +602
============================================
  Files           388      480      +92
  Lines         15998    20360    +4362
  Branches       1585     2355     +770
============================================
+ Hits           6207     8371    +2164
- Misses         9293    11097    +1804
- Partials        498      892     +394
fix merge conflicts
8d5589d to 4bb8ae5
4bb8ae5 to 43d53e5
phet left a comment
nice improvement. it generally looks good, but let's align on whether SKIPPED is only job-level or also flow-level. once we decide, I'll take one more pass through the ReevaluateDagProcTest to read that more closely
| CANCELLED | ||
| /** | ||
| * Flow or job is skipped |
how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that (fewer than all of) that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)
i think it just comes down to how we define things. imo, when a flow execution is skipped because another execution of the same flow is already running, status SKIPPED sounds more appropriate.
| boolean addFlag = true; | ||
| if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) { | ||
| if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME || | ||
| executionStatus == SKIPPED) { |
I'm unclear here: is "skipping" able to be reversed, so the node can later be ready? (I'm equating getNext to identifying the set of "ready" nodes.)
no, skipped cannot be reversed. this diff should not be here, i'll change it. i think it might have been appropriate in some draft version of this PR, but not anymore
| private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList | ||
| .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, | ||
| ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, | ||
| ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE, |
why is CANCELLED last, and SKIPPED prior to COMPLETE? what of the similar idea that news of job COMPLETE might arrive after we'd already attempted cancellation or skipping?
these four are the terminal statuses, and once a job reaches one of them, further GTEs can be ignored.
we also do not expect to see two of them for the same job.
yes, some combinations of these four events may arrive because of a race condition, but in that case i think it is ok for GaaS to just adhere to any one ordering. I do not want to change the correct ordering, so I added SKIPPED before the other terminal statuses, basically to support the same idea as yours: let the job show COMPLETE even if it was cancelled/skipped earlier.
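To make the ordering argument concrete, here is a minimal sketch of how an ordered status list could be used to ignore out-of-order events. It is an illustration only, not the code in this PR: the class `StatusOrderingSketch`, the local `Status` enum, and the helpers `isOutOfOrder` and `resolve` are hypothetical, and the position of FAILED between COMPLETE and CANCELLED is an assumption, since the diff above only shows the list up to COMPLETE.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the status-ordering idea discussed above.
class StatusOrderingSketch {
  // Local copy of the status names from this thread; not the real Gobblin enum.
  enum Status {
    COMPILED, PENDING, PENDING_RESUME, PENDING_RETRY, ORCHESTRATED, RUNNING,
    SKIPPED, COMPLETE, FAILED, CANCELLED
  }

  // Mirrors the order in the diff; the placement of FAILED is an assumption.
  static final List<Status> ORDERED = Arrays.asList(
      Status.COMPILED, Status.PENDING, Status.PENDING_RESUME, Status.PENDING_RETRY,
      Status.ORCHESTRATED, Status.RUNNING, Status.SKIPPED, Status.COMPLETE,
      Status.FAILED, Status.CANCELLED);

  // An incoming status is out of order if it sits earlier in the list than the current one.
  static boolean isOutOfOrder(Status current, Status incoming) {
    return ORDERED.indexOf(incoming) < ORDERED.indexOf(current);
  }

  // Keep the current status when the incoming one is out of order, otherwise take the incoming one.
  static Status resolve(Status current, Status incoming) {
    return isOutOfOrder(current, incoming) ? current : incoming;
  }

  public static void main(String[] args) {
    // COMPLETE arriving after SKIPPED replaces it, because SKIPPED is listed first.
    System.out.println(resolve(Status.SKIPPED, Status.COMPLETE));
    // SKIPPED arriving after COMPLETE is ignored.
    System.out.println(resolve(Status.COMPLETE, Status.SKIPPED));
  }
}
```

Under this assumed rule, whichever of two racing terminal events sits later in the list wins, which is why the position of SKIPPED relative to COMPLETE matters.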
...ervice/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
| findDependentJobs(dag, node, dependentJobs); | ||
| for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) { | ||
| Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); | ||
| DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); |
same comment about hard-coding to this static
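For readers following along, the snippet above relies on collecting every job downstream of a node, not just its direct children. A minimal sketch of that traversal is below. It assumes the DAG is given as a plain parent-to-children map of job names; the class `DependentJobsSketch` and this signature of `findDependentJobs` are hypothetical and do not reflect the actual `Dag`/`DagNode` API.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: collect all transitive dependents of a job in a DAG.
class DependentJobsSketch {
  // children maps a job name to the jobs that directly depend on it (assumed representation).
  static Set<String> findDependentJobs(Map<String, List<String>> children, String start) {
    Set<String> dependents = new LinkedHashSet<>();
    Deque<String> toVisit = new ArrayDeque<>(children.getOrDefault(start, Collections.emptyList()));
    while (!toVisit.isEmpty()) {
      String job = toVisit.pop();
      // add() returns false for jobs already seen, so shared descendants are expanded only once.
      if (dependents.add(job)) {
        toVisit.addAll(children.getOrDefault(job, Collections.emptyList()));
      }
    }
    return dependents;
  }

  public static void main(String[] args) {
    Map<String, List<String>> children = new HashMap<>();
    children.put("job0", Arrays.asList("job1", "job2"));
    children.put("job1", Arrays.asList("job3"));
    children.put("job2", Arrays.asList("job3"));
    children.put("job3", Arrays.asList("job4"));
    // Dependents of job1 are job3 and job4; job2 is a sibling and is not included.
    System.out.println(findDependentJobs(children, "job1"));
  }
}
```

Each job in the returned set would then get one SKIPPED event, which is the bulk behaviour the snippet above is aiming for.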
| dag.setMessage("Flow failed because job " + jobName + " failed"); | ||
| dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED); | ||
| dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode); | ||
| DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); |
wondering... is this a kind of 'ping-pong'?
a. a job fails, which emits a GTE
b. the KJSM sees the GTE and then creates a DagActionType.REEVALUATE
c. this ReevaluateDagProc emits a SKIPPED GTE for all dependent jobs
d. the KJSM sees those GTEs and creates a DagActionType.REEVALUATE for each of those
I'm wondering whether step d.) is necessary, given that setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a DagAction for reevaluating those?
yes, (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs, not for all the dependent jobs.
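One way to picture dropping step (d) is a filter on which job statuses lead to a reevaluation. The sketch below is purely illustrative: `ReevaluateTriggerSketch`, its `Status` enum, and `shouldCreateReevaluateAction` are hypothetical names, and the thread only agrees that (d) should go, not on how the KJSM would implement it.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: decide whether a job status event should lead to a REEVALUATE action.
class ReevaluateTriggerSketch {
  // Local subset of status names from this thread; not the real Gobblin enum.
  enum Status { RUNNING, COMPLETE, FAILED, CANCELLED, SKIPPED }

  // Assumption: only terminal statuses a job reaches on its own can change what runs next.
  // SKIPPED is set in bulk on all dependents, so there is nothing left to reevaluate for it.
  static final Set<Status> TRIGGERS_REEVALUATE =
      EnumSet.of(Status.COMPLETE, Status.FAILED, Status.CANCELLED);

  static boolean shouldCreateReevaluateAction(Status status) {
    return TRIGGERS_REEVALUATE.contains(status);
  }

  public static void main(String[] args) {
    System.out.println(shouldCreateReevaluateAction(Status.FAILED));
    System.out.println(shouldCreateReevaluateAction(Status.SKIPPED));
  }
}
```

With a filter like this, a failed job still triggers one reevaluation, while the SKIPPED events emitted for its dependents do not bounce back as further actions.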
| DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); | ||
| break; | ||
| case CANCELLED: | ||
| case SKIPPED: |
is this job-level SKIPPED, due to the "ping-pong" I just described?
or is it arising from a flow-level execution status of SKIPPED? if the latter, who sets that? I thought it would be only job-level
ah, yes this needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.
fix merge conflicts
add tests
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
created SKIPPED execution status
used it for jobs that cannot run because their parent job is cancelled
Tests
updated tests in ReevaluateDagProcTest
Commits