Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.apache.gobblin.cluster.event;

import lombok.Getter;
import org.apache.gobblin.runtime.JobState;


/**
 * Immutable event that carries a job's {@link JobState} together with a textual summary of the
 * issues recorded for that job.
 *
 * <p>Both values are exposed through Lombok-generated getters ({@code getJobState()} and
 * {@code getIssuesSummary()}).
 */
@Getter
public class JobFailureEvent {
  private final JobState jobState;
  private final String issuesSummary;

  /**
   * @param jobState state of the job this event refers to
   * @param issuesSummary human-readable summary of the issues associated with the job
   */
  public JobFailureEvent(JobState jobState, String issuesSummary) {
    this.jobState = jobState;
    this.issuesSummary = issuesSummary;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;

import org.apache.commons.text.TextStringBuilder;
import org.apache.gobblin.cluster.event.JobFailureEvent;
import org.apache.gobblin.runtime.JobState;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
Expand All @@ -45,6 +48,7 @@
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.troubleshooter.Issue;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
Expand Down Expand Up @@ -107,13 +111,38 @@ protected Config applyJobLauncherOverrides(Config config) {
return configOverrides.withFallback(config);
}

/**
 * Builds a human-readable, numbered summary of every issue currently held by the issue repository.
 *
 * <p>This is best-effort: if reading or formatting the issues throws, the failure is logged at
 * WARN level and whatever text was accumulated up to that point is returned (possibly empty).
 *
 * @return the formatted summary; never {@code null}
 */
private String getIssuesSummary() {
  TextStringBuilder sb = new TextStringBuilder();
  try {
    List<Issue> issues = this.getIssueRepository().getAll();
    sb.appendln("");
    sb.appendln("vvvvv============= Issues (summary) =============vvvvv");

    // Index-based loop because the 1-based position is part of each rendered line.
    for (int i = 0; i < issues.size(); i++) {
      Issue issue = issues.get(i);

      // %s already stringifies its argument, so no explicit toString() is needed; passing the
      // severity directly also keeps a null severity from aborting the whole summary.
      sb.appendln("%s) %s %s %s | source: %s", i + 1, issue.getSeverity(), issue.getCode(),
          issue.getSummary(), issue.getSourceClass());
    }
    sb.append("^^^^^=============================================^^^^^");
  } catch (Exception e) {
    // Deliberately swallowed: a broken summary must not prevent launch finalization.
    log.warn("Failed to get issue summary", e);
  }
  return sb.toString();
}

/**
 * Publishes the job's state and issue summary on the event bus, then asks the cluster manager
 * to shut down.
 */
@Override
protected void handleLaunchFinalization() {
  // NOTE: This only makes sense while a single source / workflow is launched per application for
  // Temporal. It is a stop-gap for getting batch-job behavior: yarn applications currently need a
  // static proxy user at application creation, so several workflows cannot share one application,
  // and killing the job once this completes is therefore the sensible thing to do.
  log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());

  // Post the job outcome first, then the shutdown request.
  JobFailureEvent jobOutcome = new JobFailureEvent(this.jobContext.getJobState(), this.getIssuesSummary());
  eventBus.post(jobOutcome);
  eventBus.post(new ClusterManagerShutdownRequest());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import java.util.stream.IntStream;

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.cluster.event.JobFailureEvent;
import org.apache.gobblin.runtime.EventMetadataUtils;
import org.apache.gobblin.runtime.JobState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -198,6 +201,13 @@ class YarnService extends AbstractIdleService {
private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
private final ConcurrentMap<Long, WorkerProfile> workerProfileByAllocationRequestId = new ConcurrentHashMap<>();

// Job state taken from the JobFailureEvent handled by handleJobFailure; it has no initializer here.
// shutDown() reads it to pick the final application status reported on unregistering.
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private JobState jobState;
// Issues summary taken from the same JobFailureEvent; passed to unregisterApplicationMaster
// when the application is reported as FAILED.
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private String jobIssuesSummary;

public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
this.applicationName = applicationName;
Expand Down Expand Up @@ -304,6 +314,13 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea
}
}

/**
 * Event-bus subscriber that records the job state and issues summary carried by the event.
 *
 * @param jobFailureEvent event holding the job state and the issues summary to remember
 */
@SuppressWarnings("unused")
@Subscribe
public void handleJobFailure(JobFailureEvent jobFailureEvent) {
  // Copy both values out of the event, job state first.
  jobState = jobFailureEvent.getJobState();
  jobIssuesSummary = jobFailureEvent.getIssuesSummary();
}

@Override
protected synchronized void startUp() throws Exception {
LOGGER.info("Starting the TemporalYarnService");
Expand Down Expand Up @@ -353,7 +370,11 @@ protected void shutDown() throws IOException {
}
}

this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
if (this.jobState != null && !this.jobState.getState().isSuccess()) {
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobIssuesSummary, null);
} else {
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
}
} catch (IOException | YarnException e) {
LOGGER.error("Failed to unregister the ApplicationMaster", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.IOException;
import java.net.URL;

import org.apache.gobblin.cluster.event.JobFailureEvent;
import org.apache.gobblin.runtime.JobState;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Resource;
Expand Down Expand Up @@ -123,4 +125,25 @@ public void testBuildContainerCommand() throws Exception {
String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag");
Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));
}

