Skip to content

Commit 40120af

Browse files
author
majin.nathan
committed
[Refactor] Introduce Iceberg TableRuntimePlugin
1 parent 75f3a60 commit 40120af

File tree

14 files changed

+342
-234
lines changed

14 files changed

+342
-234
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@
5454
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
5555
import org.apache.amoro.server.table.DefaultTableManager;
5656
import org.apache.amoro.server.table.DefaultTableService;
57-
import org.apache.amoro.server.table.RuntimeHandlerChain;
57+
import org.apache.amoro.server.table.IcebergTablePlugin;
5858
import org.apache.amoro.server.table.TableManager;
5959
import org.apache.amoro.server.table.TableRuntimeFactoryManager;
60+
import org.apache.amoro.server.table.TableRuntimePlugin;
6061
import org.apache.amoro.server.table.TableService;
6162
import org.apache.amoro.server.terminal.TerminalManager;
6263
import org.apache.amoro.server.utils.ThriftServiceProxy;
@@ -198,12 +199,11 @@ public void waitFollowerShip() throws Exception {
198199
haContainer.waitFollowerShip();
199200
}
200201

201-
public void startRestServices() throws Exception {
202+
public void startRestServices() {
202203
EventsManager.getInstance();
203204
MetricManager.getInstance();
204205

205206
catalogManager = new DefaultCatalogManager(serviceConfig);
206-
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
207207
optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager);
208208
terminalManager = new TerminalManager(serviceConfig, catalogManager);
209209

@@ -234,38 +234,37 @@ public void startOptimizingService() throws Exception {
234234

235235
tableService =
236236
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
237-
237+
tableManager = new DefaultTableManager(serviceConfig, tableService);
238238
optimizingService =
239239
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
240240

241241
processService = new ProcessService(serviceConfig, tableService);
242-
243-
LOG.info("Setting up AMS table executors...");
244-
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
245-
addHandlerChain(optimizingService.getTableRuntimeHandler());
246-
addHandlerChain(processService.getTableHandlerChain());
247-
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
248-
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
249-
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
250-
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
251-
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
252-
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor());
253-
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
254-
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
255-
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
256-
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
257-
tableService.initialize();
242+
tableService.initialize(initTablePlugins());
258243
LOG.info("AMS table service have been initialized");
259-
tableManager.setTableService(tableService);
260244

261245
initThriftService();
262246
startThriftService();
263247
}
264248

