Skip to content

Commit bb38578

Browse files
committed
Dummy Iceberg table MSCK repair
1 parent e302ff6 commit bb38578

File tree

10 files changed

+551
-53
lines changed

10 files changed

+551
-53
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
7777
import org.apache.hadoop.hive.ql.ErrorMsg;
7878
import org.apache.hadoop.hive.ql.QueryState;
79+
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc;
80+
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult;
7981
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
8082
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
8183
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
@@ -185,6 +187,7 @@
185187
import org.apache.iceberg.mr.Catalogs;
186188
import org.apache.iceberg.mr.InputFormatConfig;
187189
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
190+
import org.apache.iceberg.mr.hive.actions.HiveIcebergRepairTable;
188191
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
189192
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
190193
import org.apache.iceberg.parquet.VariantUtil;
@@ -1141,7 +1144,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
11411144
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
11421145
int numThreads = conf.getInt(ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
11431146
ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
1144-
expireSnapshot(icebergTable, expireSnapshotsSpec, numThreads);
1147+
expireSnapshots(icebergTable, expireSnapshotsSpec, numThreads);
11451148
break;
11461149
case SET_CURRENT_SNAPSHOT:
11471150
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
@@ -1185,8 +1188,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
11851188
try {
11861189
if (numThreads > 0) {
11871190
LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
1188-
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(),
1189-
numThreads);
1191+
deleteExecutorService = IcebergTableUtil.newFixedThreadPool(
1192+
"delete-orphan-files" + icebergTable.name(), numThreads);
11901193
}
11911194

11921195
HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
@@ -1203,13 +1206,14 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
12031206
}
12041207
}
12051208

