@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -64,10 +65,12 @@ private static HiveIcebergWriter writer(JobConf jc) {
// It gets the config from the FileSinkOperator which has its own config for every target table
Table table = HiveTableUtil.deserializeTable(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
setWriterLevelConfiguration(jc, table);
boolean shouldAddRowLineageColumns = jc.getBoolean(SessionStateUtil.ROW_LINEAGE, false);

return WriterBuilder.builderFor(table, jc::get)
.queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname))
.attemptID(taskAttemptID)
.addRowLineageColumns(shouldAddRowLineageColumns)
.build();
}

@@ -28,14 +28,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -168,7 +171,9 @@ private static Schema projectedSchema(Configuration conf, String tableName, Sche
// it is necessary to ensure projectedSchema equals to tableSchema,
// or we cannot find selectOperator's column from inspector
if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
return tableSchema;
return RowLineageUtils.isRowLineageInsert(conf) ?
MetadataColumns.schemaWithRowLineage(tableSchema) :
tableSchema;
} else {
return projectedSchema;
}
@@ -183,6 +188,10 @@ private static Schema projectedSchema(Configuration conf, String tableName, Sche
case UPDATE:
return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
case OTHER:
boolean rowLineage = Boolean.parseBoolean(conf.get(SessionStateUtil.ROW_LINEAGE));
if (rowLineage) {
return MetadataColumns.schemaWithRowLineage(tableSchema);
}
return tableSchema;
default:
throw new IllegalArgumentException("Unsupported operation " + operation);
@@ -313,6 +313,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
.ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE)
.ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString()));

}

@@ -1809,6 +1811,11 @@ public boolean areSnapshotsSupported() {
return true;
}

@Override
public boolean supportsRowLineage(Map<String, String> tableProperties) {
return IcebergTableUtil.supportsRowLineage(tableProperties);
}

@Override
public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
import org.apache.hadoop.hive.ql.io.RowLineageInfo;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.MetadataColumns;
@@ -39,6 +40,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.mapreduce.RowLineageReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -71,6 +73,9 @@ private IcebergAcidUtil() {
VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_HASH.getName(), PARTITION_STRUCT_META_COL);
VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.FILE_PATH.getName(), MetadataColumns.FILE_PATH);
VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_POSITION.getName(), MetadataColumns.ROW_POSITION);
VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_LINEAGE_ID.getName(), MetadataColumns.ROW_ID);
VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(),
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
}

private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required(
@@ -248,6 +253,8 @@ public T next() {
IcebergAcidUtil.parseFilePath(rec),
IcebergAcidUtil.parseFilePosition(rec),
StringUtils.EMPTY);
RowLineageInfo.setRowLineageInfoIntoConf(RowLineageReader.readRowId(rec),
RowLineageReader.readLastUpdatedSequenceNumber(rec), conf);
return (T) current;
}
}
@@ -814,4 +814,9 @@ public static String defaultWarehouseLocation(TableIdentifier tableIdentifier,
return sb.toString();
}

public static boolean supportsRowLineage(Map<String, String> tableProperties) {
return Optional.ofNullable(tableProperties).filter(
properties -> IcebergTableUtil.formatVersion(tableProperties) >= 3 &&
FileFormat.PARQUET == IcebergTableUtil.defaultFileFormat(properties::getOrDefault)).isPresent();
}
}
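For context, a minimal, self-contained sketch of the gate that supportsRowLineage applies: row lineage is only offered when the table is format version 3 or newer and its default write format is Parquet. The literal property keys and fallback values below are assumptions standing in for IcebergTableUtil.formatVersion and IcebergTableUtil.defaultFileFormat, not the handler's actual lookup.

import java.util.Map;

final class RowLineageGateSketch {

  private RowLineageGateSketch() {
  }

  // Hypothetical re-statement of the check above; "format-version" and
  // "write.format.default" are assumed raw table property keys, with assumed defaults.
  static boolean supportsRowLineage(Map<String, String> props) {
    if (props == null) {
      return false;
    }
    int formatVersion = Integer.parseInt(props.getOrDefault("format-version", "2"));
    String defaultFormat = props.getOrDefault("write.format.default", "parquet");
    return formatVersion >= 3 && "parquet".equalsIgnoreCase(defaultFormat);
  }
}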
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -61,6 +62,7 @@ public final class HiveBatchIterator implements CloseableIterator<HiveBatchConte
this.partitionColIndices = partitionColIndices;
this.partitionValues = partitionValues;
this.idToConstant = idToConstant;
RowLineageUtils.initializeRowLineageColumns(vrbCtx, batch);
}

@Override
@@ -129,6 +131,51 @@ private void advance() {
bcv.isNull[0] = true;
bcv.isRepeating = true;
break;
case ROW_LINEAGE_ID:
LongColumnVector rowIdLcv = (LongColumnVector) batch.cols[idx];
Object firstRowIdObj = idToConstant.get(MetadataColumns.ROW_ID.fieldId());
long firstRowId = (firstRowIdObj != null) ? (Long) firstRowIdObj : 0L;

// If vector[0] is still -1, the reader didn't find the column in the file.
if (rowIdLcv.vector[0] == -1L) {
for (int i = 0; i < batch.size; i++) {
rowIdLcv.vector[i] = firstRowId + rowOffset + i;
}
} else {
// Lineage data was found (could be 0). Preserve it and fill only the NULL gaps.
for (int i = 0; i < batch.size; i++) {
if (rowIdLcv.isNull[i]) {
rowIdLcv.vector[i] = firstRowId + rowOffset + i;
rowIdLcv.isNull[i] = false;
}
}
}
rowIdLcv.noNulls = true;
rowIdLcv.isRepeating = false;
break;

case LAST_UPDATED_SEQUENCE_NUMBER:
LongColumnVector lusnLcv = (LongColumnVector) batch.cols[idx];
Object fileSeqObj = idToConstant.get(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
long fileSeq = (fileSeqObj != null) ? (Long) fileSeqObj : 0L;

// If vector[0] is still -1, apply the file-level sequence number to the whole batch.
if (lusnLcv.vector[0] == -1L) {
for (int i = 0; i < batch.size; i++) {
lusnLcv.vector[i] = fileSeq;
}
} else {
// Lineage data found in file, fill only the gaps where data is missing.
for (int i = 0; i < batch.size; i++) {
if (!lusnLcv.noNulls && lusnLcv.isNull[i]) {
lusnLcv.vector[i] = fileSeq;
lusnLcv.isNull[i] = false;
}
}
}
lusnLcv.noNulls = true;
lusnLcv.isRepeating = false;
break;
}
}
} catch (IOException ioe) {
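As a small worked illustration of the id arithmetic used in the ROW_LINEAGE_ID case above (all values are made up): if the data file was assigned a first_row_id of 1000 via idToConstant, the current batch starts at row offset 512 within the file, and the file carries no lineage column, the synthesized ids come out as 1512, 1513, 1514, and so on.

public class RowIdArithmeticExample {
  public static void main(String[] args) {
    // Illustrative values only; mirrors firstRowId + rowOffset + i from the loop above.
    long firstRowId = 1000L;  // per-file first row id taken from idToConstant
    long rowOffset = 512L;    // row position of the first row of this batch within the file
    for (int i = 0; i < 3; i++) {
      System.out.println(firstRowId + rowOffset + i);  // prints 1512, 1513, 1514
    }
  }
}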
@@ -41,6 +41,7 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -79,6 +80,7 @@ public class WriterBuilder {
// To specify whether to write the actual row data while writing the delete files.
public static final String ICEBERG_DELETE_SKIPROWDATA = "iceberg.delete.skiprowdata";
public static final boolean ICEBERG_DELETE_SKIPROWDATA_DEFAULT = true;
private boolean shouldAddRowLineageColumns = false;

private WriterBuilder(Table table, UnaryOperator<String> ops) {
this.table = table;
@@ -126,7 +128,7 @@ public HiveIcebergWriter build() {

HiveFileWriterFactory writerFactory = HiveFileWriterFactory.builderFor(table)
.dataFileFormat(context.dataFileFormat())
.dataSchema(table.schema())
.dataSchema(shouldAddRowLineageColumns ? MetadataColumns.schemaWithRowLineage(table.schema()) : table.schema())
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteRowSchema(context.skipRowData() || !context.inputOrdered() ?
// SortingPositionOnlyDeleteWriter doesn't support rawData in delete schema
@@ -209,6 +211,11 @@ private static boolean shouldRewrite(DeleteFile deleteFile, boolean forDVs) {
return ContentFileUtil.isFileScoped(deleteFile);
}

public WriterBuilder addRowLineageColumns(boolean isRowLineage) {
this.shouldAddRowLineageColumns = isRowLineage;
return this;
}

static class Context {

private final FileFormat dataFileFormat;
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -35,6 +36,7 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveTableUtil;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.hive.IcebergTableUtil;

public abstract class AbstractIcebergRecordReader<T> extends RecordReader<Void, T> {

@@ -79,7 +81,10 @@ private static Schema readSchema(Configuration conf, Table table, boolean caseSe
}

if (InputFormatConfig.fetchVirtualColumns(conf)) {
return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table);
readSchema = IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table);
if (IcebergTableUtil.supportsRowLineage(table.properties())) {
readSchema = MetadataColumns.schemaWithRowLineage(readSchema);
}
}

return readSchema;
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.mapreduce;

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

public final class RowLineageReader {

private RowLineageReader() {
}

public static Long readRowId(Record rec) {
return (Long) rec.getField(MetadataColumns.ROW_ID.name());
}

public static Long readLastUpdatedSequenceNumber(Record rec) {
return (Long) rec.getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
}

public static boolean schemaHasRowLineageColumns(Schema schema) {
return schema.findField(MetadataColumns.ROW_ID.name()) != null ||
schema.findField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()) != null;
}
}
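A hedged usage sketch of the new helper against an in-memory GenericRecord; the tiny data schema and the field values are illustrative and not taken from the patch, but the MetadataColumns and RowLineageReader calls are the ones shown above.

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.mapreduce.RowLineageReader;
import org.apache.iceberg.types.Types;

public class RowLineageReaderExample {
  public static void main(String[] args) {
    // Build a record whose schema includes the row-lineage metadata columns.
    Schema dataSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
    Schema lineageSchema = MetadataColumns.schemaWithRowLineage(dataSchema);
    Record rec = GenericRecord.create(lineageSchema);
    rec.setField("id", 7);
    rec.setField(MetadataColumns.ROW_ID.name(), 42L);
    rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 3L);

    System.out.println(RowLineageReader.readRowId(rec));                          // 42
    System.out.println(RowLineageReader.readLastUpdatedSequenceNumber(rec));      // 3
    System.out.println(RowLineageReader.schemaHasRowLineageColumns(lineageSchema)); // true
  }
}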
42 changes: 42 additions & 0 deletions iceberg/iceberg-handler/src/test/queries/positive/row_lineage.q
@@ -0,0 +1,42 @@
create table ice_t (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3');
insert into ice_t values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91);
select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;

update ice_t set balance = 500 where id = 2;

select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;

-- merge
CREATE TABLE ice_merge (
id INT,
data STRING
)
STORED BY iceberg
TBLPROPERTIES ('format-version'='3');

INSERT INTO ice_merge VALUES
(1, 'a'),
(2, 'b'),
(3, 'c');

CREATE TABLE src (
id INT,
data STRING
)
STORED AS TEXTFILE;

INSERT INTO src VALUES
(2, 'bb'),
(4, 'd');

MERGE INTO ice_merge t
USING src s
ON t.id = s.id
WHEN MATCHED THEN
UPDATE SET data = s.data
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.data);

SELECT id, data, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM ice_merge
ORDER BY ROW__LINEAGE__ID;