265-
private void addHandlerChain(RuntimeHandlerChain chain) {
266-
if (chain != null) {
267-
tableService.addHandlerChain(chain);
268-
}
249+
private List<TableRuntimePlugin> initTablePlugins() {
250+
LOG.info("Setting up AMS table executors...");
251+
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
252+
IcebergTablePlugin icebergTablePlugin =
253+
IcebergTablePlugin.builder()
254+
.addHandler(optimizingService.getTableRuntimeHandler())
255+
.addHandler(processService.getTableHandlerChain())
256+
.addHandler(InlineTableExecutors.getInstance().getDataExpiringExecutor())
257+
.addHandler(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor())
258+
.addHandler(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor())
259+
.addHandler(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor())
260+
.addHandler(InlineTableExecutors.getInstance().getOptimizingCommitExecutor())
261+
.addHandler(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor())
262+
.addHandler(InlineTableExecutors.getInstance().getBlockerExpiringExecutor())
263+
.addHandler(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor())
264+
.addHandler(InlineTableExecutors.getInstance().getTableRefreshingExecutor())
265+
.addHandler(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor())
266+
.build();
267+
return List.of(icebergTablePlugin);
269268
}
270269

271270
public void disposeOptimizingService() {

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,11 @@
2626
import org.apache.amoro.api.Blocker;
2727
import org.apache.amoro.api.TableIdentifier;
2828
import org.apache.amoro.config.Configurations;
29-
import org.apache.amoro.exception.AlreadyExistsException;
3029
import org.apache.amoro.exception.BlockerConflictException;
31-
import org.apache.amoro.exception.IllegalMetadataException;
3230
import org.apache.amoro.exception.ObjectNotExistsException;
3331
import org.apache.amoro.exception.PersistenceException;
3432
import org.apache.amoro.server.AmoroManagementConf;
3533
import org.apache.amoro.server.AmoroServiceConstants;
36-
import org.apache.amoro.server.catalog.CatalogManager;
37-
import org.apache.amoro.server.catalog.InternalCatalog;
3834
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
3935
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
4036
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
@@ -54,7 +50,6 @@
5450
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
5551
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
5652
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
57-
import org.apache.commons.lang.StringUtils;
5853
import org.apache.commons.lang3.tuple.Pair;
5954
import org.slf4j.Logger;
6055
import org.slf4j.LoggerFactory;
@@ -74,18 +69,10 @@ public class DefaultTableManager extends PersistentBase implements TableManager
7469
public static final Logger LOG = LoggerFactory.getLogger(DefaultTableManager.class);
7570
private static final int TABLE_BLOCKER_RETRY = 3;
7671
private final long blockerTimeout;
72+
private final TableService tableService;
7773

78-
private final CatalogManager catalogManager;
79-
80-
private @Nullable TableService tableService;
81-
82-
public DefaultTableManager(Configurations configuration, CatalogManager catalogManager) {
83-
this.catalogManager = catalogManager;
74+
public DefaultTableManager(Configurations configuration, TableService tableService) {
8475
this.blockerTimeout = configuration.get(AmoroManagementConf.BLOCKER_TIMEOUT).toMillis();
85-
}
86-
87-
@Override
88-
public void setTableService(@Nullable TableService tableService) {
8976
this.tableService = tableService;
9077
}
9178

@@ -95,40 +82,12 @@ private Optional<TableService> tableService() {
9582

9683
@Override
9784
public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteData) {
98-
if (StringUtils.isBlank(tableIdentifier.getTableName())) {
99-
throw new IllegalMetadataException("table name is blank");
100-
}
101-
if (StringUtils.isBlank(tableIdentifier.getCatalog())) {
102-
throw new IllegalMetadataException("catalog is blank");
103-
}
104-
if (StringUtils.isBlank(tableIdentifier.getDatabase())) {
105-
throw new IllegalMetadataException("database is blank");
106-
}
107-
108-
InternalCatalog internalCatalog =
109-
catalogManager.getInternalCatalog(tableIdentifier.getCatalog());
110-
String database = tableIdentifier.getDatabase();
111-
String table = tableIdentifier.getTableName();
112-
if (!internalCatalog.tableExists(database, table)) {
113-
throw new ObjectNotExistsException(tableIdentifier);
114-
}
115-
116-
ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table);
117-
tableService().ifPresent(s -> s.onTableDropped(internalCatalog, serverTableIdentifier));
85+
tableService().ifPresent(s -> s.dropTable(tableIdentifier));
11886
}
11987

12088
@Override
12189
public void createTable(String catalogName, TableMetadata tableMetadata) {
122-
InternalCatalog catalog = catalogManager.getInternalCatalog(catalogName);
123-
String database = tableMetadata.getTableIdentifier().getDatabase();
124-
String table = tableMetadata.getTableIdentifier().getTableName();
125-
if (catalog.tableExists(database, table)) {
126-
throw new AlreadyExistsException(
127-
tableMetadata.getTableIdentifier().getIdentifier().buildTableIdentifier());
128-
}
129-
130-
TableMetadata metadata = catalog.createTable(tableMetadata);
131-
tableService().ifPresent(s -> s.onTableCreated(catalog, metadata.getTableIdentifier()));
90+
tableService().ifPresent(s -> s.createTable(tableMetadata));
13291
}
13392

13493
@Override

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public String name() {
4242
}
4343

4444
@Override
45-
public Optional<TableRuntimeCreator> accept(
45+
public Optional<Creator> accept(
4646
ServerTableIdentifier tableIdentifier, Map<String, String> tableProperties) {
4747
if (tableIdentifier
4848
.getFormat()
@@ -52,7 +52,7 @@ public Optional<TableRuntimeCreator> accept(
5252
return Optional.empty();
5353
}
5454

55-
private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator {
55+
private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.Creator {
5656
@Override
5757
public List<StateKey<?>> requiredStateKeys() {
5858
return DefaultTableRuntime.REQUIRED_STATES;

0 commit comments

Comments
 (0)