Skip to content

Commit 0c8f0d7

Browse files
张文领zhangwl9
authored andcommitted
Saving cleanup info in table_process
1 parent 4677f14 commit 0c8f0d7

File tree

3 files changed

+219
-17
lines changed

3 files changed

+219
-17
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java

Lines changed: 139 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,24 @@
2424
import org.apache.amoro.ServerTableIdentifier;
2525
import org.apache.amoro.TableRuntime;
2626
import org.apache.amoro.config.TableConfiguration;
27+
import org.apache.amoro.process.ProcessStatus;
2728
import org.apache.amoro.server.optimizing.OptimizingStatus;
29+
import org.apache.amoro.server.persistence.PersistentBase;
30+
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
31+
import org.apache.amoro.server.process.TableProcessMeta;
2832
import org.apache.amoro.server.table.DefaultTableRuntime;
2933
import org.apache.amoro.server.table.RuntimeHandlerChain;
3034
import org.apache.amoro.server.table.TableService;
3135
import org.apache.amoro.server.table.cleanup.CleanupOperation;
36+
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
37+
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
3238
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
3339
import org.apache.commons.lang3.StringUtils;
3440
import org.slf4j.Logger;
3541
import org.slf4j.LoggerFactory;
3642

3743
import java.util.Collections;
44+
import java.util.HashMap;
3845
import java.util.HashSet;
3946
import java.util.List;
4047
import java.util.Locale;
@@ -48,6 +55,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
4855
protected final Logger logger = LoggerFactory.getLogger(getClass());
4956

5057
private static final long START_DELAY = 10 * 1000L;
58+
private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
59+
private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
60+
private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
61+
62+
private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();
63+
private final PersistencyHelper persistencyHelper = new PersistencyHelper();
5164

5265
protected final Set<ServerTableIdentifier> scheduledTables =
5366
Collections.synchronizedSet(new HashSet<>());
@@ -123,14 +136,22 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
123136
}
124137

125138
private void executeTask(TableRuntime tableRuntime) {
139+
TableProcessMeta cleanProcessMeta = null;
140+
CleanupOperation cleanupOperation = null;
141+
Throwable executionError = null;
142+
126143
try {
127144
if (isExecutable(tableRuntime)) {
145+
cleanupOperation = getCleanupOperation();
146+
cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);
147+
128148
execute(tableRuntime);
129-
// Different tables take different amounts of time to execute the end of execute(),
130-
// so you need to perform the update operation separately for each table.
131-
persistUpdatingCleanupTime(tableRuntime);
132149
}
150+
} catch (Throwable t) {
151+
executionError = t;
133152
} finally {
153+
persistCleanupResult(tableRuntime, cleanupOperation, cleanProcessMeta, executionError);
154+
134155
scheduledTables.remove(tableRuntime.getTableIdentifier());
135156
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
136157
}
@@ -154,25 +175,126 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
154175
return true;
155176
}
156177

