[GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables#4032
[GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables#4032pratapaditya04 wants to merge 7 commits intoapache:masterfrom
Conversation
e08ac21 to
719052f
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4032 +/- ##
============================================
- Coverage 45.86% 38.80% -7.06%
+ Complexity 3257 1599 -1658
============================================
Files 707 388 -319
Lines 27865 15995 -11870
Branches 2796 1585 -1211
============================================
- Hits 12779 6207 -6572
+ Misses 14008 9290 -4718
+ Partials 1078 498 -580 ☔ View full report in Codecov by Sentry. |
…nto failed_dag_store_table_merger # Conflicts: # gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java # gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java # gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
| // Map to maintain parent to children mapping. | ||
| private Map<DagNode, List<DagNode<T>>> parentChildMap; | ||
| private List<DagNode<T>> nodes; | ||
| @Setter |
There was a problem hiding this comment.
because we do not persist dag level field in mysql, adding fields to Dag will not be much useful and may lead to bugs
There was a problem hiding this comment.
We want to have this field here as we didn't want to add additional parameters in all the methods to pass on is_failed value.
...rc/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
Show resolved
Hide resolved
...in/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
Outdated
Show resolved
Hide resolved
|
Should before completing the review, i would like to understand your thoughts on this. |
c13315d to
7e3e556
Compare
If we don't keep it in mysql, we may lose it in case of restarts/deployments, so we will have to store in mysql |
...in/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
Show resolved
Hide resolved
phet
left a comment
There was a problem hiding this comment.
this will be a nice simplification. looks close
| /** | ||
| * This marks the dag as a failed one. | ||
| * Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried. | ||
| * Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried. |
There was a problem hiding this comment.
does it remain useful to both retrieve the DAG while also asserting that it's failed?
...rc/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
Show resolved
Hide resolved
| private synchronized void start() { | ||
| if (!dagStoresInitialized) { | ||
| this.dagStateStore = createDagStateStore(config, topologySpecMap); | ||
| this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), |
There was a problem hiding this comment.
any ideas on handling migration when we roll this out (presuming the failed DagStateStore was not empty)?
| public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) | ||
| throws IOException { | ||
| this.dagStateStore.updateDagNode(dagId, dagNode); | ||
| this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed. |
There was a problem hiding this comment.
nit: space before starting a comment. also more brevity; e.g.:
// create all DagNodes as isFailedDag == false
| PreparedStatement createStatement = connection.prepareStatement( | ||
| String.format(CREATE_TABLE_STATEMENT, tableName))) { |
There was a problem hiding this comment.
given arjun just wrote this class a month or two back, please ensure your auto-formatting is what it's supposed to be. it is possible his was off, but let's check. sure we might fix spelling errors, but there should be little reason to reformat files we've only just created
| public List<Dag<JobExecutionPlan>> getDags() throws IOException { | ||
| throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " | ||
| + "the DagManager that is replaced by DagProcessingEngine"); } | ||
| + "the DagManager that is replaced by DagProcessingEngine");} |
There was a problem hiding this comment.
actually, doesn't this need a newline before }?
| protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) | ||
| throws IOException { | ||
| return dagManagementStateStore.getFailedDag(getDagId()); | ||
| return dagManagementStateStore.getDag(getDagId()); |
There was a problem hiding this comment.
shall we verify the one returned is actually failed?
| private List<Boolean> fetchDagNodeStates(String dagId) throws IOException { | ||
| List<Boolean> states = new ArrayList<>(); | ||
|
|
||
| dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { |
There was a problem hiding this comment.
is behind-the-scenes DB access the only way to validate behavior here? is there no way to access from the "official" DagStateStore, then mark failed and finally re-access from the DSS to verify all nodes have changed?
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Right now we are maintaining two tables to maintain DagState and Failed Dag State, In this PR ,we have tried to merge FailedDagState tables into DagState by adding a column is_failed_dag in DagState
Tests
Commits