[GOBBLIN-2220]: Send Metrics When Flow Spec Already Exists For An Adhoc Flow#4136
[GOBBLIN-2220]: Send Metrics When Flow Spec Already Exists For An Adhoc Flow#4136aga9900 wants to merge 12 commits intoapache:masterfrom
Conversation
...etrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
Show resolved
Hide resolved
| public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed."; | ||
| public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded."; | ||
| public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed."; | ||
| public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded."; |
There was a problem hiding this comment.
success is the expected/default case, so the success metric doesn't give us any additional signal. The failure metric is actionable, and tracking failures should be sufficient here. I would suggest dropping success metric for this one as it just adds noise
| dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); | ||
| } catch (Exception e) { | ||
| super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false); | ||
| log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); |
There was a problem hiding this comment.
Please log the exception as well using log.error(..., e), so that stack trace is captured.
| } catch (Exception e) { | ||
| super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false); | ||
| log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); | ||
| return false; |
There was a problem hiding this comment.
earlier the RuntimeException was not caught here, so it was getting caught in DagProcessingEngine which was marking dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();, now we are gulping the exception so it would not be handled in DagProcessingEngine, we should re-throw the exception here
| log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); | ||
| return false; | ||
| } | ||
| super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), true); |
There was a problem hiding this comment.
imo, we don't need to mark for success
| } catch (SpecNotFoundException e) { | ||
| log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri()); |
There was a problem hiding this comment.
this is not required, since the if block has already checked for existence of flowSpec
| flowSpecExistsForAdhocFlow.mark(); | ||
| } | ||
| } else { | ||
| log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); |
There was a problem hiding this comment.
please move log from line 253 to here, since we are logging twice now
| FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); | ||
| if (!storedFlowSpec.isScheduled()) { | ||
| log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); | ||
| if (!flowSpec.isScheduled()) { |
There was a problem hiding this comment.
if it was an adhoc flow(!storedFlowSpec.isScheduled()) and flowSpec is not deleted, that should be sufficient condition to mark the metric for unexpected behaviour. We don't need to check if current flowSpec is scheduled or not
| if (!storedFlowSpec.isScheduled()) { | ||
| log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); | ||
| if (!flowSpec.isScheduled()) { | ||
| flowSpecExistsForAdhocFlow.mark(); |
There was a problem hiding this comment.
there’s an expected case where a FlowSpec isn't deleted for an adhoc flow, specifically when a new flow is triggered before the previous one is launched. In that case, we throw a TooSoonToRerunSameFlowException from Orchestrator.onAddSpec. We should exclude these scenarios, since these are valid, and only flag cases where the FlowSpec wasn't deleted even after the DAG was launched, which is unexpected.
This would mark the metric for both the scenarios and we would have false/non-actionable alerts in such cases
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
This PR does 2 things:
Tests
Only Adding New Metrics and Logging.
Commits