Commit fc59521 (1 parent: 6ff00f3)

Iceberg: V3: Add support for Row Lineage

28 files changed: +629 -22 lines

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java (3 additions, 0 deletions)

@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -64,10 +65,12 @@ private static HiveIcebergWriter writer(JobConf jc) {
     // It gets the config from the FileSinkOperator which has its own config for every target table
     Table table = HiveTableUtil.deserializeTable(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
     setWriterLevelConfiguration(jc, table);
+    boolean shouldAddRowLineageColumns = jc.getBoolean(SessionStateUtil.ROW_LINEAGE, false);

     return WriterBuilder.builderFor(table, jc::get)
         .queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname))
         .attemptID(taskAttemptID)
+        .addRowLineageColumns(shouldAddRowLineageColumns)
         .build();
   }

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java (6 additions, 0 deletions)

@@ -29,13 +29,15 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -183,6 +185,10 @@ private static Schema projectedSchema(Configuration conf, String tableName, Sche
       case UPDATE:
         return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
       case OTHER:
+        boolean rowLineage = Boolean.parseBoolean(conf.get(SessionStateUtil.ROW_LINEAGE));
+        if (rowLineage) {
+          return MetadataColumns.schemaWithRowLineage(tableSchema);
+        }
         return tableSchema;
       default:
         throw new IllegalArgumentException("Unsupported operation " + operation);
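
Not part of the commit, but a minimal sketch of what the new OTHER branch produces, assuming the Iceberg MetadataColumns API called in the diff appends the _row_id and _last_updated_sequence_number metadata columns to the given schema:

  import org.apache.iceberg.MetadataColumns;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.types.Types;

  public final class SchemaWithRowLineageDemo {
    public static void main(String[] args) {
      Schema tableSchema = new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.optional(2, "name", Types.StringType.get()));

      // Same call the SerDe makes when SessionStateUtil.ROW_LINEAGE is set:
      // the projected schema is the table schema plus the two lineage columns.
      Schema serdeSchema = MetadataColumns.schemaWithRowLineage(tableSchema);
      System.out.println(serdeSchema.columns().size()); // expected: 4
    }
  }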

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java (11 additions, 2 deletions)

@@ -240,8 +240,8 @@ public class HiveIcebergStorageHandler extends DefaultStorageHandler implements
   public static final String COPY_ON_WRITE = RowLevelOperationMode.COPY_ON_WRITE.modeName();
   public static final String MERGE_ON_READ = RowLevelOperationMode.MERGE_ON_READ.modeName();

-  private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(
-      PARTITION_SPEC_ID, PARTITION_HASH, FILE_PATH, ROW_POSITION, PARTITION_PROJECTION);
+  private static final List<VirtualColumn> ACID_VIRTUAL_COLS =
+      ImmutableList.of(PARTITION_SPEC_ID, PARTITION_HASH, FILE_PATH, ROW_POSITION, PARTITION_PROJECTION);

   private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = schema(ACID_VIRTUAL_COLS);

@@ -313,6 +313,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
     tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
     SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
         .ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
+    SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE)
+        .ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString()));

   }

@@ -1809,6 +1811,13 @@ public boolean areSnapshotsSupported() {
     return true;
   }

+  @Override
+  public boolean supportsRowLineage(Map<String, String> tableProperties) {
+    return Optional.ofNullable(tableProperties).filter(
+        properties -> IcebergTableUtil.formatVersion(properties) >= 3 &&
+        FileFormat.PARQUET == IcebergTableUtil.defaultFileFormat(properties::getOrDefault)).isPresent();
+  }
+
   @Override
   public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
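
Since IcebergTableUtil is Hive-internal, here is a hedged standalone rendering of the same gate; "format-version" and "write.format.default" are the standard Iceberg table property keys its helpers are assumed to resolve:

  import java.util.Map;

  public final class RowLineageGateDemo {

    // Simplified stand-in for supportsRowLineage above: row lineage is only
    // offered on format-version >= 3 tables whose default file format is Parquet.
    static boolean supportsRowLineage(Map<String, String> props) {
      if (props == null) {
        return false;
      }
      int formatVersion = Integer.parseInt(props.getOrDefault("format-version", "2"));
      String defaultFormat = props.getOrDefault("write.format.default", "parquet");
      return formatVersion >= 3 && "parquet".equalsIgnoreCase(defaultFormat);
    }

    public static void main(String[] args) {
      System.out.println(supportsRowLineage(Map.of("format-version", "3")));  // true
      System.out.println(supportsRowLineage(Map.of("format-version", "2")));  // false
      System.out.println(supportsRowLineage(
          Map.of("format-version", "3", "write.format.default", "orc")));     // false
    }
  }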

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java (11 additions, 4 deletions)

@@ -27,6 +27,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
+import org.apache.hadoop.hive.ql.io.RowLineageInfo;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.MetadataColumns;
@@ -39,6 +40,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.mr.mapreduce.RowLineageReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -71,6 +73,9 @@ private IcebergAcidUtil() {
     VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_HASH.getName(), PARTITION_STRUCT_META_COL);
     VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.FILE_PATH.getName(), MetadataColumns.FILE_PATH);
     VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_POSITION.getName(), MetadataColumns.ROW_POSITION);
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(),
+        MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROWID.getName(), MetadataColumns.ROW_ID);
   }

   private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required(
@@ -219,11 +224,11 @@ public static class VirtualColumnAwareIterator<T> implements CloseableIterator<T
     private final GenericRecord current;
     private final Configuration conf;

-    public VirtualColumnAwareIterator(
-        CloseableIterator<T> currentIterator, Schema expectedSchema, Configuration conf) {
+    public VirtualColumnAwareIterator(CloseableIterator<T> currentIterator, Schema expectedSchema, Configuration conf) {
       this.currentIterator = currentIterator;
-      this.current = GenericRecord.create(
-          new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size())));
+      Schema schema =
+          new Schema(expectedSchema.columns().subList(FILE_READ_META_COLS.size(), expectedSchema.columns().size()));
+      this.current = GenericRecord.create(schema);
       this.conf = conf;
     }

@@ -248,6 +253,8 @@ public T next() {
           IcebergAcidUtil.parseFilePath(rec),
           IcebergAcidUtil.parseFilePosition(rec),
           StringUtils.EMPTY);
+      RowLineageInfo.setRowLineageInfoIntoConf(RowLineageReader.readRowId(rec),
+          RowLineageReader.readLastUpdatedSequenceNumber(rec), conf);
      return (T) current;
    }
  }
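
The iterator forwards each row's lineage values to downstream operators through the task Configuration, the same side channel the position-delete fields above it travel through. A minimal sketch of that hand-off, using only the RowLineageInfo method visible in this diff (the matching reader on the Hive side is not shown in this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.ql.io.RowLineageInfo;

  public final class RowLineageConfDemo {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Stash a row's _row_id (7) and _last_updated_sequence_number (3) in the
      // conf so the ROW__LINEAGE__ID and LAST__UPDATED__SEQUENCE__NUMBER
      // virtual columns can be populated for this row.
      RowLineageInfo.setRowLineageInfoIntoConf(7L, 3L, conf);
    }
  }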

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java (47 additions, 0 deletions)

@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
+import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -61,6 +62,7 @@ public final class HiveBatchIterator implements CloseableIterator<HiveBatchConte
     this.partitionColIndices = partitionColIndices;
     this.partitionValues = partitionValues;
     this.idToConstant = idToConstant;
+    RowLineageUtils.initializeRowLineageColumns(vrbCtx, batch);
   }

   @Override
@@ -129,6 +131,51 @@ private void advance() {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
             break;
+          case ROW_LINEAGE_ID:
+            LongColumnVector rowIdLcv = (LongColumnVector) batch.cols[idx];
+            Object firstRowIdObj = idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+            long firstRowId = (firstRowIdObj != null) ? (Long) firstRowIdObj : 0L;
+
+            // If vector[0] is still -1, the reader didn't find the column in the file.
+            if (rowIdLcv.vector[0] == -1L) {
+              for (int i = 0; i < batch.size; i++) {
+                rowIdLcv.vector[i] = firstRowId + rowOffset + i;
+              }
+            } else {
+              // Lineage data was found (could be 0). Preserve it and fill only the NULL gaps.
+              for (int i = 0; i < batch.size; i++) {
+                if (rowIdLcv.isNull[i]) {
+                  rowIdLcv.vector[i] = firstRowId + rowOffset + i;
+                  rowIdLcv.isNull[i] = false;
+                }
+              }
+            }
+            rowIdLcv.noNulls = true;
+            rowIdLcv.isRepeating = false;
+            break;
+
+          case LAST_UPDATED_SEQUENCE_NUMBER:
+            LongColumnVector lusnLcv = (LongColumnVector) batch.cols[idx];
+            Object fileSeqObj = idToConstant.get(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+            long fileSeq = (fileSeqObj != null) ? (Long) fileSeqObj : 0L;
+
+            // If vector[0] is still -1, apply the file-level sequence number to the whole batch.
+            if (lusnLcv.vector[0] == -1L) {
+              for (int i = 0; i < batch.size; i++) {
+                lusnLcv.vector[i] = fileSeq;
+              }
+            } else {
+              // Lineage data found in file, fill only the gaps where data is missing.
+              for (int i = 0; i < batch.size; i++) {
+                if (!lusnLcv.noNulls && lusnLcv.isNull[i]) {
+                  lusnLcv.vector[i] = fileSeq;
+                  lusnLcv.isNull[i] = false;
+                }
+              }
+            }
+            lusnLcv.noNulls = true;
+            lusnLcv.isRepeating = false;
+            break;
         }
       }
     } catch (IOException ioe) {
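
The two new branches share one rule: a lineage vector whose first slot is still -1 means the reader never saw the column in the data file, so values are synthesized from file-level constants (first_row_id for _row_id, the file's sequence number for _last_updated_sequence_number); otherwise only NULL slots, left by rows written before lineage existed, are filled. A standalone sketch of the _row_id half, under the assumption that the reader pre-initializes the vector to -1:

  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

  public final class RowIdFillDemo {

    // Mirrors the ROW_LINEAGE_ID branch: keep ids read from the file and
    // derive the missing ones as first_row_id + position-in-file.
    static void fillRowIds(LongColumnVector lcv, int batchSize, long firstRowId, long rowOffset) {
      if (lcv.vector[0] == -1L) {
        // Column absent from the file: derive every id in the batch.
        for (int i = 0; i < batchSize; i++) {
          lcv.vector[i] = firstRowId + rowOffset + i;
        }
      } else {
        // Column present: fill only the NULL gaps from pre-lineage writes.
        for (int i = 0; i < batchSize; i++) {
          if (lcv.isNull[i]) {
            lcv.vector[i] = firstRowId + rowOffset + i;
            lcv.isNull[i] = false;
          }
        }
      }
      lcv.noNulls = true;
      lcv.isRepeating = false;
    }

    public static void main(String[] args) {
      LongColumnVector lcv = new LongColumnVector(1024);
      java.util.Arrays.fill(lcv.vector, -1L); // the reader's assumed pre-fill
      fillRowIds(lcv, 4, 100L, 0L);
      System.out.println(lcv.vector[3]);      // 103
    }
  }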

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java (8 additions, 1 deletion)

@@ -41,6 +41,7 @@
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -79,6 +80,7 @@ public class WriterBuilder {
   // To specify whether to write the actual row data while writing the delete files.
   public static final String ICEBERG_DELETE_SKIPROWDATA = "iceberg.delete.skiprowdata";
   public static final boolean ICEBERG_DELETE_SKIPROWDATA_DEFAULT = true;
+  private boolean shouldAddRowLineageColumns = false;

   private WriterBuilder(Table table, UnaryOperator<String> ops) {
     this.table = table;
@@ -126,7 +128,7 @@ public HiveIcebergWriter build() {

     HiveFileWriterFactory writerFactory = HiveFileWriterFactory.builderFor(table)
         .dataFileFormat(context.dataFileFormat())
-        .dataSchema(table.schema())
+        .dataSchema(shouldAddRowLineageColumns ? MetadataColumns.schemaWithRowLineage(table.schema()) : table.schema())
         .deleteFileFormat(context.deleteFileFormat())
         .positionDeleteRowSchema(context.skipRowData() || !context.inputOrdered() ?
             // SortingPositionOnlyDeleteWriter doesn't support rawData in delete schema
@@ -209,6 +211,11 @@ private static boolean shouldRewrite(DeleteFile deleteFile, boolean forDVs) {
     return ContentFileUtil.isFileScoped(deleteFile);
   }

+  public WriterBuilder addRowLineageColumns(boolean isRowLineage) {
+    this.shouldAddRowLineageColumns = isRowLineage;
+    return this;
+  }
+
   static class Context {

     private final FileFormat dataFileFormat;

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java (3 additions, 1 deletion)

@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -79,7 +80,8 @@ private static Schema readSchema(Configuration conf, Table table, boolean caseSe
     }

     if (InputFormatConfig.fetchVirtualColumns(conf)) {
-      return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table);
+      return MetadataColumns.schemaWithRowLineage(
+          IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table));
     }

     return readSchema;
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/RowLineageReader.java (new file, 43 additions)

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+
+public final class RowLineageReader {
+
+  private RowLineageReader() {
+  }
+
+  public static Long readRowId(Record rec) {
+    return (Long) rec.getField(MetadataColumns.ROW_ID.name());
+  }
+
+  public static Long readLastUpdatedSequenceNumber(Record rec) {
+    return (Long) rec.getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
+  }
+
+  public static boolean schemaHasRowLineageColumns(Schema schema) {
+    return schema.findField(MetadataColumns.ROW_ID.name()) != null ||
+        schema.findField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()) != null;
+  }
+}
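
A hedged usage sketch for the new helper, assuming the caller imports it from org.apache.iceberg.mr.mapreduce and that schemaWithRowLineage appends both lineage columns:

  import org.apache.iceberg.MetadataColumns;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.data.GenericRecord;
  import org.apache.iceberg.data.Record;
  import org.apache.iceberg.mr.mapreduce.RowLineageReader;
  import org.apache.iceberg.types.Types;

  public final class RowLineageReaderDemo {
    public static void main(String[] args) {
      Schema schema = MetadataColumns.schemaWithRowLineage(
          new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())));

      Record rec = GenericRecord.create(schema);
      rec.setField("id", 42);
      rec.setField(MetadataColumns.ROW_ID.name(), 7L);
      rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 3L);

      System.out.println(RowLineageReader.readRowId(rec));                      // 7
      System.out.println(RowLineageReader.readLastUpdatedSequenceNumber(rec));  // 3
      System.out.println(RowLineageReader.schemaHasRowLineageColumns(schema));  // true
    }
  }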
New test file (42 additions, 0 deletions)

@@ -0,0 +1,42 @@
+create table ice_t (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3');
+insert into ice_t values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91);
+select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;
+
+update ice_t set balance = 500 where id = 2;
+
+select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;
+
+-- merge
+CREATE TABLE ice_merge (
+  id INT,
+  data STRING
+)
+STORED BY iceberg
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_merge VALUES
+  (1, 'a'),
+  (2, 'b'),
+  (3, 'c');
+
+CREATE TABLE src (
+  id INT,
+  data STRING
+)
+STORED AS TEXTFILE;
+
+INSERT INTO src VALUES
+  (2, 'bb'),
+  (4, 'd');
+
+MERGE INTO ice_merge t
+USING src s
+ON t.id = s.id
+WHEN MATCHED THEN
+  UPDATE SET data = s.data
+WHEN NOT MATCHED THEN
+  INSERT VALUES (s.id, s.data);
+
+SELECT id, data, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
+FROM ice_merge
+ORDER BY ROW__LINEAGE__ID;
