diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java index 1956d8100b6d..c4d73bfc0f7a 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -64,10 +65,12 @@ private static HiveIcebergWriter writer(JobConf jc) { // It gets the config from the FileSinkOperator which has its own config for every target table Table table = HiveTableUtil.deserializeTable(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME)); setWriterLevelConfiguration(jc, table); + boolean shouldAddRowLineageColumns = jc.getBoolean(SessionStateUtil.ROW_LINEAGE, false); return WriterBuilder.builderFor(table, jc::get) .queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname)) .attemptID(taskAttemptID) + .addRowLineageColumns(shouldAddRowLineageColumns) .build(); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index 59372e35731e..b82486a40695 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.metadata.RowLineageUtils; import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; +import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.Writable; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -168,7 +171,9 @@ private static Schema projectedSchema(Configuration conf, String tableName, Sche // it is necessary to ensure projectedSchema equals to tableSchema, // or we cannot find selectOperator's column from inspector if (projectedSchema.columns().size() != distinctSelectedColumns.length) { - return tableSchema; + return RowLineageUtils.isRowLineageInsert(conf) ? 
+ MetadataColumns.schemaWithRowLineage(tableSchema) : + tableSchema; } else { return projectedSchema; } @@ -183,6 +188,10 @@ private static Schema projectedSchema(Configuration conf, String tableName, Sche case UPDATE: return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns()); case OTHER: + boolean rowLineage = Boolean.parseBoolean(conf.get(SessionStateUtil.ROW_LINEAGE)); + if (rowLineage) { + return MetadataColumns.schemaWithRowLineage(tableSchema); + } return tableSchema; default: throw new IllegalArgumentException("Unsupported operation " + operation); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 7554de2c588a..33a775e07949 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -313,6 +313,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet) cols))); + SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE) + .ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString())); } @@ -1809,6 +1811,11 @@ public boolean areSnapshotsSupported() { return true; } + @Override + public boolean supportsRowLineage(Map tableProperties) { + return IcebergTableUtil.supportsRowLineage(tableProperties); + } + @Override public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { TableDesc tableDesc = Utilities.getTableDesc(hmsTable); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java index c3ea21478d19..b2feed784e39 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.PositionDeleteInfo; +import org.apache.hadoop.hive.ql.io.RowLineageInfo; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.iceberg.ContentFile; import org.apache.iceberg.MetadataColumns; @@ -39,6 +40,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.mr.mapreduce.RowLineageReader; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -71,6 +73,9 @@ private IcebergAcidUtil() { VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_HASH.getName(), PARTITION_STRUCT_META_COL); VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.FILE_PATH.getName(), MetadataColumns.FILE_PATH); VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_POSITION.getName(), MetadataColumns.ROW_POSITION); + VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_LINEAGE_ID.getName(), MetadataColumns.ROW_ID); + VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(), + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); } private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required( @@ -248,6 
+253,8 @@ public T next() { IcebergAcidUtil.parseFilePath(rec), IcebergAcidUtil.parseFilePosition(rec), StringUtils.EMPTY); + RowLineageInfo.setRowLineageInfoIntoConf(RowLineageReader.readRowId(rec), + RowLineageReader.readLastUpdatedSequenceNumber(rec), conf); return (T) current; } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index b85df34405d0..ceb848c30952 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -814,4 +814,9 @@ public static String defaultWarehouseLocation(TableIdentifier tableIdentifier, return sb.toString(); } + public static boolean supportsRowLineage(Map tableProperties) { + return Optional.ofNullable(tableProperties).filter( + properties -> IcebergTableUtil.formatVersion(tableProperties) >= 3 && + FileFormat.PARQUET == IcebergTableUtil.defaultFileFormat(properties::getOrDefault)).isPresent(); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java index 32ef101f4aaa..d710162d8808 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader; +import org.apache.hadoop.hive.ql.metadata.RowLineageUtils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; @@ -61,6 +62,7 @@ public final class HiveBatchIterator implements CloseableIterator ops) { this.table = table; @@ -126,7 +128,7 @@ public HiveIcebergWriter build() { HiveFileWriterFactory writerFactory = HiveFileWriterFactory.builderFor(table) .dataFileFormat(context.dataFileFormat()) - .dataSchema(table.schema()) + .dataSchema(shouldAddRowLineageColumns ? MetadataColumns.schemaWithRowLineage(table.schema()) : table.schema()) .deleteFileFormat(context.deleteFileFormat()) .positionDeleteRowSchema(context.skipRowData() || !context.inputOrdered() ? 
// SortingPositionOnlyDeleteWriter doesn't support rawData in delete schema @@ -209,6 +211,11 @@ private static boolean shouldRewrite(DeleteFile deleteFile, boolean forDVs) { return ContentFileUtil.isFileScoped(deleteFile); } + public WriterBuilder addRowLineageColumns(boolean isRowLineage) { + this.shouldAddRowLineageColumns = isRowLineage; + return this; + } + static class Context { private final FileFormat dataFileFormat; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java index 3f57223b24e4..29f285cf37b1 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java @@ -23,6 +23,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; @@ -35,6 +36,7 @@ import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.HiveTableUtil; import org.apache.iceberg.mr.hive.IcebergAcidUtil; +import org.apache.iceberg.mr.hive.IcebergTableUtil; public abstract class AbstractIcebergRecordReader extends RecordReader { @@ -79,7 +81,10 @@ private static Schema readSchema(Configuration conf, Table table, boolean caseSe } if (InputFormatConfig.fetchVirtualColumns(conf)) { - return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table); + readSchema = IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table); + if (IcebergTableUtil.supportsRowLineage(table.properties())) { + readSchema = MetadataColumns.schemaWithRowLineage(readSchema); + } } return readSchema; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/RowLineageReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/RowLineageReader.java new file mode 100644 index 000000000000..ee10e7c553d3 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/RowLineageReader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; + +public final class RowLineageReader { + + private RowLineageReader() { + } + + public static Long readRowId(Record rec) { + return (Long) rec.getField(MetadataColumns.ROW_ID.name()); + } + + public static Long readLastUpdatedSequenceNumber(Record rec) { + return (Long) rec.getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()); + } + + public static boolean schemaHasRowLineageColumns(Schema schema) { + return schema.findField(MetadataColumns.ROW_ID.name()) != null || + schema.findField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()) != null; + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/row_lineage.q b/iceberg/iceberg-handler/src/test/queries/positive/row_lineage.q new file mode 100644 index 000000000000..0f91fb211016 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/row_lineage.q @@ -0,0 +1,42 @@ +create table ice_t (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3'); +insert into ice_t values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91); +select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id; + +update ice_t set balance = 500 where id = 2; + +select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id; + +-- merge +CREATE TABLE ice_merge ( + id INT, + data STRING +) +STORED BY iceberg +TBLPROPERTIES ('format-version'='3'); + +INSERT INTO ice_merge VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'); + +CREATE TABLE src ( + id INT, + data STRING +) +STORED AS TEXTFILE; + +INSERT INTO src VALUES + (2, 'bb'), + (4, 'd'); + +MERGE INTO ice_merge t +USING src s +ON t.id = s.id +WHEN MATCHED THEN + UPDATE SET data = s.data +WHEN NOT MATCHED THEN + INSERT VALUES (s.id, s.data); + +SELECT id, data, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM ice_merge +ORDER BY ROW__LINEAGE__ID; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/row_lineage.q.out b/iceberg/iceberg-handler/src/test/results/positive/row_lineage.q.out new file mode 100644 index 000000000000..0261b5381097 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/row_lineage.q.out @@ -0,0 +1,155 @@ +PREHOOK: query: create table ice_t (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_t +POSTHOOK: query: create table ice_t (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_t +PREHOOK: query: insert into ice_t values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_t +POSTHOOK: query: insert into ice_t values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_t +PREHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_t +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select id, name, balance, ROW__LINEAGE__ID, 
LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_t +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 aaa 25 0 1 +2 bbb 35 1 1 +3 ccc 82 2 1 +4 ddd 91 3 1 +PREHOOK: query: update ice_t set balance = 500 where id = 2 +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_t +PREHOOK: Output: default@ice_t +PREHOOK: Output: default@ice_t +POSTHOOK: query: update ice_t set balance = 500 where id = 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_t +POSTHOOK: Output: default@ice_t +POSTHOOK: Output: default@ice_t +PREHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_t +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_t +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 aaa 25 0 1 +2 bbb 500 1 2 +3 ccc 82 2 1 +4 ddd 91 3 1 +PREHOOK: query: CREATE TABLE ice_merge ( + id INT, + data STRING +) +STORED BY iceberg +TBLPROPERTIES ('format-version'='3') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_merge +POSTHOOK: query: CREATE TABLE ice_merge ( + id INT, + data STRING +) +STORED BY iceberg +TBLPROPERTIES ('format-version'='3') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_merge +PREHOOK: query: INSERT INTO ice_merge VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_merge +POSTHOOK: query: INSERT INTO ice_merge VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_merge +PREHOOK: query: CREATE TABLE src ( + id INT, + data STRING +) +STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src +POSTHOOK: query: CREATE TABLE src ( + id INT, + data STRING +) +STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@src +PREHOOK: query: INSERT INTO src VALUES + (2, 'bb'), + (4, 'd') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@src +POSTHOOK: query: INSERT INTO src VALUES + (2, 'bb'), + (4, 'd') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@src +POSTHOOK: Lineage: src.data SCRIPT [] +POSTHOOK: Lineage: src.id SCRIPT [] +PREHOOK: query: MERGE INTO ice_merge t +USING src s +ON t.id = s.id +WHEN MATCHED THEN + UPDATE SET data = s.data +WHEN NOT MATCHED THEN + INSERT VALUES (s.id, s.data) +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_merge +PREHOOK: Input: default@src +PREHOOK: Output: default@ice_merge +PREHOOK: Output: default@ice_merge +PREHOOK: Output: default@merge_tmp_table +POSTHOOK: query: MERGE INTO ice_merge t +USING src s +ON t.id = s.id +WHEN MATCHED THEN + UPDATE SET data = s.data +WHEN NOT MATCHED THEN + INSERT VALUES (s.id, s.data) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_merge +POSTHOOK: Input: default@src +POSTHOOK: Output: default@ice_merge +POSTHOOK: Output: default@ice_merge +POSTHOOK: Output: default@merge_tmp_table +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(ice_merge)ice_merge.null, ] +PREHOOK: query: SELECT id, data, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER 
+FROM ice_merge
+ORDER BY ROW__LINEAGE__ID
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_merge
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT id, data, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
+FROM ice_merge
+ORDER BY ROW__LINEAGE__ID
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_merge
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	a	0	1
+2	bb	1	2
+3	c	2	1
+4	d	4	2
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 1dc01d02f5d0..f2ec4e415e1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -697,6 +697,18 @@ public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
       case ROWISDELETED:
         vcValues[i] = new BooleanWritable(ctx.getIoCxt().isDeletedRecord());
         break;
+      case ROW_LINEAGE_ID:
+        vcValues[i] = null;
+        if (ctx.getIoCxt().getRowLineageInfo() != null) {
+          vcValues[i] = new LongWritable(ctx.getIoCxt().getRowLineageInfo().getBaseRowId());
+        }
+        break;
+      case LAST_UPDATED_SEQUENCE_NUMBER:
+        vcValues[i] = null;
+        if (ctx.getIoCxt().getRowLineageInfo() != null) {
+          vcValues[i] = new LongWritable(ctx.getIoCxt().getRowLineageInfo().getLastUpdatedSequenceNumber());
+        }
+        break;
       }
     }
     return vcValues;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index ec19dcab7375..e65cb6427edb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -134,6 +134,7 @@ else if(recordReader instanceof AcidInputFormat.AcidRecordReader) {
       } else {
         ioCxtRef.parseRecordIdentifier(jobConf);
         ioCxtRef.parsePositionDeleteInfo(jobConf);
+        ioCxtRef.parseRowLineageInfo(jobConf);
       }
     }
     return retVal;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index e2a03a199f2b..ebf543993063 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * IOContext basically contains the position information of the current
