diff --git a/pom.xml b/pom.xml index c573d98ff..737690625 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ https://sonarcloud.io 4.12.0 0.8.11 + 1.29.0 @@ -506,42 +507,42 @@ io.opentelemetry.javaagent opentelemetry-javaagent - 1.29.0 + ${opentelemetry.version} io.opentelemetry.instrumentation opentelemetry-instrumentation-annotations - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-api - 1.29.0 + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.version} io.opentelemetry opentelemetry-exporter-otlp - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-sdk - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-semconv 1.29.0-alpha - - io.opentelemetry - opentelemetry-sdk-extension-autoconfigure - 1.30.1 - io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi - 1.29.0 + ${opentelemetry.version} diff --git a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java index a6391dc31..3c29a223c 100644 --- a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java +++ b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java @@ -299,10 +299,11 @@ public DAGOperations dagOperations( @Autowired @Qualifier("dagCallback") Callback dagCallback, @Autowired @Qualifier("timeCheckRunner") TimeCheckRunner timeCheckRunner, @Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor, - @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) { + @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler, + @Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) { log.info("begin to init default DAGOperations bean"); DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner, - timeCheckRunner, dagTraversal, dagCallback, dagResultHandler); + timeCheckRunner, dagTraversal, dagCallback, dagResultHandler, tracerHelper); dagTraversal.setDagOperations(dagOperations); timeCheckRunner.setDagOperations(dagOperations); return dagOperations; diff --git a/rill-flow-dag/olympicene-traversal/pom.xml b/rill-flow-dag/olympicene-traversal/pom.xml index c13f365bf..eae63ac02 100644 --- a/rill-flow-dag/olympicene-traversal/pom.xml +++ b/rill-flow-dag/olympicene-traversal/pom.xml @@ -81,6 +81,10 @@ io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi + + com.weibo + rill-flow-common + diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index 81c43f237..db6a55006 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.weibo.rill.flow.common.constant.ReservedConstant; import com.weibo.rill.flow.interfaces.model.mapping.Mapping; import com.weibo.rill.flow.interfaces.model.strategy.Timeline; import com.weibo.rill.flow.interfaces.model.task.*; @@ -37,9 +38,13 @@ import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode; import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException; import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner; import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner; import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -61,6 +66,7 @@ public class DAGOperations { private final DAGTraversal dagTraversal; private final Callback callback; private final DAGResultHandler dagResultHandler; + private final TracerHelper tracerHelper; public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { @@ -79,7 +85,7 @@ public class DAGOperations { public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner, TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback, - DAGResultHandler dagResultHandler) { + DAGResultHandler dagResultHandler, TracerHelper tracerHelper) { this.runnerExecutor = runnerExecutor; this.taskRunners = taskRunners; this.dagRunner = dagRunner; @@ -87,6 +93,7 @@ public DAGOperations(ExecutorService runnerExecutor, Map tas this.dagTraversal = dagTraversal; this.callback = callback; this.dagResultHandler = dagResultHandler; + this.tracerHelper = tracerHelper; } public void runTasks(String executionId, Collection>> taskInfoToContexts) { @@ -109,44 +116,70 @@ private void runTask(String executionId, TaskInfo taskInfo, Map params.put("taskInfo", taskInfo); params.put("context", context); - TaskRunner runner = selectRunner(taskInfo); - Supplier basicActions = () -> runner.run(executionId, taskInfo, context); - - Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS); - ExecutionResult executionResult = supplier.get(); - - /* - 任务执行后结果类型 - 1. 任务执行完成 如 return/pass - 1.1 任务执行完成 需要寻找下一个能执行的任务 - 1.2 任务执行中 需要外部系统调finish触发下一个任务执行 - 1.3 任务需要重试 - 2. 流程控制类任务 foreach/choice - 2.1 触发能够执行的子任务 - */ - // 对应1.1 - if (isTaskCompleted(executionResult)) { - dagTraversal.submitTraversal(executionId, taskInfo.getName()); - invokeTaskCallback(executionId, taskInfo, context); - } - // 对应1.2 - if (executionResult.getTaskStatus() == TaskStatus.RUNNING) { - Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null); - Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline)) - .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds)); - } - // 对应1.3 - if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { - runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), - executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); - } - // 对应2.1 - if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) { - executionResult.getSubTaskInfosAndContext() - .forEach(subTaskInfosAndContext -> { - dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight()); - safeSleep(10); - }); + // 获取当前的 Context,这应该包含了从异步任务传递过来的 context + Context parentContext = Context.current(); + + Span span = tracerHelper.getTracer().spanBuilder("runTask " + taskInfo.getName()) + .setAttribute("execution.id", executionId) + .setAttribute("task.name", taskInfo.getName()) + .setAttribute("task.category", taskInfo.getTask().getCategory()) + .setParent(parentContext) // 显式设置父 context + .startSpan(); + + TaskStatus executionResultStatus = null; + try (Scope scope = Context.current().with(span).makeCurrent()) { + TaskRunner runner = selectRunner(taskInfo); + Supplier basicActions = () -> runner.run(executionId, taskInfo, context); + + Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS); + ExecutionResult executionResult = supplier.get(); + + /* + 任务执行后结果类型 + 1. 任务执行完成 如 return/pass + 1.1 任务执行完成 需要寻找下一个能执行的任务 + 1.2 任务执行中 需要外部系统调finish触发下一个任务执行 + 1.3 任务需要重试 + 2. 流程控制类任务 foreach/choice + 2.1 触发能够执行的子任务 + */ + // 对应1.1 + if (isTaskCompleted(executionResult)) { + Context currentContext = Context.current(); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, taskInfo.getName()); + invokeTaskCallback(executionId, taskInfo, context); + } + })); + } + executionResultStatus = executionResult.getTaskStatus(); + // 对应1.2 + if (executionResult.getTaskStatus() == TaskStatus.RUNNING) { + Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null); + Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline)) + .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds)); + tracerHelper.saveContext(executionId, taskInfo.getName(), parentContext, span); + return; + } + // 对应1.3 + if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { + runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), + executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); + } + // 对应2.1 + if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) { + executionResult.getSubTaskInfosAndContext() + .forEach(subTaskInfosAndContext -> { + dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight()); + safeSleep(10); + }); + } + } finally { + if (executionResultStatus != null && executionResultStatus.isCompleted()) { + span.setAttribute("status", executionResultStatus.toString()); + span.end(); + } } } @@ -204,24 +237,51 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS); ExecutionResult executionResult = supplier.get(); - if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { - timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); - runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), - executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); - } - if (isTaskCompleted(executionResult)) { - timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); - dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); - invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); - } - if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) { - dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal()); - } + // 尝试恢复之前的 span + Span span = tracerHelper.loadSpan(executionId, executionResult.getTaskInfo().getName()); + Context currentContext = span != null ? Context.current().with(span) : Context.current(); + + try (Scope scope = currentContext.makeCurrent()) { + if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { + timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), + executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); + } + })); + } + if (isTaskCompleted(executionResult)) { + timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); + } + })); + } + if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) { + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal()); + } + })); + } - // key finished - if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex()) - || isSubFlowTaskKeyCompleted(executionResult)) { - dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + // key finished + if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex()) + || isSubFlowTaskKeyCompleted(executionResult)) { + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + } + })); + } + } finally { + if (span != null && executionResult.getTaskStatus().isCompleted()) { + span.setAttribute("status", executionResult.getTaskStatus().toString()); + span.end(); + } } } @@ -232,11 +292,17 @@ public void redoTask(String executionId, List taskNames, Map data, NotifyInfo notifyInfo) { + String[] executionIdInfos = executionId.split(ReservedConstant.EXECUTION_ID_CONNECTOR); + String descriptorId = executionIdInfos[0]; + Span span = tracerHelper.getTracer().spanBuilder("submitDAG " + descriptorId) + .setAttribute("execution.id", executionId) + .startSpan(); log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo); ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo); Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline())) .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds)); dagTraversal.submitTraversal(executionId, null); + span.end(); } public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java index d5f1f8c92..d6a3e0953 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java @@ -33,6 +33,7 @@ import com.weibo.rill.flow.olympicene.traversal.helper.DefaultStasher; import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService; import com.weibo.rill.flow.olympicene.traversal.helper.Stasher; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPath; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping; @@ -46,15 +47,15 @@ public class OlympiceneFacade { public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, Callback callback, DAGDispatcher dagDispatcher, DAGStorageProcedure dagStorageProcedure, TimeChecker timeChecker, - SwitcherManager switcherManager) { + SwitcherManager switcherManager, TracerHelper tracerHelper) { ExecutorService executor = SameThreadExecutorService.INSTANCE; return build(dagInfoStorage, dagContextStorage, dagStorageProcedure, callback, null, - dagDispatcher, timeChecker, executor, switcherManager); + dagDispatcher, timeChecker, executor, switcherManager, tracerHelper); } public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure, Callback callback, DAGResultHandler dagResultHandler, DAGDispatcher dagDispatcher, - TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager) { + TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager, TracerHelper tracerHelper) { JSONPathInputOutputMapping jsonPathInputOutputMapping = new JSONPathInputOutputMapping(); DefaultStasher stasher = new DefaultStasher(); @@ -65,7 +66,7 @@ public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage jsonPathInputOutputMapping, jsonPathInputOutputMapping, dagStorageProcedure, stasher, switcherManager); DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor); - DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler); + DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler, tracerHelper); dagTraversal.setDagOperations(dagOperations); dagTraversal.setStasher(stasher); timeCheckRunner.setDagOperations(dagOperations); diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java new file mode 100644 index 000000000..62d59bf47 --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java @@ -0,0 +1,93 @@ +package com.weibo.rill.flow.olympicene.traversal.helper; + +import com.alibaba.fastjson.JSONObject; +import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient; +import io.opentelemetry.api.trace.*; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@NoArgsConstructor +public class TracerHelper { + private RedisClient redisClient; + @Getter + private Tracer tracer; + + public TracerHelper(RedisClient redisClient, Tracer tracer) { + this.redisClient = redisClient; + this.tracer = tracer; + } + + // Redis key 前缀 + private static final String TRACE_KEY_PREFIX = "rill_flow_trace_"; + // 设置合适的过期时间(例如24小时) + private static final int TRACE_EXPIRE_SECONDS = 2 * 60 * 60; + + public void removeSpanContext(String executionId, String taskId) { + try { + String key = TRACE_KEY_PREFIX + executionId + "_" + taskId; + redisClient.del(key.getBytes()); + } catch (Exception e) { + log.error("Failed to remove span context from Redis for task: {}", taskId, e); + } + } + + public void saveContext(String executionId, String taskId, Context parentContext, Span currentSpan) { + try { + String key = TRACE_KEY_PREFIX + executionId + "_" + taskId; + JSONObject contextInfo = new JSONObject(); + SpanContext spanContext = currentSpan.getSpanContext(); + SpanContext parentSpanContext = Span.fromContext(parentContext).getSpanContext(); + + contextInfo.put("traceId", spanContext.getTraceId()); + contextInfo.put("spanId", spanContext.getSpanId()); + contextInfo.put("parentSpanId", parentSpanContext.getSpanId()); + contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex()); + contextInfo.put("startTime", System.currentTimeMillis()); // 保存开始时间 + + redisClient.set(key, contextInfo.toJSONString()); + redisClient.expire(key, TRACE_EXPIRE_SECONDS); + } catch (Exception e) { + log.error("Failed to save context to Redis for task: {}", taskId, e); + } + } + + public Span loadSpan(String executionId, String taskId) { + try { + String key = TRACE_KEY_PREFIX + executionId + "_" + taskId; + String contextInfoString = redisClient.get(key); + + if (contextInfoString == null || contextInfoString.isEmpty()) { + return null; + } + + JSONObject contextInfo = JSONObject.parseObject(contextInfoString); + String traceId = contextInfo.getString("traceId"); + String spanId = contextInfo.getString("spanId"); + String parentSpanId = contextInfo.getString("parentSpanId"); + String traceFlags = contextInfo.getString("traceFlags"); + long startTime = Long.parseLong(contextInfo.getString("startTime")); + + SpanContext parentContext = SpanContext.create( + traceId, + parentSpanId, + TraceFlags.fromHex(traceFlags, 0), + TraceState.getDefault() + ); + + return tracer.spanBuilder("runTask " + taskId) + .setParent(Context.current().with(Span.wrap(parentContext))) + .setAttribute("original.span.id", spanId) + .setStartTimestamp(startTime, java.util.concurrent.TimeUnit.MILLISECONDS) // 设置正确的开始时间 + .startSpan(); + } catch (Exception e) { + log.error("Failed to load span from Redis for task: {}", taskId, e); + return null; + } finally { + removeSpanContext(executionId, taskId); + } + } +} diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy index 777193699..c34704e8a 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceTaskTraversalTest.groovy @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class ChoiceTaskTraversalTest extends Specification { @@ -29,7 +30,7 @@ class ChoiceTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) /** * A -> B -> C * B为choice: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy index 3bd5b4ba7..3f00371a8 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ChoiceWithForeachTaskTraversalTest.groovy @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class ChoiceWithForeachTaskTraversalTest extends Specification { @@ -29,7 +30,7 @@ class ChoiceWithForeachTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) /** * A -> B -> C diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy index 6ba1c32cf..843decbfd 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ConstMappingsTest.groovy @@ -22,6 +22,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class ConstMappingsTest extends Specification { @@ -32,7 +33,7 @@ class ConstMappingsTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test const mappings should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy index d0d841fef..6b67e8c7a 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FlowSyncTest.groovy @@ -17,6 +17,7 @@ import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.result.LocalSyncDAGResultHandler +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification import java.util.concurrent.ExecutorService @@ -29,7 +30,7 @@ class FlowSyncTest extends Specification { DAGResultHandler dagResultHandler = new LocalSyncDAGResultHandler() ExecutorService executor = Executors.newFixedThreadPool(10) SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, dagStorageProcedure, null, dagResultHandler, Mock(DAGDispatcher.class), Mock(DefaultTimeChecker.class), executor, switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, dagStorageProcedure, null, dagResultHandler, Mock(DAGDispatcher.class), Mock(DefaultTimeChecker.class), executor, switcherManager, Mock(Tracer)) def "test one passTask dag should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy index d1727e55e..6dd1333b2 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy @@ -24,6 +24,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class ForeachTaskTraversalTest extends Specification { @@ -33,7 +34,7 @@ class ForeachTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) /** * A -> B -> C diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy index 925eb431a..cd348bbca 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionRetryTest.groovy @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class FunctionRetryTest extends Specification { @@ -30,7 +31,7 @@ class FunctionRetryTest extends Specification { DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() DefaultTimeChecker timeChecker = Mock(DefaultTimeChecker.class) SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer)) DAG dag def setup() { diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy index c6f4a3795..b80f49c31 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskToleranceTraversalTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class FunctionTaskToleranceTraversalTest extends Specification { @@ -30,7 +31,7 @@ class FunctionTaskToleranceTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test one functionTask skip dag should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy index b732a1d13..69bd0d62a 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalFailedTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class FunctionTaskTraversalFailedTest extends Specification { @@ -31,7 +32,7 @@ class FunctionTaskTraversalFailedTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test one functionTask failed dag should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy index 66d86f4b4..e13639e93 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/FunctionTaskTraversalTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class FunctionTaskTraversalTest extends Specification { @@ -31,7 +32,7 @@ class FunctionTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test one functionTask dag should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy index 2b56121de..0b9371794 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class InvokeMsgTest extends Specification { @@ -30,7 +31,7 @@ class InvokeMsgTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def setup() { switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy index c4b3c9a1e..a1c8505e8 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeTimeTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import org.apache.commons.collections.CollectionUtils import spock.lang.Specification @@ -31,7 +32,7 @@ class InvokeTimeTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) String executionId = 'executionId' def "dagInfo and taskInfo should save invoke time"() { diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy index cf4bd841b..fc46d9d5f 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import groovy.util.logging.Slf4j +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification @Slf4j @@ -31,7 +32,7 @@ class MultiDAGTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) DAG dag def setup() { diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy index 1f0a41eed..9cefd2fce 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/PassTaskTraversalTest.groovy @@ -1,12 +1,12 @@ package com.weibo.rill.flow.olympicene.traversal +import com.weibo.rill.flow.interfaces.model.task.TaskStatus import com.weibo.rill.flow.olympicene.core.event.Callback import com.weibo.rill.flow.olympicene.core.event.Event import com.weibo.rill.flow.olympicene.core.model.DAGSettings import com.weibo.rill.flow.olympicene.core.model.NotifyInfo import com.weibo.rill.flow.olympicene.core.model.dag.DAG import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus -import com.weibo.rill.flow.interfaces.model.task.TaskStatus import com.weibo.rill.flow.olympicene.core.runtime.DAGParser import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager @@ -14,14 +14,14 @@ import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser import com.weibo.rill.flow.olympicene.ddl.serialize.YAMLSerializer import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.FlowDAGValidator import com.weibo.rill.flow.olympicene.ddl.validation.task.impl.FunctionTaskValidator -import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient import com.weibo.rill.flow.olympicene.storage.save.impl.DAGLocalStorage import com.weibo.rill.flow.olympicene.storage.save.impl.LocalStorageProcedure +import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo +import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher -import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo -import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class PassTaskTraversalTest extends Specification { @@ -31,7 +31,7 @@ class PassTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test one passTask dag should work well"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy index 77f59cea1..3c4eeadf6 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/RedoTraversalTest.groovy @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import groovy.util.logging.Slf4j +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification @@ -31,7 +32,7 @@ class RedoTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) String executionId = "xxx1" DAG dag diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy index 0353df1a6..18ba28bb2 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ReturnTaskTraversalTest.groovy @@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification @@ -28,7 +29,7 @@ class ReturnTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "if return task status is success then next tasks status should be skip"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy index 580bc4850..185858807 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy @@ -23,6 +23,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class SuspenseTaskTraversalTest extends Specification { @@ -32,7 +33,7 @@ class SuspenseTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test suspense task"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy index 1999d53e4..a3b3491fe 100755 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SwitchTaskTraversalTest.groovy @@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class SwitchTaskTraversalTest extends Specification { @@ -27,7 +28,7 @@ class SwitchTaskTraversalTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "test basic switch"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy index 962e3837c..39eccd9be 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskDegradeTest.groovy @@ -19,6 +19,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification @@ -29,7 +30,7 @@ class TaskDegradeTest extends Specification { DAGDispatcher dispatcher = Mock(DAGDispatcher.class) DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer)) def "degrade current task only test"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy index 9d9c8b139..75acfa00a 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TaskProgressTest.groovy @@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker import com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification class TaskProgressTest extends Specification { @@ -28,7 +29,7 @@ class TaskProgressTest extends Specification { DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure() TimeChecker timeChecker = Mock(DefaultTimeChecker.class) SwitcherManager switcherManager = Mock(SwitcherManager.class) - Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager) + Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer)) def "degrade current task only test"() { given: diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy index 9d9363b82..628a8383c 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy @@ -25,6 +25,7 @@ import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent +import io.opentelemetry.api.trace.Tracer import spock.lang.Specification @@ -42,7 +43,7 @@ class TimeCheckerTest extends Specification { def setup() { timeChecker.redisClient = Mock(RedisClient.class) switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true - olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager) + olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer)) timeChecker.timeCheckRunner = olympicene.dagOperations.timeCheckRunner } diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java index 834832838..fdf265d3f 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java @@ -266,7 +266,8 @@ public JSONObject getDescriptor(String descriptorId) { */ private void generateResourceProtocol(JSONObject task) { try { - if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL))) { + if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL)) + || task.getString("resourceName") == null) { return; } String resourceName = task.getString("resourceName"); diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java index f07398834..3293fc1ad 100644 --- a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java +++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java @@ -31,6 +31,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo; import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher; import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping; import com.weibo.rill.flow.service.component.OlympiceneCallback; import com.weibo.rill.flow.service.component.RuntimeExecutorServiceProxy; @@ -47,6 +48,7 @@ import com.weibo.rill.flow.service.storage.RuntimeRedisClients; import com.weibo.rill.flow.service.storage.RuntimeStorage; import com.weibo.rill.flow.service.util.IpUtils; +import io.opentelemetry.api.trace.Tracer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfigureOrder; @@ -122,6 +124,13 @@ public BusinessTimeChecker timeChecker( return new BusinessTimeChecker(redisClient); } + @Bean + public TracerHelper tracerHelper( + @Autowired @Qualifier("runtimeRedisClients") RedisClient redisClient, + @Autowired Tracer tracer) { + return new TracerHelper(redisClient, tracer); + } + @Bean(destroyMethod = "shutdown") public ExecutorService notifyExecutor(@Autowired BizDConfs bizDConfs, diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java new file mode 100644 index 000000000..381489007 --- /dev/null +++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java @@ -0,0 +1,63 @@ +package com.weibo.rill.flow.configuration; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class OpenTelemetryConfig { + @Value("${otel.service.name:rill-flow}") + private String serviceName; + + @Value("${otel.exporter.otlp.endpoint:http://jaeger:4317}") + private String endpoint; + + @Value("${otel.traces.sampler.probability:1.0}") + private double samplerProbability; + + @Bean + public OpenTelemetry openTelemetry() { + log.info("Initializing OpenTelemetry with endpoint: {}, service name: {}", endpoint, serviceName); + + Resource resource = Resource.getDefault() + .merge(Resource.create(Attributes.of( + ResourceAttributes.SERVICE_NAME, serviceName, + ResourceAttributes.SERVICE_VERSION, "1.0.0" + ))); + + OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint(endpoint) + .build(); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setSampler(Sampler.traceIdRatioBased(samplerProbability)) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter) + .build()) + .setResource(resource) + .build(); + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + } + + @Bean + public Tracer tracer(OpenTelemetry openTelemetry) { + return openTelemetry.getTracer("rill-flow", "1.0.0"); + } +} diff --git a/rill-flow-web/src/main/resources/application.properties b/rill-flow-web/src/main/resources/application.properties index 30ae7c15a..f4c629bb3 100644 --- a/rill-flow-web/src/main/resources/application.properties +++ b/rill-flow-web/src/main/resources/application.properties @@ -24,4 +24,8 @@ spring.mvc.pathmatch.matching-strategy=ant_path_matcher rill.flow.task.template.datasource.jdbc.master.url=jdbc:mysql://rill-flow-mysql:3306/rill_flow?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8 rill.flow.task.template.datasource.master.user=root -rill.flow.task.template.datasource.master.password=secret \ No newline at end of file +rill.flow.task.template.datasource.master.password=secret + +otel.service.name=rill-flow +otel.exporter.otlp.endpoint=http://jaeger:4317 +otel.traces.sampler.probability=1.0 \ No newline at end of file