157-
private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
158-
CleanupOperation cleanupOperation = getCleanupOperation();
178+
@VisibleForTesting
179+
public TableProcessMeta createCleanupProcessInfo(
180+
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
181+
159182
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
183+
return null;
184+
}
185+
186+
TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
187+
persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
188+
189+
logger.debug(
190+
"Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
191+
cleanProcessMeta.getProcessId(),
192+
cleanProcessMeta.getTableId(),
193+
cleanProcessMeta.getProcessType());
194+
195+
return cleanProcessMeta;
196+
}
197+
198+
@VisibleForTesting
199+
public void persistCleanupResult(
200+
TableRuntime tableRuntime,
201+
CleanupOperation cleanupOperation,
202+
TableProcessMeta cleanProcessMeta,
203+
Throwable executionError) {
204+
205+
if (cleanProcessMeta == null) {
160206
return;
161207
}
162208

163-
try {
164-
long currentTime = System.currentTimeMillis();
165-
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
209+
if (executionError != null) {
210+
cleanProcessMeta.setStatus(ProcessStatus.FAILED);
211+
String message = executionError.getMessage();
212+
if (message == null) {
213+
message = executionError.getClass().getName();
214+
}
166215

167-
logger.debug(
168-
"Update lastCleanTime for table {} with cleanup operation {}",
169-
tableRuntime.getTableIdentifier().getTableName(),
170-
cleanupOperation);
171-
} catch (Exception e) {
172-
logger.error(
173-
"Failed to update lastCleanTime for table {}",
174-
tableRuntime.getTableIdentifier().getTableName(),
175-
e);
216+
cleanProcessMeta.setFailMessage(message);
217+
} else {
218+
cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
219+
}
220+
221+
long endTime = System.currentTimeMillis();
222+
persistencyHelper.persistAndSetCompleted(
223+
tableRuntime, cleanupOperation, cleanProcessMeta, endTime);
224+
225+
logger.debug(
226+
"Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
227+
tableRuntime.getTableIdentifier().getTableName(),
228+
cleanupOperation);
229+
}
230+
231+
private TableProcessMeta buildProcessMeta(
232+
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
233+
234+
TableProcessMeta cleanProcessMeta = new TableProcessMeta();
235+
cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
236+
cleanProcessMeta.setProcessId(idGenerator.generateId());
237+
cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
238+
cleanProcessMeta.setStatus(ProcessStatus.RUNNING);
239+
cleanProcessMeta.setProcessType(cleanupOperation.name());
240+
cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
241+
cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
242+
cleanProcessMeta.setRetryNumber(0);
243+
cleanProcessMeta.setCreateTime(System.currentTimeMillis());
244+
cleanProcessMeta.setProcessParameters(new HashMap<>());
245+
cleanProcessMeta.setSummary(new HashMap<>());
246+
247+
return cleanProcessMeta;
248+
}
249+
250+
private static class PersistencyHelper extends PersistentBase {
251+
252+
public PersistencyHelper() {}
253+
254+
private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
255+
doAsTransaction(
256+
() ->
257+
doAs(
258+
TableProcessMapper.class,
259+
mapper ->
260+
mapper.insertProcess(
261+
meta.getTableId(),
262+
meta.getProcessId(),
263+
meta.getExternalProcessIdentifier(),
264+
meta.getStatus(),
265+
meta.getProcessType(),
266+
meta.getProcessStage(),
267+
meta.getExecutionEngine(),
268+
meta.getRetryNumber(),
269+
meta.getCreateTime(),
270+
meta.getProcessParameters(),
271+
meta.getSummary())));
272+
}
273+
274+
private void persistAndSetCompleted(
275+
TableRuntime tableRuntime,
276+
CleanupOperation cleanupOperation,
277+
TableProcessMeta meta,
278+
long endTime) {
279+
280+
doAsTransaction(
281+
() ->
282+
doAs(
283+
TableProcessMapper.class,
284+
mapper ->
285+
mapper.updateProcess(
286+
meta.getTableId(),
287+
meta.getProcessId(),
288+
meta.getExternalProcessIdentifier(),
289+
meta.getStatus(),
290+
meta.getProcessStage(),
291+
meta.getRetryNumber(),
292+
endTime,
293+
meta.getFailMessage(),
294+
meta.getProcessParameters(),
295+
meta.getSummary())),
296+
() ->
297+
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime));
176298
}
177299
}
178300

amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.amoro.server.scheduler.inline;
2020

2121
import org.apache.amoro.TableRuntime;
22+
import org.apache.amoro.server.process.TableProcessMeta;
2223
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
2324
import org.apache.amoro.server.table.TableService;
2425
import org.apache.amoro.server.table.cleanup.CleanupOperation;
@@ -88,4 +89,17 @@ public boolean shouldExecuteTaskForTest(
8889
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
8990
return shouldExecuteTask(tableRuntime, cleanupOperation);
9091
}
92+
93+
public TableProcessMeta createCleanupProcessInfoForTest(
94+
TableRuntime tableRuntime, CleanupOperation operation) {
95+
return createCleanupProcessInfo(tableRuntime, operation);
96+
}
97+
98+
public void persistCleanupResultForTest(
99+
TableRuntime tableRuntime,
100+
CleanupOperation operation,
101+
TableProcessMeta meta,
102+
Throwable error) {
103+
persistCleanupResult(tableRuntime, operation, meta, error);
104+
}
91105
}

amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222
import org.apache.amoro.TableFormat;
2323
import org.apache.amoro.TableRuntime;
2424
import org.apache.amoro.config.TableConfiguration;
25+
import org.apache.amoro.process.ProcessStatus;
2526
import org.apache.amoro.server.persistence.PersistentBase;
2627
import org.apache.amoro.server.persistence.TableRuntimeMeta;
2728
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
29+
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
2830
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
31+
import org.apache.amoro.server.process.TableProcessMeta;
2932
import org.apache.amoro.server.table.DefaultTableRuntime;
3033
import org.apache.amoro.server.table.DefaultTableRuntimeStore;
3134
import org.apache.amoro.server.table.TableRuntimeHandler;
@@ -262,4 +265,67 @@ public void testShouldExecuteTaskWithNoneOperation() {
262265
boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
263266
Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
264267
}
268+
269+
/**
270+
* Validates that process info is correctly saved to table_process with proper status transitions.
271+
*/
272+
@Test
273+
public void testCleanupProcessPersistence() {
274+
long baseTableId = 200L;
275+
List<CleanupOperation> operations =
276+
Arrays.asList(
277+
CleanupOperation.ORPHAN_FILES_CLEANING,
278+
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
279+
CleanupOperation.DATA_EXPIRING,
280+
CleanupOperation.SNAPSHOTS_EXPIRING);
281+
282+
for (int i = 0; i < operations.size(); i++) {
283+
long tableId = baseTableId + i;
284+
CleanupOperation operation = operations.get(i);
285+
prepareTestEnvironment(Collections.singletonList(tableId));
286+
287+
PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
288+
ServerTableIdentifier identifier = createTableIdentifier(tableId);
289+
DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
290+
291+
// Scenario 1: Create process - verify initial persisted state
292+
TableProcessMeta processMeta =
293+
executor.createCleanupProcessInfoForTest(tableRuntime, operation);
294+
TableProcessMeta persistedRunning = getProcessMeta(processMeta.getProcessId());
295+
296+
Assert.assertEquals(ProcessStatus.RUNNING, persistedRunning.getStatus());
297+
298+
// Scenario 2: Success completion - verify status update and timestamps
299+
executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), null);
300+
TableProcessMeta persistedSuccess = getProcessMeta(processMeta.getProcessId());
301+
Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus());
302+
Assert.assertTrue(persistedSuccess.getCreateTime() < persistedSuccess.getFinishTime());
303+
304+
// Scenario 3: Failure handling - verify error persistence
305+
processMeta = executor.createCleanupProcessInfoForTest(tableRuntime, operation);
306+
RuntimeException testError = new RuntimeException("Cleanup failed for " + operation);
307+
executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), testError);
308+
309+
TableProcessMeta persistedFailure = getProcessMeta(processMeta.getProcessId());
310+
Assert.assertEquals(ProcessStatus.FAILED, persistedFailure.getStatus());
311+
Assert.assertEquals(testError.getMessage(), persistedFailure.getFailMessage());
312+
313+
cleanUpTableProcess(tableId);
314+
}
315+
}
316+
317+
private TableProcessMeta getProcessMeta(long processId) {
318+
return getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
319+
}
320+
321+
/** Clean up table_process records for a specific table */
322+
private void cleanUpTableProcess(long tableId) {
323+
doAs(
324+
TableProcessMapper.class,
325+
mapper -> {
326+
mapper
327+
.listProcessMeta(tableId, null, null)
328+
.forEach(p -> mapper.deleteBefore(tableId, p.getProcessId()));
329+
});
330+
}
265331
}

0 commit comments

Comments
 (0)