From b17f3dd80d15c01400e52ab68e77bc11977e061a Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 11:22:47 +0530 Subject: [PATCH 01/10] Fail Azkaban job on GaaS Job Failure --- .../cluster/event/JobFailureEvent.java | 19 ++++++++++++ .../GobblinTemporalJobLauncher.java | 29 +++++++++++++++++++ .../gobblin/temporal/yarn/YarnService.java | 19 +++++++++++- .../gobblin/yarn/GobblinYarnAppLauncher.java | 22 +++++++++++++- 4 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java new file mode 100644 index 00000000000..1002400d17c --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java @@ -0,0 +1,19 @@ +package org.apache.gobblin.cluster.event; + +import lombok.Getter; +import org.apache.gobblin.runtime.JobState; + + +/* + + */ +public class JobFailureEvent { + @Getter + private final JobState jobState; + @Getter + private final String issuesSummary; + public JobFailureEvent(JobState jobState, String issuesSummary) { + this.jobState = jobState; + this.issuesSummary = issuesSummary; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 2d17fe20a30..a574c4849a8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -36,6 +36,9 @@ import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; +import org.apache.commons.text.TextStringBuilder; +import org.apache.gobblin.cluster.event.JobFailureEvent; +import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -45,6 +48,7 @@ import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.runtime.troubleshooter.Issue; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; @@ -107,6 +111,28 @@ protected Config applyJobLauncherOverrides(Config config) { return configOverrides.withFallback(config); } + private String getIssuesSummary() { + TextStringBuilder sb = new TextStringBuilder(); + try { + List issues = this.getIssueRepository().getAll(); + sb.appendln(""); + sb.appendln("vvvvv============= Issues (summary) =============vvvvv"); + + for (int i = 0; i < issues.size(); i++) { + Issue issue = issues.get(i); + + sb.appendln("%s) %s %s %s | source: %s", i + 1, issue.getSeverity().toString(), issue.getCode(), + issue.getSummary(), issue.getSourceClass()); + } + sb.append("^^^^^=============================================^^^^^"); + sb.toString(); + } + catch(Exception e) { + log.warn("Failed to get issue summary", e); + } + return sb.toString(); + } + @Override protected void handleLaunchFinalization() { // NOTE: This code only makes sense when there is 1 source / workflow being launched per application for Temporal. This is a stop-gap @@ -114,6 +140,9 @@ protected void handleLaunchFinalization() { // during application creation, it is not possible to have multiple workflows running in the same application. // and so it makes sense to just kill the job after this is completed log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId()); + JobState jobState = this.jobContext.getJobState(); + String issuesSummary = this.getIssuesSummary(); + eventBus.post(new JobFailureEvent(jobState, issuesSummary)); eventBus.post(new ClusterManagerShutdownRequest()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index ec4da215a63..f05f1d4536d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -37,6 +37,9 @@ import java.util.stream.IntStream; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.cluster.event.JobFailureEvent; +import org.apache.gobblin.runtime.EventMetadataUtils; +import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -198,6 +201,9 @@ class YarnService extends AbstractIdleService { private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID); private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); + private JobState jobState; + private String jobIssuesSummary; + public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { this.applicationName = applicationName; @@ -304,6 +310,13 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea } } + @SuppressWarnings("unused") + @Subscribe + public void handleJobFailure(JobFailureEvent jobFailureEvent) { + this.jobState = jobFailureEvent.getJobState(); + this.jobIssuesSummary = jobFailureEvent.getIssuesSummary(); + } + @Override protected synchronized void startUp() throws Exception { LOGGER.info("Starting the TemporalYarnService"); @@ -353,7 +366,11 @@ protected void shutDown() throws IOException { } } - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); + if (this.jobState != null && !this.jobState.getState().isSuccess()) { + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobIssuesSummary, null); + } else { + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); + } } catch (IOException | YarnException e) { LOGGER.error("Failed to unregister the ApplicationMaster", e); } finally { diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index d2f2180fff9..d759b3be92a 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -230,6 +230,8 @@ public class GobblinYarnAppLauncher { // This flag tells if the Yarn application has already completed. This is used to // tell if it is necessary to send a shutdown message to the ApplicationMaster. private volatile boolean applicationCompleted = false; + private final Object applicationDone = new Object(); + private volatile boolean applicationFailed = false; private volatile boolean stopped = false; @@ -380,6 +382,19 @@ public void launch() throws IOException, YarnException, InterruptedException { }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES); addServices(); + + synchronized (this.applicationDone) { + while (!this.applicationCompleted) { + try { + this.applicationDone.wait(); + if (this.applicationFailed) { + throw new RuntimeException("Gobblin Yarn application failed"); + } + } catch (InterruptedException ie) { + LOGGER.error("Interrupted while waiting for the Gobblin Yarn application to finish", ie); + } + } + } } public boolean isApplicationRunning() { @@ -453,7 +468,6 @@ public synchronized void stop() throws IOException, TimeoutException { this.closer.close(); } } - this.stopped = true; } @@ -482,9 +496,15 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap LOGGER.info("Gobblin Yarn application finished with final status: " + applicationReport.getFinalApplicationStatus().toString()); if (applicationReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED) { + applicationFailed = true; LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics()); } + synchronized (this.applicationDone) { + this.applicationDone.notify(); + } + + try { GobblinYarnAppLauncher.this.stop(); } catch (IOException ioe) { From 3bac719691661d43f820bfe906f0abae1ddb9a93 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 12:11:04 +0530 Subject: [PATCH 02/10] update test for YarnService --- .../gobblin/temporal/yarn/YarnService.java | 4 ++++ .../temporal/yarn/YarnServiceTest.java | 23 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index f05f1d4536d..39e1d52e55c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -201,7 +201,11 @@ class YarnService extends AbstractIdleService { private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID); private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); + @VisibleForTesting + @Getter(AccessLevel.PROTECTED) private JobState jobState; + @VisibleForTesting + @Getter(AccessLevel.PROTECTED) private String jobIssuesSummary; public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 3c81316b85c..6a67099799f 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.URL; +import org.apache.gobblin.cluster.event.JobFailureEvent; +import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Resource; @@ -123,4 +125,25 @@ public void testBuildContainerCommand() throws Exception { String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); } + + @Test + public void testHandleJobFailureEvent() throws Exception { + YarnService yarnService = new YarnService( + this.defaultConfigs, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + yarnService.startUp(); + + eventBus.post(new JobFailureEvent(new JobState("name","id"), "summary")); + + Thread.sleep(1000); + Assert.assertEquals(yarnService.getJobState().getJobName(),"name"); + Assert.assertEquals(yarnService.getJobState().getJobId(),"id"); + Assert.assertEquals(yarnService.getJobIssuesSummary(),"summary"); + } } From ba8838866630efde2640b1dbf80d8a06e34e3195 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 12:18:10 +0530 Subject: [PATCH 03/10] Add docstring for JobFailureEvent --- .../org/apache/gobblin/cluster/event/JobFailureEvent.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java index 1002400d17c..9e58e6c3ed9 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java @@ -4,8 +4,9 @@ import org.apache.gobblin.runtime.JobState; -/* - +/** + * The `JobFailureEvent` class represents an event that is triggered when a job fails. + * It contains information about the job state and a summary of the issues that caused the failure. */ public class JobFailureEvent { @Getter From 4fd615de77f09e26cbf15b7b0509397a2775a4e8 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 13:00:05 +0530 Subject: [PATCH 04/10] Checkstyle errors and redundant line remove --- .../gobblin/cluster/event/JobFailureEvent.java | 18 ++++++++++++++++++ .../GobblinTemporalJobLauncher.java | 1 - 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java index 9e58e6c3ed9..5e654fc09db 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.gobblin.cluster.event; import lombok.Getter; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index a574c4849a8..ce73186381c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -125,7 +125,6 @@ private String getIssuesSummary() { issue.getSummary(), issue.getSourceClass()); } sb.append("^^^^^=============================================^^^^^"); - sb.toString(); } catch(Exception e) { log.warn("Failed to get issue summary", e); From a449483d778961e33a825f0e0b85888ac31daeb7 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 15:49:29 +0530 Subject: [PATCH 05/10] review comments --- ...FailureEvent.java => JobSummaryEvent.java} | 10 ++++------ .../GobblinTemporalJobLauncher.java | 7 +++++-- .../gobblin/temporal/yarn/YarnService.java | 19 ++++++++----------- .../temporal/yarn/YarnServiceTest.java | 7 ++++--- 4 files changed, 21 insertions(+), 22 deletions(-) rename gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/{JobFailureEvent.java => JobSummaryEvent.java} (81%) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java similarity index 81% rename from gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java rename to gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java index 5e654fc09db..294fb1bf4ff 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java @@ -18,21 +18,19 @@ package org.apache.gobblin.cluster.event; +import lombok.AllArgsConstructor; import lombok.Getter; import org.apache.gobblin.runtime.JobState; /** - * The `JobFailureEvent` class represents an event that is triggered when a job fails. + * The `JobSummaryEvent` class represents an event that is triggered when a job completes. * It contains information about the job state and a summary of the issues that caused the failure. */ -public class JobFailureEvent { +@AllArgsConstructor +public class JobSummaryEvent { @Getter private final JobState jobState; @Getter private final String issuesSummary; - public JobFailureEvent(JobState jobState, String issuesSummary) { - this.jobState = jobState; - this.issuesSummary = issuesSummary; - } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index ce73186381c..61109e98073 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -37,7 +37,7 @@ import io.temporal.workflow.Workflow; import org.apache.commons.text.TextStringBuilder; -import org.apache.gobblin.cluster.event.JobFailureEvent; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -115,6 +115,9 @@ private String getIssuesSummary() { TextStringBuilder sb = new TextStringBuilder(); try { List issues = this.getIssueRepository().getAll(); + if (issues.size() == 0) { + return ""; + } sb.appendln(""); sb.appendln("vvvvv============= Issues (summary) =============vvvvv"); @@ -141,7 +144,7 @@ protected void handleLaunchFinalization() { log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId()); JobState jobState = this.jobContext.getJobState(); String issuesSummary = this.getIssuesSummary(); - eventBus.post(new JobFailureEvent(jobState, issuesSummary)); + eventBus.post(new JobSummaryEvent(jobState, issuesSummary)); eventBus.post(new ClusterManagerShutdownRequest()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 39e1d52e55c..924192bef39 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -37,8 +37,7 @@ import java.util.stream.IntStream; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.cluster.event.JobFailureEvent; -import org.apache.gobblin.runtime.EventMetadataUtils; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -201,12 +200,10 @@ class YarnService extends AbstractIdleService { private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID); private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); - @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private JobState jobState; - @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private String jobIssuesSummary; + @Getter + protected JobState jobState; + @Getter + protected String jobIssuesSummary; public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -316,9 +313,9 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea @SuppressWarnings("unused") @Subscribe - public void handleJobFailure(JobFailureEvent jobFailureEvent) { - this.jobState = jobFailureEvent.getJobState(); - this.jobIssuesSummary = jobFailureEvent.getIssuesSummary(); + public void handleJobFailure(JobSummaryEvent jobSummaryEvent) { + this.jobState = jobSummaryEvent.getJobState(); + this.jobIssuesSummary = jobSummaryEvent.getIssuesSummary(); } @Override diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 6a67099799f..f527682ba41 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.net.URL; -import org.apache.gobblin.cluster.event.JobFailureEvent; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -139,9 +139,10 @@ public void testHandleJobFailureEvent() throws Exception { yarnService.startUp(); - eventBus.post(new JobFailureEvent(new JobState("name","id"), "summary")); + eventBus.post(new JobSummaryEvent(new JobState("name","id"), "summary")); - Thread.sleep(1000); + // Waiting for the event to be handled + Thread.sleep(100); Assert.assertEquals(yarnService.getJobState().getJobName(),"name"); Assert.assertEquals(yarnService.getJobState().getJobId(),"id"); Assert.assertEquals(yarnService.getJobIssuesSummary(),"summary"); From d898960bf7b2278f5f828ec5c7c55631fb70f105 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 6 Feb 2025 15:56:39 +0530 Subject: [PATCH 06/10] review changes --- .../java/org/apache/gobblin/temporal/yarn/YarnService.java | 4 ++-- .../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 924192bef39..553aff2b92a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -368,9 +368,9 @@ protected void shutDown() throws IOException { } if (this.jobState != null && !this.jobState.getState().isSuccess()) { - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobIssuesSummary, null); + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.getJobIssuesSummary(), null); } else { - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, StringUtils.defaultString(this.getJobIssuesSummary()), null); } } catch (IOException | YarnException e) { LOGGER.error("Failed to unregister the ApplicationMaster", e); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index d759b3be92a..f74f62c6551 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -383,6 +383,9 @@ public void launch() throws IOException, YarnException, InterruptedException { addServices(); + // The YarnClient and all the services are started asynchronously. + // This will block until the application is completed and throws an exception to fail the Azkaban Job in case the + // underlying Yarn Application reports a job failure. synchronized (this.applicationDone) { while (!this.applicationCompleted) { try { @@ -497,7 +500,9 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap applicationReport.getFinalApplicationStatus().toString()); if (applicationReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED) { applicationFailed = true; - LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics()); + LOGGER.error("Gobblin Yarn application failed because of the following issues: " + applicationReport.getDiagnostics()); + } else if (StringUtils.isNotBlank(applicationReport.getDiagnostics())) { + LOGGER.error("Gobblin Yarn application succeeded but has some warning issues: " + applicationReport.getDiagnostics()); } synchronized (this.applicationDone) { From 17d12ae48ad0799c4388db50aa48e49293423698 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Fri, 7 Feb 2025 16:27:26 +0530 Subject: [PATCH 07/10] import ordeR --- .../temporal/joblauncher/GobblinTemporalJobLauncher.java | 4 ++-- .../java/org/apache/gobblin/temporal/yarn/YarnService.java | 4 ++-- .../org/apache/gobblin/temporal/yarn/YarnServiceTest.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 61109e98073..a9f4e7e008b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -37,8 +37,6 @@ import io.temporal.workflow.Workflow; import org.apache.commons.text.TextStringBuilder; -import org.apache.gobblin.cluster.event.JobSummaryEvent; -import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -46,8 +44,10 @@ import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.troubleshooter.Issue; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 553aff2b92a..a62319da89e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -37,8 +37,6 @@ import java.util.stream.IntStream; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.cluster.event.JobSummaryEvent; -import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -95,6 +93,8 @@ import org.apache.gobblin.cluster.GobblinClusterMetricTagNames; import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.cluster.event.JobSummaryEvent; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricReporterException; import org.apache.gobblin.metrics.MultiReporterException; diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index f527682ba41..c8d91065749 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.net.URL; -import org.apache.gobblin.cluster.event.JobSummaryEvent; -import org.apache.gobblin.runtime.JobState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Resource; @@ -43,6 +41,8 @@ import com.typesafe.config.ConfigValueFactory; import com.google.common.eventbus.EventBus; +import org.apache.gobblin.cluster.event.JobSummaryEvent; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; import static org.mockito.Mockito.*; From 849991fe9c18c8a04806741359013307b56cba42 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 13 Feb 2025 10:16:25 +0530 Subject: [PATCH 08/10] replace fields with object --- .../apache/gobblin/temporal/yarn/YarnService.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index a62319da89e..7a5f1e518e3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -201,9 +201,7 @@ class YarnService extends AbstractIdleService { private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); @Getter - protected JobState jobState; - @Getter - protected String jobIssuesSummary; + protected JobSummaryEvent jobSummaryEvent; public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -314,8 +312,7 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea @SuppressWarnings("unused") @Subscribe public void handleJobFailure(JobSummaryEvent jobSummaryEvent) { - this.jobState = jobSummaryEvent.getJobState(); - this.jobIssuesSummary = jobSummaryEvent.getIssuesSummary(); + this.jobSummaryEvent = jobSummaryEvent; } @Override @@ -367,10 +364,10 @@ protected void shutDown() throws IOException { } } - if (this.jobState != null && !this.jobState.getState().isSuccess()) { - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.getJobIssuesSummary(), null); + if (this.jobSummaryEvent.getJobState() != null && !this.jobSummaryEvent.getJobState().getState().isSuccess()) { + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobSummaryEvent.getIssuesSummary(), null); } else { - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, StringUtils.defaultString(this.getJobIssuesSummary()), null); + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, StringUtils.defaultString(this.jobSummaryEvent.getIssuesSummary()), null); } } catch (IOException | YarnException e) { LOGGER.error("Failed to unregister the ApplicationMaster", e); From bb1a8f34885baa7671f968f68fbeccacd028de76 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 13 Feb 2025 10:52:59 +0530 Subject: [PATCH 09/10] update test --- .../org/apache/gobblin/temporal/yarn/YarnServiceTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index c8d91065749..fed2577d015 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -143,8 +143,8 @@ public void testHandleJobFailureEvent() throws Exception { // Waiting for the event to be handled Thread.sleep(100); - Assert.assertEquals(yarnService.getJobState().getJobName(),"name"); - Assert.assertEquals(yarnService.getJobState().getJobId(),"id"); - Assert.assertEquals(yarnService.getJobIssuesSummary(),"summary"); + Assert.assertEquals(yarnService.jobSummaryEvent.getJobState().getJobName(),"name"); + Assert.assertEquals(yarnService.jobSummaryEvent.getJobState().getJobId(),"id"); + Assert.assertEquals(yarnService.jobSummaryEvent.getIssuesSummary(),"summary"); } } From d23ef0771ec5eb24bc4af6bf8fadb8f333c0cb89 Mon Sep 17 00:00:00 2001 From: Swapnil Date: Thu, 13 Feb 2025 17:13:52 +0530 Subject: [PATCH 10/10] remove unused import --- .../main/java/org/apache/gobblin/temporal/yarn/YarnService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 7a5f1e518e3..c643b5cb94d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -94,7 +94,6 @@ import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; import org.apache.gobblin.cluster.event.JobSummaryEvent; -import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricReporterException; import org.apache.gobblin.metrics.MultiReporterException;