@@ -52,6 +53,7 @@ public class IOContext {
   private RecordIdentifier ri;
   private boolean isDeletedRecord;
   private PositionDeleteInfo pdi;
+  private RowLineageInfo rowLineageInfo;
 
   public static enum Comparison {
     GREATER,
@@ -196,6 +198,14 @@ public PositionDeleteInfo getPositionDeleteInfo() {
     return pdi;
   }
 
+  public void parseRowLineageInfo(JobConf configuration) {
+    this.rowLineageInfo = RowLineageInfo.parseFromConf(configuration);
+  }
+
+  public RowLineageInfo getRowLineageInfo() {
+    return rowLineageInfo;
+  }
+
   public boolean isDeletedRecord() {
     return isDeletedRecord;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java
index 784289e00455..a1be5affcb27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java
@@ -38,8 +38,8 @@ public static PositionDeleteInfo parseFromConf(Configuration conf) {
     return new PositionDeleteInfo(specId, partHash, filePath, rowPos, partitionProjection);
   }
 
-  public
static void setIntoConf(Configuration conf, int specId, long partHash, String filePath, - long filePos, String partitionProjection) { + public static void setIntoConf(Configuration conf, int specId, long partHash, String filePath, long filePos, + String partitionProjection) { conf.setInt(CONF_KEY_SPEC_ID, specId); conf.setLong(CONF_KEY_PART_HASH, partHash); conf.set(CONF_KEY_FILE_PATH, filePath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RowLineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RowLineageInfo.java new file mode 100644 index 000000000000..e4936d8291ce --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RowLineageInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import org.apache.hadoop.conf.Configuration; + +public class RowLineageInfo { + + private final Long baseRowId; + private final Long lastUpdatedSequenceNumber; + + private static final String CONF_KEY_ROW_ID = "hive.io.context.row.lineage.row.id"; + private static final String CONF_KEY_LAST_UPDATED_SEQUENCE_NUMBER = + "hive.io.context.row.lineage.last.updated.sequence.number"; + + public RowLineageInfo(Long baseRowId, Long lastUpdatedSequenceNumber) { + this.baseRowId = baseRowId; + this.lastUpdatedSequenceNumber = lastUpdatedSequenceNumber; + } + + public Long getBaseRowId() { + return baseRowId; + } + + public Long getLastUpdatedSequenceNumber() { + return lastUpdatedSequenceNumber; + } + + public static RowLineageInfo parseFromConf(Configuration conf) { + Long rowId = conf.get(CONF_KEY_ROW_ID) != null ? Long.parseLong(conf.get(CONF_KEY_ROW_ID)) : null; + + Long lusn = conf.get(CONF_KEY_LAST_UPDATED_SEQUENCE_NUMBER) != null ? 
+        Long.parseLong(conf.get(CONF_KEY_LAST_UPDATED_SEQUENCE_NUMBER)) :
+        null;
+
+    return new RowLineageInfo(rowId, lusn);
+  }
+
+  public static void setRowLineageInfoIntoConf(Long rowId, Long lusn, Configuration conf) {
+    if (rowId != null) {
+      conf.setLong(CONF_KEY_ROW_ID, rowId);
+    }
+    if (lusn != null) {
+      conf.setLong(CONF_KEY_LAST_UPDATED_SEQUENCE_NUMBER, lusn);
+    }
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index f138c1e25428..10a54769de20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -165,6 +165,7 @@ private void pushProjectionsAndFilters(final JobConf jobConf,
   private void pushFilters(final JobConf jobConf, RowSchema rowSchema, ExprNodeGenericFuncDesc filterExpr) {
     // construct column name list for reference by filter push down
     Utilities.setColumnNameList(jobConf, rowSchema, true);
+    Utilities.setColumnTypeList(jobConf, rowSchema, true);
 
     // push down filters
     if (filterExpr == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index a35a4536a4a9..5c108e8e588d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -88,6 +88,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.getRequestedSchemaWithRowLineageColumns;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 
 /**
@@ -274,6 +275,7 @@ public void initialize(
     colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
     requestedSchema = DataWritableReadSupport
         .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
+    requestedSchema = getRequestedSchemaWithRowLineageColumns(rbCtx, requestedSchema, fileSchema, colsToInclude);
 
     Path path = wrapPathForCache(filePath, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 39358ca3d594..520c52a24a8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -790,6 +790,14 @@ default boolean areSnapshotsSupported() {
     return false;
   }
 
+  /**
+   * Gets whether this storage handler supports row lineage.
+   * @return true if row lineage is supported, false otherwise
+   */
+  default boolean supportsRowLineage(Map<String, String> table) {
+    return false;
+  }
+
   /**
    * Query the most recent unique snapshot's context of the passed table.
    * @param table - {@link org.apache.hadoop.hive.ql.metadata.Table} which snapshot context should be returned.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java new file mode 100644 index 000000000000..d3de8bbb9127 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.parse.rewrite.MergeStatement; +import org.apache.hadoop.hive.ql.parse.rewrite.sql.MultiInsertSqlGenerator; +import org.apache.hadoop.hive.ql.session.SessionStateUtil; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +public class RowLineageUtils { + + public static final Map ROW_LINEAGE_COLUMNS_TO_FILE_NAME = new EnumMap<>(VirtualColumn.class); + + static { + ROW_LINEAGE_COLUMNS_TO_FILE_NAME.put(VirtualColumn.ROW_LINEAGE_ID, "_row_id"); + ROW_LINEAGE_COLUMNS_TO_FILE_NAME.put(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER, "_last_updated_sequence_number"); + } + + public static boolean isRowLineageColumnPresent(VectorizedRowBatchCtx rbCtx, MessageType fileSchema, + VirtualColumn rowLineageColumn) { + return rbCtx.isVirtualColumnNeeded(rowLineageColumn.getName()) && fileSchema.containsField( + ROW_LINEAGE_COLUMNS_TO_FILE_NAME.get(rowLineageColumn)); + } + + public static void addRowLineageColumnsForUpdate(Table table, MultiInsertSqlGenerator sqlGenerator, + List insertValues, Configuration conf) { + if (supportsRowLineage(table)) { + SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, true); + // copy ROW_ID + sqlGenerator.append(","); + sqlGenerator.append(HiveUtils.unparseIdentifier(VirtualColumn.ROW_LINEAGE_ID.getName(), conf)); + + // reset LAST_UPDATED_SEQUENCE_NUMBER + sqlGenerator.append(", NULL AS "); + sqlGenerator.append(HiveUtils.unparseIdentifier(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(), conf)); + + insertValues.add(sqlGenerator.qualify(VirtualColumn.ROW_LINEAGE_ID.getName())); + insertValues.add(sqlGenerator.qualify(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName())); + } + } + + public static boolean supportsRowLineage(Table table) { + if (table.getStorageHandler() == null) { + return false; + } + return table.getStorageHandler().supportsRowLineage(table.getParameters()); + } + + public static boolean shouldAddRowLineageColumnsForMerge(MergeStatement mergeStatement, Configuration conf) 
{ + boolean shouldAddRowLineageColumns = + supportsRowLineage(mergeStatement.getTargetTable()) && mergeStatement.hasWhenMatchedUpdateClause(); + SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, shouldAddRowLineageColumns); + return shouldAddRowLineageColumns; + } + + public static void addSourceColumnsForRowLineage(boolean isRowLineageSupported, MultiInsertSqlGenerator sqlGenerator, + Configuration conf) { + if (isRowLineageSupported) { + SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, true); + sqlGenerator.append(", "); + sqlGenerator.append(HiveUtils.unparseIdentifier(VirtualColumn.ROW_LINEAGE_ID.getName(), conf)); + sqlGenerator.append(", "); + sqlGenerator.append(HiveUtils.unparseIdentifier(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(), conf)); + } + } + + public static void addRowLineageColumnsForWhenMatchedUpdateClause(boolean isRowLineageSupported, List values, + String targetAlias, Configuration conf) { + if (isRowLineageSupported) { + values.add(targetAlias + "." + HiveUtils.unparseIdentifier(VirtualColumn.ROW_LINEAGE_ID.getName(), conf)); + values.add("NULL AS " + HiveUtils.unparseIdentifier(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName(), conf)); + } + } + + public static List addRowLineageValuesForAppendWhenNotMatchedClause(boolean isRowLineageSupported, + List insertClauseValues) { + if (isRowLineageSupported) { + List selectValues = new ArrayList<>(insertClauseValues); + // Pad for Iceberg writer schema + selectValues.add("NULL"); // ROW_ID + selectValues.add("NULL"); // LAST_UPDATED_SEQUENCE_NUMBER + return selectValues; + } + return insertClauseValues; + } + + public static void initializeRowLineageColumns(VectorizedRowBatchCtx vrbCtx, VectorizedRowBatch batch) { + int rowIdIdx = vrbCtx.findVirtualColumnNum(VirtualColumn.ROW_LINEAGE_ID); + if (rowIdIdx != -1) { + LongColumnVector rowIdLcv = (LongColumnVector) batch.cols[rowIdIdx]; + rowIdLcv.vector[0] = -1L; + } + int lusnIdx = vrbCtx.findVirtualColumnNum(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER); + if (lusnIdx != -1) { + LongColumnVector lusnLcv = (LongColumnVector) batch.cols[lusnIdx]; + lusnLcv.vector[0] = -1L; + } + } + + public static boolean isRowLineageInsert(Configuration conf) { + return SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE).map(Boolean.class::cast).orElse(false); + } + + public static MessageType getRequestedSchemaWithRowLineageColumns(VectorizedRowBatchCtx rbCtx, + MessageType requestedSchema, MessageType fileSchema, List colsToInclude) { + List newFields = new ArrayList<>(requestedSchema.getFields()); + if (isRowLineageColumnPresent(rbCtx, fileSchema, VirtualColumn.ROW_LINEAGE_ID)) { + colsToInclude.add(rbCtx.findVirtualColumnNum(VirtualColumn.ROW_LINEAGE_ID)); + newFields.add(fileSchema.getType(ROW_LINEAGE_COLUMNS_TO_FILE_NAME.get(VirtualColumn.ROW_LINEAGE_ID))); + } + if (isRowLineageColumnPresent(rbCtx, fileSchema, VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER)) { + colsToInclude.add(rbCtx.findVirtualColumnNum(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER)); + newFields.add( + fileSchema.getType(ROW_LINEAGE_COLUMNS_TO_FILE_NAME.get(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER))); + } + if (newFields.size() != requestedSchema.getFields().size()) { + return new MessageType(requestedSchema.getName(), newFields); + } + return requestedSchema; + } +} + + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 5be6735c1e3f..13840f723ab2 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -1391,7 +1391,11 @@ public List getVirtualColumns() { isBlank(getMetaTable())) { virtualColumns.add(VirtualColumn.SNAPSHOT_ID); } - + if (isNonNative() && getStorageHandler().supportsRowLineage(getTTable().getParameters()) && + isBlank(getMetaTable())) { + virtualColumns.add(VirtualColumn.ROW_LINEAGE_ID); + virtualColumns.add(VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER); + } return virtualColumns; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java index 7c1b49c97fbb..90c6b0bf2766 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java @@ -57,6 +57,8 @@ public enum VirtualColumn { ROW_POSITION("ROW__POSITION", TypeInfoFactory.longTypeInfo), SNAPSHOT_ID("SNAPSHOT__ID", TypeInfoFactory.longTypeInfo), PARTITION_PROJECTION("PARTITION__PROJECTION", TypeInfoFactory.stringTypeInfo), + ROW_LINEAGE_ID("ROW__LINEAGE__ID", TypeInfoFactory.longTypeInfo), + LAST_UPDATED_SEQUENCE_NUMBER("LAST__UPDATED__SEQUENCE__NUMBER", TypeInfoFactory.longTypeInfo), /** * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE. @@ -71,7 +73,7 @@ public enum VirtualColumn { ImmutableSet.of(FILENAME.getName(), BLOCKOFFSET.getName(), RAWDATASIZE.getName(), GROUPINGID.getName(), ROWID.getName(), ROWISDELETED.getName(), PARTITION_SPEC_ID.getName(), PARTITION_HASH.getName(), FILE_PATH.getName(), ROW_POSITION.getName(), - PARTITION_PROJECTION.getName()); + PARTITION_PROJECTION.getName(), ROW_LINEAGE_ID.getName(), LAST_UPDATED_SEQUENCE_NUMBER.getName()); public static final ImmutableMap VIRTUAL_COLUMN_NAME_MAP = new ImmutableMap.Builder().putAll(getColumnNameMap()).build(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index b8acb2661fa6..f4b4c2ff3bad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -307,7 +307,9 @@ public class Vectorizer implements PhysicalPlanResolver { VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION, - VirtualColumn.PARTITION_PROJECTION); + VirtualColumn.PARTITION_PROJECTION, + VirtualColumn.ROW_LINEAGE_ID, + VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER); private HiveConf hiveConf; private enum VECTORIZABLE_UDAF { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f08808b01415..2d7ae00e3dbd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS; import static org.apache.hadoop.hive.ql.ddl.view.create.AbstractCreateViewAnalyzer.validateTablesUsed; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.isRowLineageInsert; import static org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.NON_FK_FILTERED; import static org.apache.hadoop.hive.ql.session.SessionStateUtil.MISSING_COLUMNS; @@ -179,6 +180,7 @@ 
import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.RowLineageUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.Optimizer; @@ -5132,7 +5134,7 @@ RowResolver handleInsertStatementSpec(List col_list, String dest, //no insert schema was specified return outputRR; } - if(targetTableSchema.size() != col_list.size()) { + if(targetTableSchema.size() != col_list.size() && !RowLineageUtils.isRowLineageInsert(conf)) { if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED) && col_list.get(0).getTypeInfo() instanceof ListTypeInfo && targetTableSchema.size() == explodeColListForUDTF(col_list).size()){ @@ -8977,7 +8979,7 @@ private Operator genConversionSelectOperator(String dest, QB qb, Operator input, } // The numbers of input columns and output columns should match for regular query - if (!updating(dest) && !deleting(dest) && !merging(dest) && inColumnCnt != outColumnCnt) { + if (!updating(dest) && !deleting(dest) && !merging(dest) && !isRowLineageInsert(conf) && inColumnCnt != outColumnCnt) { String reason = "Table " + dest + " has " + outColumnCnt + " columns, but query has " + inColumnCnt + " columns."; throw new SemanticException(ASTErrorUtils.getMsg( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java index d43adacf0f7c..a51c22e30670 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java @@ -135,7 +135,7 @@ static class CopyOnWriteMergeWhenClauseSqlGenerator extends MergeRewriter.MergeW CopyOnWriteMergeWhenClauseSqlGenerator( HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement mergeStatement) { - super(conf, sqlGenerator, mergeStatement); + super(conf, sqlGenerator, mergeStatement, false); this.cowWithClauseBuilder = new COWWithClauseBuilder(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java index 44b0a0d9e5a6..f13edba351af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java @@ -43,15 +43,20 @@ import java.util.Map; import java.util.function.Function; import java.util.function.UnaryOperator; + import org.apache.hadoop.hive.serde.serdeConstants; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.addRowLineageValuesForAppendWhenNotMatchedClause; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.addSourceColumnsForRowLineage; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.shouldAddRowLineageColumnsForMerge; public class MergeRewriter implements Rewriter, MergeStatement.DestClausePrefixSetter { private final Hive db; protected final HiveConf conf; protected final SqlGeneratorFactory sqlGeneratorFactory; + protected boolean isRowLineageSupported; public MergeRewriter(Hive db, HiveConf conf, SqlGeneratorFactory sqlGeneratorFactory) { this.db = db; @@ -64,6 +69,7 @@ public ParseUtils.ReparseResult rewrite(Context ctx, MergeStatement 
mergeStateme setOperation(ctx); MultiInsertSqlGenerator sqlGenerator = sqlGeneratorFactory.createSqlGenerator(); + isRowLineageSupported = shouldAddRowLineageColumnsForMerge(mergeStatement, conf); handleSource(mergeStatement.hasWhenNotMatchedInsertClause(), mergeStatement.getSourceAlias(), mergeStatement.getOnClauseAsText(), sqlGenerator); @@ -98,7 +104,7 @@ public ParseUtils.ReparseResult rewrite(Context ctx, MergeStatement mergeStateme protected MergeWhenClauseSqlGenerator createMergeSqlGenerator( MergeStatement mergeStatement, MultiInsertSqlGenerator sqlGenerator) { - return new MergeWhenClauseSqlGenerator(conf, sqlGenerator, mergeStatement); + return new MergeWhenClauseSqlGenerator(conf, sqlGenerator, mergeStatement, isRowLineageSupported); } private void handleSource(boolean hasWhenNotMatchedClause, String sourceAlias, String onClauseAsText, @@ -107,6 +113,7 @@ private void handleSource(boolean hasWhenNotMatchedClause, String sourceAlias, S sqlGenerator.append("(SELECT "); sqlGenerator.appendAcidSelectColumns(Operation.MERGE); sqlGenerator.appendAllColsOfTargetTable(); + addSourceColumnsForRowLineage(isRowLineageSupported, sqlGenerator, conf); sqlGenerator.append(" FROM ").appendTargetTableName().append(") "); sqlGenerator.appendSubQueryAlias(); sqlGenerator.append('\n'); @@ -169,12 +176,15 @@ protected static class MergeWhenClauseSqlGenerator implements MergeStatement.Mer protected final MultiInsertSqlGenerator sqlGenerator; protected final MergeStatement mergeStatement; protected String hintStr; + private final boolean isRowLineageSupported; - MergeWhenClauseSqlGenerator(HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement mergeStatement) { + MergeWhenClauseSqlGenerator(HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement mergeStatement, + boolean isRowLineageSupported) { this.conf = conf; this.sqlGenerator = sqlGenerator; this.mergeStatement = mergeStatement; this.hintStr = mergeStatement.getHintStr(); + this.isRowLineageSupported = isRowLineageSupported; } @Override @@ -192,7 +202,10 @@ public void appendWhenNotMatchedInsertClause(MergeStatement.InsertClause insertC hintStr = null; } - sqlGenerator.append(String.join(", ", insertClause.getValuesClause())); + List selectValues = + addRowLineageValuesForAppendWhenNotMatchedClause(isRowLineageSupported, insertClause.getValuesClause()); + + sqlGenerator.append(String.join(", ", selectValues)); sqlGenerator.append("\n WHERE ").append(insertClause.getPredicate()); if (insertClause.getExtraPredicate() != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeStatement.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeStatement.java index 13826c26074b..c6ee2809773c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeStatement.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeStatement.java @@ -156,6 +156,15 @@ public boolean hasWhenNotMatchedInsertClause() { return false; } + public boolean hasWhenMatchedUpdateClause() { + for (WhenClause whenClause : whenClauses) { + if (whenClause instanceof UpdateClause) { + return true; + } + } + return false; + } + /** * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B, * an error should be raised if > 1 row of "source" matches the same row in "target". 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitMergeRewriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitMergeRewriter.java index 5ba8bd0193e9..84fcf186f6b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitMergeRewriter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitMergeRewriter.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.List; -import static java.util.Arrays.asList; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.addRowLineageColumnsForWhenMatchedUpdateClause; public class SplitMergeRewriter extends MergeRewriter { @@ -38,14 +38,17 @@ public SplitMergeRewriter(Hive db, HiveConf conf, SqlGeneratorFactory sqlGenerat @Override protected MergeWhenClauseSqlGenerator createMergeSqlGenerator( MergeStatement mergeStatement, MultiInsertSqlGenerator sqlGenerator) { - return new SplitMergeWhenClauseSqlGenerator(conf, sqlGenerator, mergeStatement); + return new SplitMergeWhenClauseSqlGenerator(conf, sqlGenerator, mergeStatement, isRowLineageSupported); } static class SplitMergeWhenClauseSqlGenerator extends MergeWhenClauseSqlGenerator { + private final boolean isRowLineageSupported; SplitMergeWhenClauseSqlGenerator( - HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement mergeStatement) { - super(conf, sqlGenerator, mergeStatement); + HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement mergeStatement, + boolean isRowLineageSupported) { + super(conf, sqlGenerator, mergeStatement, isRowLineageSupported); + this.isRowLineageSupported = isRowLineageSupported; } @Override @@ -57,6 +60,7 @@ public void appendWhenMatchedUpdateClause(MergeStatement.UpdateClause updateClau sqlGenerator.append(" -- update clause (insert part)\n"); List values = new ArrayList<>(targetTable.getCols().size() + targetTable.getPartCols().size()); addValues(targetTable, targetAlias, updateClause.getNewValuesMap(), values); + addRowLineageColumnsForWhenMatchedUpdateClause(isRowLineageSupported, values, targetAlias, conf); sqlGenerator.appendInsertBranch(hintStr, values); hintStr = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitUpdateRewriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitUpdateRewriter.java index 16724f42d70a..d14ddc7eb485 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitUpdateRewriter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/SplitUpdateRewriter.java @@ -35,6 +35,8 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.ql.metadata.RowLineageUtils.addRowLineageColumnsForUpdate; + public class SplitUpdateRewriter implements Rewriter { private static final Context.Operation OPERATION = Context.Operation.UPDATE; @@ -100,6 +102,7 @@ public ParseUtils.ReparseResult rewrite(Context context, UpdateStatement updateB updateBlock.getTargetTable().getPartCols().forEach( fieldSchema -> insertValues.add(sqlGenerator.qualify(HiveUtils.unparseIdentifier(fieldSchema.getName(), conf)))); } + addRowLineageColumnsForUpdate(updateBlock.getTargetTable(), sqlGenerator, insertValues, conf); sqlGenerator.append(" FROM ").append(sqlGenerator.getTargetTableFullName()).append(") "); sqlGenerator.appendSubQueryAlias().append("\n"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java index d1367056855b..c70f7fd18c96 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
@@ -40,6 +40,7 @@ public class SessionStateUtil {
   public static final String DEFAULT_TABLE_LOCATION = "defaultLocation";
   public static final String MISSING_COLUMNS = "missingColumns";
   public static final String COLUMN_DEFAULTS = "columnDefaults";
+  public static final String ROW_LINEAGE = "rowLineage";
 
   private SessionStateUtil() {
   }