Skip to content

Commit 66e95ce

Browse files
authored
Evolve iceberg table schema for partition copy (#4142)
1 parent 179baac commit 66e95ce

File tree

8 files changed

+258
-121
lines changed

8 files changed

+258
-121
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ExecutionException;
2727

2828
import org.apache.iceberg.DataFile;
29+
import org.apache.iceberg.Schema;
2930
import org.apache.iceberg.catalog.TableIdentifier;
3031

3132
import com.github.rholder.retry.Attempt;
@@ -57,13 +58,23 @@
5758
@Slf4j
5859
public class IcebergOverwritePartitionsStep implements CommitStep {
5960
private final String destTableIdStr;
61+
/**
62+
* Updated schema must be enforced to be backwards compatible by the catalog in
63+
* the partition overwrite step otherwise previous partitions may become unreadable.
64+
* This schema is determined at the start of the copy job before data files discovery and the same is
65+
* committed before overwriting partitions. It is the responsibility of the catalog to ensure that
66+
* no conflicting commits happen in between the copy job.
67+
*/
68+
private final Schema updatedSchema;
6069
private final Properties properties;
6170
// data files are populated once all the copy tasks are done. Each IcebergPartitionCopyableFile has a serialized data file
6271
@Setter private List<DataFile> dataFiles;
6372
private final String partitionColName;
6473
private final String partitionValue;
6574
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
6675
".catalog.overwrite.partitions.retries";
76+
public static final String SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
77+
".schema.update.retries";
6778
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
6879
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
6980
RETRY_TIMES, 3,
@@ -75,8 +86,9 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
7586
* @param destTableIdStr the identifier of the destination table as a string
7687
* @param properties the properties containing configuration
7788
*/
78-
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) {
89+
public IcebergOverwritePartitionsStep(String destTableIdStr, Schema updatedSchema, String partitionColName, String partitionValue, Properties properties) {
7990
this.destTableIdStr = destTableIdStr;
91+
this.updatedSchema = updatedSchema;
8092
this.partitionColName = partitionColName;
8193
this.partitionValue = partitionValue;
8294
this.properties = properties;
@@ -108,6 +120,13 @@ public void execute() throws IOException {
108120
dataFiles.size(),
109121
dataFiles.get(0).path()
110122
);
123+
// update schema
124+
Retryer<Void> schemaUpdateRetryer = createSchemaUpdateRetryer();
125+
schemaUpdateRetryer.call(() -> {
126+
destTable.updateSchema(this.updatedSchema, false);
127+
return null;
128+
});
129+
// overwrite partitions
111130
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
112131
overwritePartitionsRetryer.call(() -> {
113132
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
@@ -146,17 +165,30 @@ private Retryer<Void> createOverwritePartitionsRetryer() {
146165
? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
147166
: ConfigFactory.empty();
148167

168+
return getRetryer(retryerOverridesConfig);
169+
}
170+
171+
private Retryer<Void> createSchemaUpdateRetryer() {
172+
Config config = ConfigFactory.parseProperties(this.properties);
173+
Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
174+
? config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
175+
: ConfigFactory.empty();
176+
177+
return getRetryer(retryerOverridesConfig);
178+
}
179+
180+
private Retryer<Void> getRetryer(Config retryerOverridesConfig) {
149181
return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
150-
@Override
151-
public <V> void onRetry(Attempt<V> attempt) {
152-
if (attempt.hasException()) {
153-
String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
154-
destTableIdStr,
155-
attempt.getAttemptNumber(),
156-
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
157-
log.warn(msg, attempt.getExceptionCause());
158-
}
159-
}
160-
}));
182+
@Override
183+
public <V> void onRetry(Attempt<V> attempt) {
184+
if (attempt.hasException()) {
185+
String msg = String.format("~%s~ Exception occurred [attempt: %d; elapsed: %s]",
186+
destTableIdStr,
187+
attempt.getAttemptNumber(),
188+
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
189+
log.warn(msg, attempt.getExceptionCause());
190+
}
191+
}
192+
}));
161193
}
162194
}

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iceberg.DataFile;
3838
import org.apache.iceberg.DataFiles;
3939
import org.apache.iceberg.PartitionSpec;
40+
import org.apache.iceberg.Schema;
4041
import org.apache.iceberg.StructLike;
4142
import org.apache.iceberg.TableMetadata;
4243
import org.apache.iceberg.TableProperties;
@@ -98,6 +99,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
9899
// Differences are getting data files, copying ancestor permission and adding post publish steps
99100
String fileSet = this.getFileSetId();
100101
IcebergTable srcIcebergTable = getSrcIcebergTable();
102+
Schema srcTableSchema = srcIcebergTable.accessTableMetadata().schema();
101103
List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
102104

