Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<okhttp.version>4.12.0</okhttp.version>
<jacoco.version>0.8.11</jacoco.version>
<opentelemetry.version>1.29.0</opentelemetry.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -506,42 +507,42 @@
<dependency>
<groupId>io.opentelemetry.javaagent</groupId>
<artifactId>opentelemetry-javaagent</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.29.0-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>1.30.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
<version>1.29.0</version>
<version>${opentelemetry.version}</version>
</dependency>
<!--telemetry end-->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,11 @@ public DAGOperations dagOperations(
@Autowired @Qualifier("dagCallback") Callback<DAGCallbackInfo> dagCallback,
@Autowired @Qualifier("timeCheckRunner") TimeCheckRunner timeCheckRunner,
@Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor,
@Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) {
@Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler,
@Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) {
log.info("begin to init default DAGOperations bean");
DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner,
timeCheckRunner, dagTraversal, dagCallback, dagResultHandler);
timeCheckRunner, dagTraversal, dagCallback, dagResultHandler, tracerHelper);
dagTraversal.setDagOperations(dagOperations);
timeCheckRunner.setDagOperations(dagOperations);
return dagOperations;
Expand Down
4 changes: 4 additions & 0 deletions rill-flow-dag/olympicene-traversal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<dependency>
<groupId>com.weibo</groupId>
<artifactId>rill-flow-common</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.weibo.rill.flow.common.constant.ReservedConstant;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.*;
Expand All @@ -37,9 +38,13 @@
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -61,6 +66,7 @@ public class DAGOperations {
private final DAGTraversal dagTraversal;
private final Callback<DAGCallbackInfo> callback;
private final DAGResultHandler dagResultHandler;
private final TracerHelper tracerHelper;


public static final BiConsumer<Runnable, Integer> OPERATE_WITH_RETRY = (operation, retryTimes) -> {
Expand All @@ -79,14 +85,15 @@ public class DAGOperations {

/**
 * Creates a {@code DAGOperations} instance; each argument is stored unchanged in the
 * field of the same name and no other work is performed here.
 *
 * @param runnerExecutor   executor used elsewhere in this class to run follow-up work asynchronously
 * @param taskRunners      task runners keyed by a string; presumably keyed by task category — TODO confirm
 * @param dagRunner        runner that {@code submitDAG} delegates DAG submission to
 * @param timeCheckRunner  runner used to register and remove timeout checks
 * @param dagTraversal     traversal component that receives submitted traversals and tasks
 * @param callback         callback for {@code DAGCallbackInfo}; usage is not visible in this section
 * @param dagResultHandler result handler; may be absent — the Spring wiring marks it {@code required = false}
 * @param tracerHelper     helper that supplies the OpenTelemetry tracer and saves/loads spans
 */
public DAGOperations(ExecutorService runnerExecutor, Map<String, TaskRunner> taskRunners, DAGRunner dagRunner,
TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback<DAGCallbackInfo> callback,
DAGResultHandler dagResultHandler) {
DAGResultHandler dagResultHandler, TracerHelper tracerHelper) {
this.runnerExecutor = runnerExecutor;
this.taskRunners = taskRunners;
this.dagRunner = dagRunner;
this.timeCheckRunner = timeCheckRunner;
this.dagTraversal = dagTraversal;
this.callback = callback;
this.dagResultHandler = dagResultHandler;
this.tracerHelper = tracerHelper;
}

public void runTasks(String executionId, Collection<Pair<TaskInfo, Map<String, Object>>> taskInfoToContexts) {
Expand All @@ -109,44 +116,70 @@ private void runTask(String executionId, TaskInfo taskInfo, Map<String, Object>
params.put("taskInfo", taskInfo);
params.put("context", context);

TaskRunner runner = selectRunner(taskInfo);
Supplier<ExecutionResult> basicActions = () -> runner.run(executionId, taskInfo, context);

Supplier<ExecutionResult> supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
ExecutionResult executionResult = supplier.get();

/*
任务执行后结果类型
1. 任务执行完成 如 return/pass
1.1 任务执行完成 需要寻找下一个能执行的任务
1.2 任务执行中 需要外部系统调finish触发下一个任务执行
1.3 任务需要重试
2. 流程控制类任务 foreach/choice
2.1 触发能够执行的子任务
*/
// 对应1.1
if (isTaskCompleted(executionResult)) {
dagTraversal.submitTraversal(executionId, taskInfo.getName());
invokeTaskCallback(executionId, taskInfo, context);
}
// 对应1.2
if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
.ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
}
// 对应1.3
if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
}
// 对应2.1
if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
executionResult.getSubTaskInfosAndContext()
.forEach(subTaskInfosAndContext -> {
dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
safeSleep(10);
});
// 获取当前的 Context,这应该包含了从异步任务传递过来的 context
Context parentContext = Context.current();

Span span = tracerHelper.getTracer().spanBuilder("runTask " + taskInfo.getName())
.setAttribute("execution.id", executionId)
.setAttribute("task.name", taskInfo.getName())
.setAttribute("task.category", taskInfo.getTask().getCategory())
.setParent(parentContext) // 显式设置父 context
.startSpan();

TaskStatus executionResultStatus = null;
try (Scope scope = Context.current().with(span).makeCurrent()) {
TaskRunner runner = selectRunner(taskInfo);
Supplier<ExecutionResult> basicActions = () -> runner.run(executionId, taskInfo, context);

Supplier<ExecutionResult> supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
ExecutionResult executionResult = supplier.get();

/*
任务执行后结果类型
1. 任务执行完成 如 return/pass
1.1 任务执行完成 需要寻找下一个能执行的任务
1.2 任务执行中 需要外部系统调finish触发下一个任务执行
1.3 任务需要重试
2. 流程控制类任务 foreach/choice
2.1 触发能够执行的子任务
*/
// 对应1.1
if (isTaskCompleted(executionResult)) {
Context currentContext = Context.current();
runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
try (Scope ignored = currentContext.makeCurrent()) {
dagTraversal.submitTraversal(executionId, taskInfo.getName());
invokeTaskCallback(executionId, taskInfo, context);
}
}));
}
executionResultStatus = executionResult.getTaskStatus();
// 对应1.2
if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
.ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
tracerHelper.saveContext(executionId, taskInfo.getName(), parentContext, span);
return;
}
// 对应1.3
if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
}
// 对应2.1
if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
executionResult.getSubTaskInfosAndContext()
.forEach(subTaskInfosAndContext -> {
dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
safeSleep(10);
});
}
} finally {
if (executionResultStatus != null && executionResultStatus.isCompleted()) {
span.setAttribute("status", executionResultStatus.toString());
span.end();
}
}
}

