1 change: 1 addition & 0 deletions docs/docs/spark-configuration.md
@@ -196,6 +196,7 @@ val spark = SparkSession.builder()
| spark.sql.iceberg.executor-cache.locality.enabled | false | Enables locality-aware executor cache usage |
| spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns |
| spark.sql.iceberg.report-column-stats | true | Report Puffin Table Statistics if available to Spark's Cost Based Optimizer. CBO must be enabled for this to be effective |
| spark.sql.iceberg.read.as-of-timestamp | null | Default timestamp in milliseconds for time-travel queries; applies to all reads when no explicit snapshot-id, as-of-timestamp, branch, or tag is specified |
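
For example, here is a minimal Java sketch of how the session-level default interacts with an explicit read option (the catalog/table name, timestamp, and snapshot ID below are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SessionTimeTravelExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("session-time-travel").getOrCreate();

    // Session-level default: reads that carry no explicit snapshot-id, as-of-timestamp,
    // branch, or tag resolve to the snapshot that was current at this time (ms since epoch).
    spark.conf().set("spark.sql.iceberg.read.as-of-timestamp", "1700000000000");

    // Uses the session default.
    Dataset<Row> asOf = spark.table("prod.db.events");

    // An explicit read option still takes precedence over the session default.
    Dataset<Row> pinned =
        spark.read().format("iceberg").option("snapshot-id", 10963874102873L).load("prod.db.events");

    // Unset the property to go back to reading the current table state.
    spark.conf().unset("spark.sql.iceberg.read.as-of-timestamp");

    System.out.println(asOf.count() + " rows as of the default timestamp, "
        + pinned.count() + " rows in the pinned snapshot");
  }
}
```

Because it is a Spark session configuration, the default only affects reads issued in that session and can be changed or unset at any time.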

### Read options

@@ -56,6 +56,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
@@ -642,6 +643,9 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
.append();

Table table = Spark3Util.loadIcebergTable(spark, tableName);
Long previousSnapshotId = table.currentSnapshot().snapshotId();
// remember the time when the first snapshot was valid
long previousSnapshotTimestamp = System.currentTimeMillis();

table.updateSchema().addColumn("category", Types.StringType.get()).commit();

@@ -659,14 +663,7 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
table.refresh();
Long currentSnapshotId = table.currentSnapshot().snapshotId();

Dataset<Row> actualFilesDs =
spark.sql(
"SELECT * FROM "
+ tableName
+ ".files VERSION AS OF "
+ currentSnapshotId
+ " ORDER BY content");
List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
// Test schema
Schema entriesTableSchema =
TypeUtil.selectNot(
Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
@@ -675,6 +672,11 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
List<Record> expectedFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);

// Metadata for current snapshot
Dataset<Row> actualFilesDs =
spark.sql("SELECT * FROM " + tableName + ".files" + " ORDER BY content");
List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();

assertThat(actualFiles).as("actualFiles size should be 2").hasSize(2);

TestHelpers.assertEqualsSafe(
@@ -686,6 +688,33 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
assertThat(actualFiles)
.as("expectedFiles and actualFiles size should be the same")
.hasSameSizeAs(expectedFiles);

// Metadata for previous snapshot
Dataset<Row> actualFilesDs2 =
spark.sql(
"SELECT * FROM "
+ tableName
+ ".files VERSION AS OF "
+ previousSnapshotId
+ " ORDER BY content");
List<Row> actualFiles2 = TestHelpers.selectNonDerived(actualFilesDs2).collectAsList();

assertThat(actualFiles2).as("actualFiles2 size should be 1").hasSize(1);

TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs2), expectedFiles.get(0), actualFiles2.get(0));

// Using session-level time-travel
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, previousSnapshotTimestamp);
Dataset<Row> actualFilesDs3 =
spark.sql("SELECT * FROM " + tableName + ".files" + " ORDER BY content");
List<Row> actualFiles3 = TestHelpers.selectNonDerived(actualFilesDs3).collectAsList();

assertThat(actualFiles3).as("actualFiles3 size should be 1").hasSize(1);

TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs3), expectedFiles.get(0), actualFiles3.get(0));
spark.conf().unset(SparkSQLProperties.AS_OF_TIMESTAMP);
}

@TestTemplate
@@ -94,7 +94,12 @@ public Long snapshotId() {
}

public Long asOfTimestamp() {
return confParser.longConf().option(SparkReadOptions.AS_OF_TIMESTAMP).parseOptional();
SparkConfParser.LongConfParser longParser =
confParser.longConf().option(SparkReadOptions.AS_OF_TIMESTAMP);
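// Only fall back to the session-level default when no explicit snapshot selector
// (snapshot-id, branch, or tag) is set; an explicit as-of-timestamp read option
// still takes precedence over the session value.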
if (snapshotId() == null && branch == null && tag() == null) {
longParser.sessionConf(SparkSQLProperties.AS_OF_TIMESTAMP);
}
return longParser.parseOptional();
}

public Long startSnapshotId() {
@@ -110,4 +110,8 @@ private SparkSQLProperties() {}
// Controls whether to report available column statistics to Spark for query optimization.
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;

// Session-level default for time-travel reads; a timestamp in milliseconds.
// The snapshot used will be the snapshot that was current at this time.
// Ignored when an explicit snapshot-id, as-of-timestamp, branch, or tag is specified.
public static final String AS_OF_TIMESTAMP = "spark.sql.iceberg.read.as-of-timestamp";
}
@@ -41,6 +41,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -49,6 +50,7 @@
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -81,6 +83,7 @@ public static Object[] parameters() {
@TempDir private Path temp;

private static SparkSession spark = null;
private final long timestampBeforeAnySnapshots = 1L; // no snapshot at this time

@Parameter(index = 0)
private Map<String, String> properties;
@@ -101,6 +104,11 @@ public static void stopSpark() {
currentSpark.stop();
}

@AfterEach
public void cleanupSessionProperties() {
spark.conf().unset(SparkSQLProperties.AS_OF_TIMESTAMP);
}

@TestTemplate
public void testSnapshotSelectionById() {
String tableLocation = temp.resolve("iceberg-table").toFile().toString();
@@ -146,6 +154,23 @@ public void testSnapshotSelectionById() {
assertThat(previousSnapshotRecords)
.as("Previous snapshot rows should match")
.isEqualTo(firstBatchRecords);

// verify explicit snapshot-id option overrides session property
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, timestampBeforeAnySnapshots);
Dataset<Row> withExplicitSnapshotIdResult =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.SNAPSHOT_ID, parentSnapshotId)
.load(tableLocation);
List<SimpleRecord> withExplicitSnapshotIdRecords =
withExplicitSnapshotIdResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
assertThat(withExplicitSnapshotIdRecords)
.as("Explicit snapshot-id should override session property")
.isEqualTo(firstBatchRecords);
}

@TestTemplate
@@ -198,6 +223,71 @@ public void testSnapshotSelectionByTimestamp() {
assertThat(previousSnapshotRecords)
.as("Previous snapshot rows should match")
.isEqualTo(firstBatchRecords);

// verify explicit as-of-timestamp option overrides session property
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, timestampBeforeAnySnapshots);
Dataset<Row> withExplicitTimestampResult =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.AS_OF_TIMESTAMP, firstSnapshotTimestamp)
.load(tableLocation);
List<SimpleRecord> withExplicitTimestampRecords =
withExplicitTimestampResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
assertThat(withExplicitTimestampRecords)
.as("Explicit as-of-timestamp should override session property")
.isEqualTo(firstBatchRecords);
}

@TestTemplate
public void testSessionPropertyAsDefault() {
String tableLocation = temp.resolve("iceberg-table").toFile().toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, properties, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

// remember the time when the first snapshot was valid
long firstSnapshotTimestamp = System.currentTimeMillis();

// produce the second snapshot
List<SimpleRecord> secondBatchRecords =
Lists.newArrayList(
new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

assertThat(table.snapshots()).as("Expected 2 snapshots").hasSize(2);

// verify without session property, current snapshot is read
Dataset<Row> currentSnapshotResult = spark.read().format("iceberg").load(tableLocation);
List<SimpleRecord> currentSnapshotRecords =
currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
expectedRecords.addAll(secondBatchRecords);
assertThat(currentSnapshotRecords)
.as("Current snapshot rows should match")
.isEqualTo(expectedRecords);

// verify with session property, only first snapshot data is read
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, firstSnapshotTimestamp);
Dataset<Row> sessionPropertyResult = spark.read().format("iceberg").load(tableLocation);
List<SimpleRecord> sessionPropertyRecords =
sessionPropertyResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(sessionPropertyRecords)
.as("First snapshot should be read when session property set.")
.isEqualTo(firstBatchRecords);
}

@TestTemplate
@@ -233,6 +323,13 @@ public void testSnapshotSelectionByInvalidTimestamp() {
.load(tableLocation))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find a snapshot older than");

// verify invalid session property timestamp raises exception
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, timestamp);
Dataset<Row> df = spark.read().format("iceberg").load(tableLocation);
assertThatThrownBy(df::collectAsList)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find a snapshot older than");
}

@TestTemplate
@@ -301,6 +398,16 @@ public void testSnapshotSelectionByTag() {
assertThat(currentSnapshotRecords)
.as("Current snapshot rows should match")
.isEqualTo(expectedRecords);

// verify explicit tag option overrides invalid session property
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, timestampBeforeAnySnapshots);
Dataset<Row> withExplicitTagResult =
spark.read().format("iceberg").option(SparkReadOptions.TAG, "tag").load(tableLocation);
List<SimpleRecord> withExplicitTagRecords =
withExplicitTagResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(withExplicitTagRecords)
.as("Explicit tag should override session property")
.isEqualTo(expectedRecords);
}

@TestTemplate
@@ -337,6 +444,23 @@ public void testSnapshotSelectionByBranch() {
assertThat(currentSnapshotRecords)
.as("Current snapshot rows should match")
.isEqualTo(expectedRecords);

// verify explicit branch option overrides invalid session property
spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, timestampBeforeAnySnapshots);
Dataset<Row> withExplicitBranchResult =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.BRANCH, "branch")
.load(tableLocation);
List<SimpleRecord> withExplicitBranchRecords =
withExplicitBranchResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
assertThat(withExplicitBranchRecords)
.as("Explicit branch should override session property")
.isEqualTo(expectedRecords);
}

@TestTemplate