/**
 * Posts a {@link JobFailureEvent} on the event bus and checks that the service exposes the job
 * name, job id and issues summary carried by that event.
 */
@Test
public void testHandleJobFailureEvent() throws Exception {
  YarnService yarnService = new YarnService(this.defaultConfigs, "testApplicationName", "testApplicationId",
      yarnConfiguration, mockFileSystem, eventBus);
  yarnService.startUp();

  JobFailureEvent postedEvent = new JobFailureEvent(new JobState("name", "id"), "summary");
  eventBus.post(postedEvent);

  // NOTE(review): presumably gives the event bus time to deliver the event - confirm whether
  // delivery is asynchronous; if it is not, this pause is unnecessary.
  Thread.sleep(1000);

  JobState recordedState = yarnService.getJobState();
  Assert.assertEquals(recordedState.getJobName(), "name");
  Assert.assertEquals(recordedState.getJobId(), "id");
  Assert.assertEquals(yarnService.getJobIssuesSummary(), "summary");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ public class GobblinYarnAppLauncher {
// This flag tells if the Yarn application has already completed. This is used to
// tell if it is necessary to send a shutdown message to the ApplicationMaster.
private volatile boolean applicationCompleted = false;
private final Object applicationDone = new Object();
private volatile boolean applicationFailed = false;

private volatile boolean stopped = false;

Expand Down Expand Up @@ -380,6 +382,19 @@ public void launch() throws IOException, YarnException, InterruptedException {
}, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);

addServices();

synchronized (this.applicationDone) {
while (!this.applicationCompleted) {
try {
this.applicationDone.wait();
Comment on lines +389 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be simpler and cleaner to use a CountDownLatch instead of explicit synchronization with synchronized, wait(), and notify().

if (this.applicationFailed) {
throw new RuntimeException("Gobblin Yarn application failed");
}
} catch (InterruptedException ie) {
LOGGER.error("Interrupted while waiting for the Gobblin Yarn application to finish", ie);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw an exception instead of only logging the interruption?

}
}
}
}

public boolean isApplicationRunning() {
Expand Down Expand Up @@ -453,7 +468,6 @@ public synchronized void stop() throws IOException, TimeoutException {
this.closer.close();
}
}

this.stopped = true;
}

Expand Down Expand Up @@ -482,9 +496,15 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap
LOGGER.info("Gobblin Yarn application finished with final status: " +
applicationReport.getFinalApplicationStatus().toString());
if (applicationReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED) {
applicationFailed = true;
LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics());
}

synchronized (this.applicationDone) {
this.applicationDone.notify();
}


try {
GobblinYarnAppLauncher.this.stop();
} catch (IOException ioe) {
Expand Down
Loading