1206-
private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
1209+
private void expireSnapshots(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
12071210
int numThreads) {
12081211
ExecutorService deleteExecutorService = null;
12091212
try {
12101213
if (numThreads > 0) {
12111214
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
1212-
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads);
1215+
deleteExecutorService = IcebergTableUtil.newFixedThreadPool(
1216+
"expire-snapshots-" + icebergTable.name(), numThreads);
12131217
}
12141218
if (expireSnapshotsSpec == null) {
12151219
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
@@ -2282,6 +2286,24 @@ public boolean supportsDefaultColumnValues(Map<String, String> tblProps) {
22822286
return IcebergTableUtil.formatVersion(tblProps) >= 3;
22832287
}
22842288

2289+
@Override
2290+
public MsckResult repair(org.apache.hadoop.hive.ql.metadata.Table hmsTable, HiveConf conf, MsckDesc desc)
2291+
throws HiveException {
2292+
LOG.info("Starting Iceberg table repair{} for {}", desc.isDryRun() ? " (dry-run)" : "",
2293+
hmsTable.getFullyQualifiedName());
2294+
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
2295+
2296+
HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, desc);
2297+
try {
2298+
return repair.execute();
2299+
} catch (Exception e) {
2300+
String errorMsg = String.format("Failed to repair Iceberg table %s: %s",
2301+
hmsTable.getFullyQualifiedName(), e.getMessage());
2302+
LOG.error(errorMsg, e);
2303+
throw new HiveException(errorMsg, e);
2304+
}
2305+
}
2306+
22852307

22862308
private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
22872309
return exprs.stream().map(v ->

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import java.util.Set;
3333
import java.util.concurrent.ExecutorService;
3434
import java.util.concurrent.Executors;
35-
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.ThreadFactory;
3636
import java.util.function.BinaryOperator;
3737
import java.util.function.Function;
3838
import java.util.function.Predicate;
@@ -102,6 +102,7 @@
102102
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
103103
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
104104
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
105+
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
105106
import org.apache.iceberg.types.Conversions;
106107
import org.apache.iceberg.types.Type;
107108
import org.apache.iceberg.types.Types;
@@ -620,13 +621,12 @@ public static <T> List<T> readColStats(Table table, Long snapshotId, Predicate<B
620621
return colStats;
621622
}
622623

623-
public static ExecutorService newDeleteThreadPool(String completeName, int numThreads) {
624-
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
625-
return Executors.newFixedThreadPool(numThreads, runnable -> {
626-
Thread thread = new Thread(runnable);
627-
thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
628-
return thread;
629-
});
624+
public static ExecutorService newFixedThreadPool(String threadName, int numThreads) {
625+
ThreadFactory threadFactory =
626+
new ThreadFactoryBuilder()
627+
.setNameFormat(threadName + "-%d")
628+
.build();
629+
return Executors.newFixedThreadPool(numThreads, threadFactory);
630630
}
631631

632632
public static boolean hasUndergonePartitionEvolution(Table table) {
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergRepairTable.java (new file)

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iceberg.mr.hive.actions;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.stream.StreamSupport;
26+
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc;
27+
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult;
28+
import org.apache.iceberg.DeleteFiles;
29+
import org.apache.iceberg.FileScanTask;
30+
import org.apache.iceberg.Table;
31+
import org.apache.iceberg.Transaction;
32+
import org.apache.iceberg.io.CloseableIterable;
33+
import org.apache.iceberg.mr.hive.IcebergTableUtil;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
/**
38+
* Repairs an Iceberg table by removing dangling file references.
39+
* <p>
40+
* Detects and removes references to data files that are missing from the filesystem
41+
* but still referenced in metadata. Supports dry-run mode and parallel execution.
42+
*/
43+
public class HiveIcebergRepairTable {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRepairTable.class);
46+
private static final int DEFAULT_NUM_THREADS = 4;
47+
48+
private final Table table;
49+
private final MsckDesc desc;
50+
private final int numThreads;
51+
52+
public HiveIcebergRepairTable(Table table, MsckDesc desc) {
53+
this(table, desc, DEFAULT_NUM_THREADS);
54+
}
55+
56+
public HiveIcebergRepairTable(Table table, MsckDesc desc, int numThreads) {
57+
this.table = table;
58+
this.desc = desc;
59+
this.numThreads = numThreads;
60+
}
61+
62+
/**
63+
* Executes the repair operation within a provided transaction.
64+
*
65+
* @param transaction the Iceberg transaction to use
66+
* @return repair result containing number of issues fixed and log message
67+
* @throws IOException if metadata validation or file check fails
68+
*/
69+
public MsckResult execute(Transaction transaction) throws IOException {
70+
List<String> missingFiles = getMissingFiles();
71+
72+
if (missingFiles.isEmpty()) {
73+
String msg = "No missing files detected";
74+
LOG.info(msg);
75+
return new MsckResult(0, msg, new java.util.ArrayList<>());
76+
} else if (!desc.isDryRun()) {
77+
// Only commit changes if not in dry-run mode
78+
DeleteFiles deleteFiles = transaction.newDelete();
79+
for (String path : missingFiles) {
80+
deleteFiles.deleteFile(path);
81+
}
82+
deleteFiles.commit();
83+
}
84+
85+
String summaryMsg = desc.isDryRun() ?
86+
"Would remove %d dangling file reference(s)".formatted(missingFiles.size()) :
87+
"Removed %d dangling file reference(s)".formatted(missingFiles.size());
88+
LOG.info(summaryMsg);
89+
90+
String detailedMsg = desc.isDryRun() ?
91+
"Iceberg table repair (dry-run): %s".formatted(summaryMsg) :
92+
"Iceberg table repair completed: %s".formatted(summaryMsg);
93+
94+
return new MsckResult(missingFiles.size(), detailedMsg, missingFiles);
95+
}
96+
97+
/**
98+
* Executes the repair operation, automatically creating and committing a transaction.
99+
*
100+
* @return repair result containing removed files and statistics
101+
* @throws IOException if metadata validation or file check fails
102+
*/
103+
public MsckResult execute() throws IOException {
104+
Transaction transaction = table.newTransaction();
105+
MsckResult result = execute(transaction);
106+
if (!desc.isDryRun() && result.getNumIssues() > 0) {
107+
transaction.commitTransaction();
108+
}
109+
return result;
110+
}
111+
112+
/**
113+
* Finds all missing data files by checking their physical existence in parallel.
114+
*
115+
* @return list of file paths for missing data files
116+
* @throws IOException if the file check operation fails or is interrupted
117+
*/
118+
private List<String> getMissingFiles() throws IOException {
119+
try (ExecutorService executorService = IcebergTableUtil.newFixedThreadPool(
120+
"repair-table-" + table.name(), numThreads);
121+
CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
122+
return executorService.submit(() ->
123+
StreamSupport.stream(fileScanTasks.spliterator(), true)
124+
.map(task -> task.file().location())
125+
.filter(path -> !table.io().newInputFile(path).exists())
126+
.toList()
127+
).get();
128+
} catch (InterruptedException e) {
129+
Thread.currentThread().interrupt();
130+
throw new IOException("Interrupted while checking for missing files", e);
131+
132+
} catch (ExecutionException e) {
133+
throw new IOException("Failed to check for missing files: " + e.getMessage(), e);
134+
}
135+
}
136+
}

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void setConf(Configuration configuration) {
114114
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
115115
if (numThreads > 0) {
116116
LOG.info("Will expire Iceberg snapshots using an executor service with {} threads", numThreads);
117-
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool("iceberg-housekeeper-service", numThreads);
117+
deleteExecutorService = IcebergTableUtil.newFixedThreadPool("iceberg-housekeeper-service", numThreads);
118118
}
119119
}
120120
}

0 commit comments

Comments
 (0)