Expand Down Expand Up @@ -204,24 +237,51 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n
Supplier<ExecutionResult> supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS);
ExecutionResult executionResult = supplier.get();

if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
}
if (isTaskCompleted(executionResult)) {
timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
}
if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
}
// 尝试恢复之前的 span
Span span = tracerHelper.loadSpan(executionId, executionResult.getTaskInfo().getName());
Context currentContext = span != null ? Context.current().with(span) : Context.current();

try (Scope scope = currentContext.makeCurrent()) {
if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
try (Scope ignored = currentContext.makeCurrent()) {
runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
}
}));
}
if (isTaskCompleted(executionResult)) {
timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
try (Scope ignored = currentContext.makeCurrent()) {
dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
}
}));
}
if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
try (Scope ignored = currentContext.makeCurrent()) {
dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
}
}));
}

// key finished
if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
|| isSubFlowTaskKeyCompleted(executionResult)) {
dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
// key finished
if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
|| isSubFlowTaskKeyCompleted(executionResult)) {
runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
try (Scope ignored = currentContext.makeCurrent()) {
dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
}
}));
}
} finally {
if (span != null && executionResult.getTaskStatus().isCompleted()) {
span.setAttribute("status", executionResult.getTaskStatus().toString());
span.end();
}
}
}

Expand All @@ -232,11 +292,17 @@ public void redoTask(String executionId, List<String> taskNames, Map<String, Obj
}

/**
 * Submits a DAG for execution and records the submission as a tracing span.
 *
 * <p>Steps performed, in order:
 * <ol>
 *   <li>derive the descriptor id as the first segment of {@code executionId} split on
 *       {@link ReservedConstant#EXECUTION_ID_CONNECTOR} and start a span named
 *       {@code "submitDAG <descriptorId>"} carrying the {@code execution.id} attribute;</li>
 *   <li>delegate the submission to {@code dagRunner.submitDAG};</li>
 *   <li>register the DAG for timeout checking when {@code getTimeoutSeconds} yields a value;</li>
 *   <li>submit a traversal for the execution with a {@code null} task name.</li>
 * </ol>
 *
 * <p>The span is always ended, including when any step throws; a thrown
 * {@link RuntimeException} is recorded on the span and then rethrown unchanged.
 *
 * @param executionId id of the execution being submitted
 * @param dag         DAG definition to submit
 * @param settings    settings forwarded to the DAG runner
 * @param data        data forwarded to the DAG runner
 * @param notifyInfo  notify info forwarded to the DAG runner
 */
public void submitDAG(String executionId, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
    String[] executionIdInfos = executionId.split(ReservedConstant.EXECUTION_ID_CONNECTOR);
    // String.split drops trailing empty segments, so an id made only of connector characters
    // yields an empty array; fall back to the raw id so span naming never aborts the submission.
    String descriptorId = executionIdInfos.length > 0 ? executionIdInfos[0] : executionId;
    Span span = tracerHelper.getTracer().spanBuilder("submitDAG " + descriptorId)
            .setAttribute("execution.id", executionId)
            .startSpan();
    try {
        log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
        ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
        Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
                .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
        dagTraversal.submitTraversal(executionId, null);
    } catch (RuntimeException e) {
        // Attach the failure to the span so the trace shows why the submission failed.
        span.recordException(e);
        throw e;
    } finally {
        // End the span on every path; previously a failure left it open and unexported.
        span.end();
    }
}

public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) {
Expand Down
Loading
Loading