diff --git a/pom.xml b/pom.xml
index c573d98ff..737690625 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
https://sonarcloud.io
4.12.0
0.8.11
+ 1.29.0
@@ -506,42 +507,42 @@
io.opentelemetry.javaagent
opentelemetry-javaagent
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry.instrumentation
opentelemetry-instrumentation-annotations
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-api
- 1.29.0
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-exporter-otlp
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-sdk
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-semconv
1.29.0-alpha
-
- io.opentelemetry
- opentelemetry-sdk-extension-autoconfigure
- 1.30.1
-
io.opentelemetry
opentelemetry-sdk-extension-autoconfigure-spi
- 1.29.0
+ ${opentelemetry.version}
diff --git a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
index a6391dc31..3c29a223c 100644
--- a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
+++ b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
@@ -299,10 +299,11 @@ public DAGOperations dagOperations(
@Autowired @Qualifier("dagCallback") Callback dagCallback,
@Autowired @Qualifier("timeCheckRunner") TimeCheckRunner timeCheckRunner,
@Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor,
- @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) {
+ @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler,
+ @Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) {
log.info("begin to init default DAGOperations bean");
DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner,
- timeCheckRunner, dagTraversal, dagCallback, dagResultHandler);
+ timeCheckRunner, dagTraversal, dagCallback, dagResultHandler, tracerHelper);
dagTraversal.setDagOperations(dagOperations);
timeCheckRunner.setDagOperations(dagOperations);
return dagOperations;
diff --git a/rill-flow-dag/olympicene-traversal/pom.xml b/rill-flow-dag/olympicene-traversal/pom.xml
index c13f365bf..eae63ac02 100644
--- a/rill-flow-dag/olympicene-traversal/pom.xml
+++ b/rill-flow-dag/olympicene-traversal/pom.xml
@@ -81,6 +81,10 @@
io.opentelemetry
opentelemetry-sdk-extension-autoconfigure-spi
+
+ com.weibo
+ rill-flow-common
+
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
index 81c43f237..db6a55006 100644
--- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.weibo.rill.flow.common.constant.ReservedConstant;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.*;
@@ -37,9 +38,13 @@
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -61,6 +66,7 @@ public class DAGOperations {
private final DAGTraversal dagTraversal;
private final Callback callback;
private final DAGResultHandler dagResultHandler;
+ private final TracerHelper tracerHelper;
public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> {
@@ -79,7 +85,7 @@ public class DAGOperations {
public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner,
TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback,
- DAGResultHandler dagResultHandler) {
+ DAGResultHandler dagResultHandler, TracerHelper tracerHelper) {
this.runnerExecutor = runnerExecutor;
this.taskRunners = taskRunners;
this.dagRunner = dagRunner;
@@ -87,6 +93,7 @@ public DAGOperations(ExecutorService runnerExecutor, Map tas
this.dagTraversal = dagTraversal;
this.callback = callback;
this.dagResultHandler = dagResultHandler;
+ this.tracerHelper = tracerHelper;
}
public void runTasks(String executionId, Collection>> taskInfoToContexts) {
@@ -109,44 +116,70 @@ private void runTask(String executionId, TaskInfo taskInfo, Map
params.put("taskInfo", taskInfo);
params.put("context", context);
- TaskRunner runner = selectRunner(taskInfo);
- Supplier basicActions = () -> runner.run(executionId, taskInfo, context);
-
- Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
- ExecutionResult executionResult = supplier.get();
-
- /*
- 任务执行后结果类型
- 1. 任务执行完成 如 return/pass
- 1.1 任务执行完成 需要寻找下一个能执行的任务
- 1.2 任务执行中 需要外部系统调finish触发下一个任务执行
- 1.3 任务需要重试
- 2. 流程控制类任务 foreach/choice
- 2.1 触发能够执行的子任务
- */
- // 对应1.1
- if (isTaskCompleted(executionResult)) {
- dagTraversal.submitTraversal(executionId, taskInfo.getName());
- invokeTaskCallback(executionId, taskInfo, context);
- }
- // 对应1.2
- if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
- Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
- Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
- .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
- }
- // 对应1.3
- if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
- runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
- executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
- }
- // 对应2.1
- if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
- executionResult.getSubTaskInfosAndContext()
- .forEach(subTaskInfosAndContext -> {
- dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
- safeSleep(10);
- });
+ // 获取当前的 Context,这应该包含了从异步任务传递过来的 context
+ Context parentContext = Context.current();
+
+ Span span = tracerHelper.getTracer().spanBuilder("runTask " + taskInfo.getName())
+ .setAttribute("execution.id", executionId)
+ .setAttribute("task.name", taskInfo.getName())
+ .setAttribute("task.category", taskInfo.getTask().getCategory())
+ .setParent(parentContext) // 显式设置父 context
+ .startSpan();
+
+ TaskStatus executionResultStatus = null;
+ try (Scope scope = Context.current().with(span).makeCurrent()) {
+ TaskRunner runner = selectRunner(taskInfo);
+ Supplier basicActions = () -> runner.run(executionId, taskInfo, context);
+
+ Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
+ ExecutionResult executionResult = supplier.get();
+
+ /*
+ 任务执行后结果类型
+ 1. 任务执行完成 如 return/pass
+ 1.1 任务执行完成 需要寻找下一个能执行的任务
+ 1.2 任务执行中 需要外部系统调finish触发下一个任务执行
+ 1.3 任务需要重试
+ 2. 流程控制类任务 foreach/choice
+ 2.1 触发能够执行的子任务
+ */
+ // 对应1.1
+ if (isTaskCompleted(executionResult)) {
+ Context currentContext = Context.current();
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, taskInfo.getName());
+ invokeTaskCallback(executionId, taskInfo, context);
+ }
+ }));
+ }
+ executionResultStatus = executionResult.getTaskStatus();
+ // 对应1.2
+ if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
+ Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
+ Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
+ .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
+ tracerHelper.saveContext(executionId, taskInfo.getName(), parentContext, span);
+ return;
+ }
+ // 对应1.3
+ if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
+ runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
+ executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
+ }
+ // 对应2.1
+ if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
+ executionResult.getSubTaskInfosAndContext()
+ .forEach(subTaskInfosAndContext -> {
+ dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
+ safeSleep(10);
+ });
+ }
+ } finally {
+ if (executionResultStatus != null && executionResultStatus.isCompleted()) {
+ span.setAttribute("status", executionResultStatus.toString());
+ span.end();
+ }
}
}
@@ -204,24 +237,51 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n
Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS);
ExecutionResult executionResult = supplier.get();
- if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
- timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
- runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
- executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
- }
- if (isTaskCompleted(executionResult)) {
- timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
- dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
- invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
- }
- if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
- dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
- }
+ // 尝试恢复之前的 span
+ Span span = tracerHelper.loadSpan(executionId, executionResult.getTaskInfo().getName());
+ Context currentContext = span != null ? Context.current().with(span) : Context.current();
+
+ try (Scope scope = currentContext.makeCurrent()) {
+ if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
+ timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
+ executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
+ }
+ }));
+ }
+ if (isTaskCompleted(executionResult)) {
+ timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
+ }
+ }));
+ }
+ if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
+ }
+ }));
+ }
- // key finished
- if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
- || isSubFlowTaskKeyCompleted(executionResult)) {
- dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ // key finished
+ if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
+ || isSubFlowTaskKeyCompleted(executionResult)) {
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ }
+ }));
+ }
+ } finally {
+ if (span != null && executionResult.getTaskStatus().isCompleted()) {
+ span.setAttribute("status", executionResult.getTaskStatus().toString());
+ span.end();
+ }
}
}
@@ -232,11 +292,17 @@ public void redoTask(String executionId, List taskNames, Map data, NotifyInfo notifyInfo) {
+ String[] executionIdInfos = executionId.split(ReservedConstant.EXECUTION_ID_CONNECTOR);
+ String descriptorId = executionIdInfos[0];
+ Span span = tracerHelper.getTracer().spanBuilder("submitDAG " + descriptorId)
+ .setAttribute("execution.id", executionId)
+ .startSpan();
log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
.ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
dagTraversal.submitTraversal(executionId, null);
+ span.end();
}
public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) {
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
index d5f1f8c92..d6a3e0953 100644
--- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
@@ -33,6 +33,7 @@
import com.weibo.rill.flow.olympicene.traversal.helper.DefaultStasher;
import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPath;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping;
@@ -46,15 +47,15 @@ public class OlympiceneFacade {
public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage,
Callback callback, DAGDispatcher dagDispatcher,
DAGStorageProcedure dagStorageProcedure, TimeChecker timeChecker,
- SwitcherManager switcherManager) {
+ SwitcherManager switcherManager, TracerHelper tracerHelper) {
ExecutorService executor = SameThreadExecutorService.INSTANCE;
return build(dagInfoStorage, dagContextStorage, dagStorageProcedure, callback, null,
- dagDispatcher, timeChecker, executor, switcherManager);
+ dagDispatcher, timeChecker, executor, switcherManager, tracerHelper);
}
public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure,
Callback callback, DAGResultHandler dagResultHandler, DAGDispatcher dagDispatcher,
- TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager) {
+ TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager, TracerHelper tracerHelper) {
JSONPathInputOutputMapping jsonPathInputOutputMapping = new JSONPathInputOutputMapping();
DefaultStasher stasher = new DefaultStasher();
@@ -65,7 +66,7 @@ public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage
jsonPathInputOutputMapping, jsonPathInputOutputMapping, dagStorageProcedure, stasher, switcherManager);
DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor);
- DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler);
+ DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler, tracerHelper);
dagTraversal.setDagOperations(dagOperations);
dagTraversal.setStasher(stasher);
timeCheckRunner.setDagOperations(dagOperations);
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java
new file mode 100644
index 000000000..62d59bf47
--- /dev/null
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java
@@ -0,0 +1,93 @@
+package com.weibo.rill.flow.olympicene.traversal.helper;
+
+import com.alibaba.fastjson.JSONObject;
+import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
+import io.opentelemetry.api.trace.*;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NoArgsConstructor
+public class TracerHelper {
+ private RedisClient redisClient;
+ @Getter
+ private Tracer tracer;
+
+ public TracerHelper(RedisClient redisClient, Tracer tracer) {
+ this.redisClient = redisClient;
+ this.tracer = tracer;
+ }
+
+ // Redis key 前缀
+ private static final String TRACE_KEY_PREFIX = "rill_flow_trace_";
+ // 设置合适的过期时间(例如24小时)
+ private static final int TRACE_EXPIRE_SECONDS = 2 * 60 * 60;
+
+ public void removeSpanContext(String executionId, String taskId) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ redisClient.del(key.getBytes());
+ } catch (Exception e) {
+ log.error("Failed to remove span context from Redis for task: {}", taskId, e);
+ }
+ }
+
+ public void saveContext(String executionId, String taskId, Context parentContext, Span currentSpan) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ JSONObject contextInfo = new JSONObject();
+ SpanContext spanContext = currentSpan.getSpanContext();
+ SpanContext parentSpanContext = Span.fromContext(parentContext).getSpanContext();
+
+ contextInfo.put("traceId", spanContext.getTraceId());
+ contextInfo.put("spanId", spanContext.getSpanId());
+ contextInfo.put("parentSpanId", parentSpanContext.getSpanId());
+ contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex());
+ contextInfo.put("startTime", System.currentTimeMillis()); // 保存开始时间
+
+ redisClient.set(key, contextInfo.toJSONString());
+ redisClient.expire(key, TRACE_EXPIRE_SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to save context to Redis for task: {}", taskId, e);
+ }
+ }
+
+ public Span loadSpan(String executionId, String taskId) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ String contextInfoString = redisClient.get(key);
+
+ if (contextInfoString == null || contextInfoString.isEmpty()) {
+ return null;
+ }
+
+ JSONObject contextInfo = JSONObject.parseObject(contextInfoString);
+ String traceId = contextInfo.getString("traceId");
+ String spanId = contextInfo.getString("spanId");
+ String parentSpanId = contextInfo.getString("parentSpanId");
+ String traceFlags = contextInfo.getString("traceFlags");
+ long startTime = Long.parseLong(contextInfo.getString("startTime"));
+
+ SpanContext parentContext = SpanContext.create(
+ traceId,
+ parentSpanId,
+ TraceFlags.fromHex(traceFlags, 0),
+ TraceState.getDefault()
+ );
+
+ return tracer.spanBuilder("runTask " + taskId)
+ .setParent(Context.current().with(Span.wrap(parentContext)))
+ .setAttribute("original.span.id", spanId)
+ .setStartTimestamp(startTime, java.util.concurrent.TimeUnit.MILLISECONDS) // 设置正确的开始时间
+ .startSpan();
+ } catch (Exception e) {
+ log.error("Failed to load span from Redis for task: {}", taskId, e);
+ return null;
+ } finally {
+ removeSpanContext(executionId, taskId);
+ }
+ }
+}
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy
index 777193699..c34704e8a 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy
@@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class ChoiceTaskTraversalTest extends Specification {
@@ -29,7 +30,7 @@ class ChoiceTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
/**
* A -> B -> C
* B为choice:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy
index 3bd5b4ba7..3f00371a8 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy
@@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class ChoiceWithForeachTaskTraversalTest extends Specification {
@@ -29,7 +30,7 @@ class ChoiceWithForeachTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
/**
* A -> B -> C
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy
index 6ba1c32cf..843decbfd 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy
@@ -22,6 +22,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class ConstMappingsTest extends Specification {
@@ -32,7 +33,7 @@ class ConstMappingsTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test const mappings should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy
index d0d841fef..6b67e8c7a 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy
@@ -17,6 +17,7 @@ import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.result.LocalSyncDAGResultHandler
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
import java.util.concurrent.ExecutorService
@@ -29,7 +30,7 @@ class FlowSyncTest extends Specification {
DAGResultHandler dagResultHandler = new LocalSyncDAGResultHandler()
ExecutorService executor = Executors.newFixedThreadPool(10)
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, dagStorageProcedure, null, dagResultHandler, Mock(DAGDispatcher.class), Mock(DefaultTimeChecker.class), executor, switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, dagStorageProcedure, null, dagResultHandler, Mock(DAGDispatcher.class), Mock(DefaultTimeChecker.class), executor, switcherManager, Mock(Tracer))
def "test one passTask dag should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy
index d1727e55e..6dd1333b2 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy
@@ -24,6 +24,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class ForeachTaskTraversalTest extends Specification {
@@ -33,7 +34,7 @@ class ForeachTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
/**
* A -> B -> C
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy
index 925eb431a..cd348bbca 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy
@@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class FunctionRetryTest extends Specification {
@@ -30,7 +31,7 @@ class FunctionRetryTest extends Specification {
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
DefaultTimeChecker timeChecker = Mock(DefaultTimeChecker.class)
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer))
DAG dag
def setup() {
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy
index c6f4a3795..b80f49c31 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class FunctionTaskToleranceTraversalTest extends Specification {
@@ -30,7 +31,7 @@ class FunctionTaskToleranceTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test one functionTask skip dag should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy
index b732a1d13..69bd0d62a 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class FunctionTaskTraversalFailedTest extends Specification {
@@ -31,7 +32,7 @@ class FunctionTaskTraversalFailedTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test one functionTask failed dag should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy
index 66d86f4b4..e13639e93 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class FunctionTaskTraversalTest extends Specification {
@@ -31,7 +32,7 @@ class FunctionTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test one functionTask dag should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy
index 2b56121de..0b9371794 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class InvokeMsgTest extends Specification {
@@ -30,7 +31,7 @@ class InvokeMsgTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def setup() {
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy
index c4b3c9a1e..a1c8505e8 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import org.apache.commons.collections.CollectionUtils
import spock.lang.Specification
@@ -31,7 +32,7 @@ class InvokeTimeTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
String executionId = 'executionId'
def "dagInfo and taskInfo should save invoke time"() {
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy
index cf4bd841b..fc46d9d5f 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy
@@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import groovy.util.logging.Slf4j
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
@Slf4j
@@ -31,7 +32,7 @@ class MultiDAGTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
DAG dag
def setup() {
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy
index 1f0a41eed..9cefd2fce 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy
@@ -1,12 +1,12 @@
package com.weibo.rill.flow.olympicene.traversal
+import com.weibo.rill.flow.interfaces.model.task.TaskStatus
import com.weibo.rill.flow.olympicene.core.event.Callback
import com.weibo.rill.flow.olympicene.core.event.Event
import com.weibo.rill.flow.olympicene.core.model.DAGSettings
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo
import com.weibo.rill.flow.olympicene.core.model.dag.DAG
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus
-import com.weibo.rill.flow.interfaces.model.task.TaskStatus
import com.weibo.rill.flow.olympicene.core.runtime.DAGParser
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager
@@ -14,14 +14,14 @@ import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser
import com.weibo.rill.flow.olympicene.ddl.serialize.YAMLSerializer
import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.FlowDAGValidator
import com.weibo.rill.flow.olympicene.ddl.validation.task.impl.FunctionTaskValidator
-import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient
import com.weibo.rill.flow.olympicene.storage.save.impl.DAGLocalStorage
import com.weibo.rill.flow.olympicene.storage.save.impl.LocalStorageProcedure
+import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
+import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
-import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
-import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class PassTaskTraversalTest extends Specification {
@@ -31,7 +31,7 @@ class PassTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test one passTask dag should work well"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy
index 77f59cea1..3c4eeadf6 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy
@@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import groovy.util.logging.Slf4j
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
@@ -31,7 +32,7 @@ class RedoTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
String executionId = "xxx1"
DAG dag
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy
index 0353df1a6..18ba28bb2 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy
@@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
@@ -28,7 +29,7 @@ class ReturnTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "if return task status is success then next tasks status should be skip"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy
index 580bc4850..185858807 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy
@@ -23,6 +23,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class SuspenseTaskTraversalTest extends Specification {
@@ -32,7 +33,7 @@ class SuspenseTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test suspense task"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy
index 1999d53e4..a3b3491fe 100755
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy
@@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class SwitchTaskTraversalTest extends Specification {
@@ -27,7 +28,7 @@ class SwitchTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "test basic switch"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy
index 962e3837c..39eccd9be 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy
@@ -19,6 +19,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
@@ -29,7 +30,7 @@ class TaskDegradeTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
def "degrade current task only test"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy
index 9d9c8b139..75acfa00a 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy
@@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
class TaskProgressTest extends Specification {
@@ -28,7 +29,7 @@ class TaskProgressTest extends Specification {
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
TimeChecker timeChecker = Mock(DefaultTimeChecker.class)
SwitcherManager switcherManager = Mock(SwitcherManager.class)
- Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager)
+ Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer))
def "degrade current task only test"() {
given:
diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy
index 9d9363b82..628a8383c 100644
--- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy
+++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy
@@ -25,6 +25,7 @@ import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
+import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification
@@ -42,7 +43,7 @@ class TimeCheckerTest extends Specification {
def setup() {
timeChecker.redisClient = Mock(RedisClient.class)
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true
- olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager)
+ olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer))
timeChecker.timeCheckRunner = olympicene.dagOperations.timeCheckRunner
}
diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
index 834832838..fdf265d3f 100644
--- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
+++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
@@ -266,7 +266,8 @@ public JSONObject getDescriptor(String descriptorId) {
*/
private void generateResourceProtocol(JSONObject task) {
try {
- if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL))) {
+ if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL))
+ || task.getString("resourceName") == null) {
return;
}
String resourceName = task.getString("resourceName");
diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
index f07398834..3293fc1ad 100644
--- a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
+++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
@@ -31,6 +31,7 @@
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo;
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher;
import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping;
import com.weibo.rill.flow.service.component.OlympiceneCallback;
import com.weibo.rill.flow.service.component.RuntimeExecutorServiceProxy;
@@ -47,6 +48,7 @@
import com.weibo.rill.flow.service.storage.RuntimeRedisClients;
import com.weibo.rill.flow.service.storage.RuntimeStorage;
import com.weibo.rill.flow.service.util.IpUtils;
+import io.opentelemetry.api.trace.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
@@ -122,6 +124,13 @@ public BusinessTimeChecker timeChecker(
return new BusinessTimeChecker(redisClient);
}
+ @Bean
+ public TracerHelper tracerHelper(
+ @Autowired @Qualifier("runtimeRedisClients") RedisClient redisClient,
+ @Autowired Tracer tracer) {
+ return new TracerHelper(redisClient, tracer);
+ }
+
@Bean(destroyMethod = "shutdown")
public ExecutorService notifyExecutor(@Autowired BizDConfs bizDConfs,
diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java
new file mode 100644
index 000000000..381489007
--- /dev/null
+++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java
@@ -0,0 +1,63 @@
+package com.weibo.rill.flow.configuration;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Slf4j
+public class OpenTelemetryConfig {
+ @Value("${otel.service.name:rill-flow}")
+ private String serviceName;
+
+ @Value("${otel.exporter.otlp.endpoint:http://jaeger:4317}")
+ private String endpoint;
+
+ @Value("${otel.traces.sampler.probability:1.0}")
+ private double samplerProbability;
+
+ @Bean
+ public OpenTelemetry openTelemetry() {
+ log.info("Initializing OpenTelemetry with endpoint: {}, service name: {}", endpoint, serviceName);
+
+ Resource resource = Resource.getDefault()
+ .merge(Resource.create(Attributes.of(
+ ResourceAttributes.SERVICE_NAME, serviceName,
+ ResourceAttributes.SERVICE_VERSION, "1.0.0"
+ )));
+
+ OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
+ .setEndpoint(endpoint)
+ .build();
+
+ SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
+ .setSampler(Sampler.traceIdRatioBased(samplerProbability))
+ .addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
+ .build())
+ .setResource(resource)
+ .build();
+
+ return OpenTelemetrySdk.builder()
+ .setTracerProvider(sdkTracerProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .buildAndRegisterGlobal();
+ }
+
+ @Bean
+ public Tracer tracer(OpenTelemetry openTelemetry) {
+ return openTelemetry.getTracer("rill-flow", "1.0.0");
+ }
+}
diff --git a/rill-flow-web/src/main/resources/application.properties b/rill-flow-web/src/main/resources/application.properties
index 30ae7c15a..f4c629bb3 100644
--- a/rill-flow-web/src/main/resources/application.properties
+++ b/rill-flow-web/src/main/resources/application.properties
@@ -24,4 +24,8 @@ spring.mvc.pathmatch.matching-strategy=ant_path_matcher
rill.flow.task.template.datasource.jdbc.master.url=jdbc:mysql://rill-flow-mysql:3306/rill_flow?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
rill.flow.task.template.datasource.master.user=root
-rill.flow.task.template.datasource.master.password=secret
\ No newline at end of file
+rill.flow.task.template.datasource.master.password=secret
+
+otel.service.name=rill-flow
+otel.exporter.otlp.endpoint=http://jaeger:4317
+otel.traces.sampler.probability=1.0
\ No newline at end of file