103105
if (srcDataFiles.isEmpty()) {
@@ -128,7 +130,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
128130
List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, copyConfig);
129131
// Adding this check to avoid adding post publish step when there are no files to copy.
130132
if (CollectionUtils.isNotEmpty(copyEntities)) {
131-
copyEntities.add(createOverwritePostPublishStep());
133+
copyEntities.add(createOverwritePostPublishStep(srcTableSchema));
132134
}
133135

134136
log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -191,9 +193,16 @@ private Path addUUIDToPath(String filePathStr) {
191193
return new Path(fileDir, newFileName);
192194
}
193195

194-
private PostPublishStep createOverwritePostPublishStep() {
196+
/**
197+
* Creates a {@link PostPublishStep} for overwriting partitions in the destination Iceberg table.
198+
* @param srcTableSchema Schema of the source Iceberg table which needs to be passed to the
199+
* overwrite step for updating destination table schema.
200+
* @return a {@link PostPublishStep} that performs the partition overwrite operation.
201+
*/
202+
private PostPublishStep createOverwritePostPublishStep(Schema srcTableSchema) {
195203
IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
196204
this.getDestIcebergTable().getTableId().toString(),
205+
srcTableSchema,
197206
this.partitionColumnName,
198207
this.partitionColValue,
199208
this.properties

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, Ice
5353
boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
5454
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
5555

56+
// This method only validates that the schema can be updated; no commit is performed here
57+
destIcebergTable.updateSchema(srcIcebergTable.accessTableMetadata().schema(), true);
58+
5659
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
5760
srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
5861

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iceberg.ManifestFiles;
3737
import org.apache.iceberg.ManifestReader;
3838
import org.apache.iceberg.OverwriteFiles;
39+
import org.apache.iceberg.Schema;
3940
import org.apache.iceberg.Snapshot;
4041
import org.apache.iceberg.StructLike;
4142
import org.apache.iceberg.Table;
@@ -48,6 +49,7 @@
4849
import org.apache.iceberg.io.FileIO;
4950

5051
import com.google.common.annotations.VisibleForTesting;
52+
import com.google.common.base.Preconditions;
5153
import com.google.common.collect.Iterators;
5254
import com.google.common.collect.Lists;
5355
import com.google.common.collect.Sets;
@@ -315,4 +317,31 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
315317
log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
316318
}
317319

320+
/**
321+
* update table's schema to the provided {@link Schema}
322+
* @param updatedSchema the updated schema to be set on the table.
323+
* @param onlyValidate if true, only validates that the schema can be updated without committing.
324+
* @throws TableNotFoundException if the table does not exist.
325+
*/
326+
protected void updateSchema(Schema updatedSchema, boolean onlyValidate) throws TableNotFoundException {
327+
TableMetadata currentTableMetadata = accessTableMetadata();
328+
Schema currentSchema = currentTableMetadata.schema();
329+
330+
if (currentSchema.sameSchema(updatedSchema)) {
331+
log.info("~{}~ schema is already up-to-date", tableId);
332+
return;
333+
}
334+
335+
log.info("~{}~ updating schema from {} to {}, commit: {}", tableId, currentSchema, updatedSchema, !onlyValidate);
336+
337+
TableMetadata updatedTableMetadata = currentTableMetadata.updateSchema(updatedSchema, updatedSchema.highestFieldId());
338+
Preconditions.checkArgument(updatedTableMetadata.schema().sameSchema(updatedSchema), "Schema mismatch after update, please check destination table");
339+
340+
if (!onlyValidate) {
341+
tableOps.commit(currentTableMetadata, updatedTableMetadata);
342+
tableOps.refresh();
343+
log.info("~{}~ schema updated successfully", tableId);
344+
}
345+
}
346+
318347
}

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.IOException;
2121

2222
import org.apache.iceberg.PartitionSpec;
23-
import org.apache.iceberg.Schema;
2423
import org.apache.iceberg.TableMetadata;
2524

2625
import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@ private IcebergTableMetadataValidatorUtils() {
3635
}
3736

3837
/**
39-
* Compares the metadata of the given two iceberg tables.
40-
* <ul>
41-
* <li>First compares the schema of the metadata.</li>
42-
* <li>Then compares the partition spec of the metadata.</li>
43-
* </ul>
38+
* Compares the partition spec of the metadata.
4439
* @param tableMetadataA the metadata of the first table
4540
* @param tableMetadataB the metadata of the second table
4641
* @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
@@ -52,27 +47,6 @@ public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA,
5247
tableMetadataA.metadataFileLocation(),
5348
tableMetadataB.metadataFileLocation());
5449

55-
Schema schemaA = tableMetadataA.schema();
56-
Schema schemaB = tableMetadataB.schema();
57-
// TODO: Need to add support for schema evolution
58-
// This check needs to be broken down into multiple checks to support schema evolution
59-
// Possible cases - schemaA == schemaB,
60-
// - schemaA is subset of schemaB [ schemaB Evolved ],
61-
// - schemaA is superset of schemaB [ schemaA Evolved ],
62-
// - Other cases?
63-
// Also consider using Strategy or any other design pattern for this to make it a better solution
64-
if (!schemaA.sameSchema(schemaB)) {
65-
String errMsg = String.format(
66-
"Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
67-
tableMetadataA.metadataFileLocation(),
68-
schemaA.schemaId(),
69-
tableMetadataB.metadataFileLocation(),
70-
schemaB.schemaId()
71-
);
72-
log.error(errMsg);
73-
throw new IOException(errMsg);
74-
}
75-
7650
PartitionSpec partitionSpecA = tableMetadataA.spec();
7751
PartitionSpec partitionSpecB = tableMetadataB.spec();
7852
// .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iceberg.DataFile;
2525
import org.apache.iceberg.DataFiles;
2626
import org.apache.iceberg.PartitionSpec;
27+
import org.apache.iceberg.Schema;
2728
import org.apache.iceberg.catalog.TableIdentifier;
2829
import org.mockito.Mockito;
2930
import org.testng.Assert;
@@ -40,17 +41,19 @@ public class IcebergOverwritePartitionsStepTest {
4041
private final String testPartitionColName = "testPartition";
4142
private final String testPartitionColValue = "testValue";
4243
private IcebergTable mockIcebergTable;
44+
private Schema mockSchema;
4345
private IcebergCatalog mockIcebergCatalog;
4446
private Properties mockProperties;
4547
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
4648

4749
@BeforeMethod
4850
public void setUp() throws IOException {
4951
mockIcebergTable = Mockito.mock(IcebergTable.class);
52+
mockSchema = Mockito.mock(Schema.class);
5053
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
5154
mockProperties = new Properties();
5255

53-
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
56+
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
5457
testPartitionColName, testPartitionColValue, mockProperties));
5558

5659
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -112,7 +115,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
112115
int retryCount = 7;
113116
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
114117
Integer.toString(retryCount));
115-
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
118+
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
116119
testPartitionColName, testPartitionColValue, mockProperties));
117120

118121
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,13 @@ public class IcebergTableMetadataValidatorUtilsTest {
3535
.requiredString("field1")
3636
.requiredString("field2")
3737
.endRecord());
38-
private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
39-
.fields()
40-
.requiredString("field2")
41-
.requiredString("field1")
42-
.endRecord());
43-
private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
44-
.fields()
45-
.requiredString("field1")
46-
.requiredString("field2")
47-
.requiredInt("field3")
48-
.endRecord());
49-
private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
50-
.fields()
51-
.requiredInt("field1")
52-
.requiredString("field2")
53-
.requiredInt("field3")
54-
.endRecord());
5538
private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
5639
.identity("field1")
5740
.build();
5841
private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata(
5942
schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
6043
private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
6144
schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
62-
private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
63-
schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
64-
private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
6545
private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
6646
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
6747
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;
@@ -74,64 +54,6 @@ public void testValidateSameSchema() throws IOException {
7454
Assert.assertTrue(true);
7555
}
7656

77-
@Test
78-
public void testValidateDifferentSchemaFails() {
79-
// Schema 1 and Schema 2 have different field order
80-
81-
TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
82-
unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
83-
84-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
85-
tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
86-
}
87-
88-
@Test
89-
public void testValidateSchemaWithDifferentTypesFails() {
90-
// schema 3 and schema 4 have different field types for field1
91-
92-
TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
93-
unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
94-
95-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
96-
tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
97-
}
98-
99-
@Test
100-
public void testValidateSchemaWithEvolvedSchemaIFails() {
101-
// Schema 3 has one more extra field as compared to Schema 1
102-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
103-
tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
104-
}
105-
106-
@Test
107-
public void testValidateSchemaWithEvolvedSchemaIIFails() {
108-
// TODO: This test should pass in the future when we support schema evolution
109-
// Schema 3 has one more extra field as compared to Schema 1
110-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
111-
tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
112-
}
113-
114-
@Test
115-
public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
116-
// Adding this test as to verify that partition copy doesn't proceed further for this case
117-
// as while doing poc and testing had seen final commit gets fail if there is mismatch in field type
118-
// specially from int to long
119-
Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
120-
.fields()
121-
.requiredLong("field1")
122-
.requiredString("field2")
123-
.requiredInt("field3")
124-
.endRecord());
125-
PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
126-
.identity("field1")
127-
.build();
128-
TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
129-
partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());
130-
131-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
132-
tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
133-
}
134-
13557
@Test
13658
public void testValidateSamePartitionSpec() throws IOException {
13759
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(

0 commit comments

Comments
 (0)