
Commit b7ceb6b

Spark: Add session property for time-travel default timestamp
Adds the spark.sql.iceberg.read.as-of-timestamp session property, which sets a default timestamp for time-travel queries across all table reads when no explicit snapshot-id, as-of-timestamp, branch, or tag is specified.
1 parent 83653ba commit b7ceb6b
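
As a quick illustration of the new property, a minimal usage sketch (the table name db.events and the timestamp value are placeholders; assumes an existing SparkSession named spark and the usual org.apache.spark.sql imports):

    // Pin every read in this session to the snapshot that was current
    // at the given timestamp (milliseconds since epoch).
    spark.conf().set("spark.sql.iceberg.read.as-of-timestamp", "1700000000000");

    // Reads without an explicit snapshot-id, as-of-timestamp, branch, or tag
    // now time-travel to that point.
    Dataset<Row> rows = spark.sql("SELECT * FROM db.events");

    // Remove the default to return to reading the current table state.
    spark.conf().unset("spark.sql.iceberg.read.as-of-timestamp");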

21 files changed: +1642, -65 lines changed


docs/docs/spark-configuration.md

Lines changed: 26 additions & 25 deletions
@@ -171,31 +171,32 @@ val spark = SparkSession.builder()

The existing Spark option table is re-flowed (column padding only) and gains one new row for the session-level time-travel property; the remaining rows are unchanged in content. The resulting table:

| Spark option | Default | Description |
|--------------|---------|-------------|
| spark.sql.iceberg.vectorization.enabled | Table default | Enables vectorized reads of data files |
| spark.sql.iceberg.parquet.reader-type | ICEBERG | Sets Parquet reader implementation (`ICEBERG`, `COMET`) |
| spark.sql.iceberg.check-nullability | true | Validates that the write schema's nullability matches the table's nullability |
| spark.sql.iceberg.check-ordering | true | Validates that the write schema column order matches the table schema order |
| spark.sql.iceberg.planning.preserve-data-grouping | false | When true, co-locates scan tasks for the same partition in the same read split; used in Storage Partitioned Joins |
| spark.sql.iceberg.aggregate-push-down.enabled | true | Enables pushdown of aggregate functions (MAX, MIN, COUNT) |
| spark.sql.iceberg.distribution-mode | See [Spark Writes](spark-writes.md#writing-distribution-modes) | Controls distribution strategy during writes |
| spark.wap.id | null | [Write-Audit-Publish](branching.md#audit-branch) snapshot staging ID |
| spark.wap.branch | null | WAP branch name for snapshot commit |
| spark.sql.iceberg.compression-codec | Table default | Write compression codec (e.g., `zstd`, `snappy`) |
| spark.sql.iceberg.compression-level | Table default | Compression level for Parquet/Avro |
| spark.sql.iceberg.compression-strategy | Table default | Compression strategy for ORC |
| spark.sql.iceberg.data-planning-mode | AUTO | Scan planning mode for data files (`AUTO`, `LOCAL`, `DISTRIBUTED`) |
| spark.sql.iceberg.delete-planning-mode | AUTO | Scan planning mode for delete files (`AUTO`, `LOCAL`, `DISTRIBUTED`) |
| spark.sql.iceberg.advisory-partition-size | Table default | Advisory size (bytes) used for writing to the table when Spark's Adaptive Query Execution is enabled; used to size output files |
| spark.sql.iceberg.locality.enabled | false | Report locality information for Spark task placement on executors |
| spark.sql.iceberg.executor-cache.enabled | true | Enables the executor-side cache (currently used to cache delete files) |
| spark.sql.iceberg.executor-cache.timeout | 10 | Timeout in minutes for executor cache entries |
| spark.sql.iceberg.executor-cache.max-entry-size | 67108864 (64MB) | Max size per cache entry (bytes) |
| spark.sql.iceberg.executor-cache.max-total-size | 134217728 (128MB) | Max total executor cache size (bytes) |
| spark.sql.iceberg.executor-cache.locality.enabled | false | Enables locality-aware executor cache usage |
| spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns |
| spark.sql.iceberg.report-column-stats | true | Reports Puffin table statistics, if available, to Spark's Cost Based Optimizer. CBO must be enabled for this to be effective |
| spark.sql.iceberg.read.as-of-timestamp | null | Default timestamp in milliseconds for time-travel queries; applies to all reads when no explicit snapshot-id, as-of-timestamp, branch, or tag is specified |

The "### Read options" section that follows the table is unchanged.
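
As a sketch, the new property can also be supplied when the session is created, like the other options documented in this table (the timestamp value is a placeholder; assumes org.apache.spark.sql.SparkSession):

    // Set the default time-travel timestamp at session-build time.
    SparkSession spark =
        SparkSession.builder()
            .config("spark.sql.iceberg.read.as-of-timestamp", "1700000000000")
            .getOrCreate();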

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java

Lines changed: 37 additions & 8 deletions
@@ -56,6 +56,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
@@ -642,6 +643,9 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
         .append();
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long previousSnapshotId = table.currentSnapshot().snapshotId();
+    // remember the time when the first snapshot was valid
+    long previousSnapshotTimestamp = System.currentTimeMillis();
 
     table.updateSchema().addColumn("category", Types.StringType.get()).commit();
 
@@ -659,14 +663,7 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
     table.refresh();
     Long currentSnapshotId = table.currentSnapshot().snapshotId();
 
-    Dataset<Row> actualFilesDs =
-        spark.sql(
-            "SELECT * FROM "
-                + tableName
-                + ".files VERSION AS OF "
-                + currentSnapshotId
-                + " ORDER BY content");
-    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+    // Test schema
     Schema entriesTableSchema =
         TypeUtil.selectNot(
            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
@@ -675,6 +672,11 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
     List<Record> expectedFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
 
+    // Metadata for current snapshot
+    Dataset<Row> actualFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".files" + " ORDER BY content");
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
     assertThat(actualFiles).as("actualFiles size should be 2").hasSize(2);
 
     TestHelpers.assertEqualsSafe(
@@ -686,6 +688,33 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
     assertThat(actualFiles)
         .as("expectedFiles and actualFiles size should be the same")
         .hasSameSizeAs(expectedFiles);
+
+    // Metadata for previous snapshot
+    Dataset<Row> actualFilesDs2 =
+        spark.sql(
+            "SELECT * FROM "
+                + tableName
+                + ".files VERSION AS OF "
+                + previousSnapshotId
+                + " ORDER BY content");
+    List<Row> actualFiles2 = TestHelpers.selectNonDerived(actualFilesDs2).collectAsList();
+
+    assertThat(actualFiles2).as("actualFiles size should be 1").hasSize(1);
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs2), expectedFiles.get(0), actualFiles.get(0));
+
+    // Using session-level time-travel
+    spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, previousSnapshotTimestamp);
+    Dataset<Row> actualFilesDs3 =
+        spark.sql("SELECT * FROM " + tableName + ".files" + " ORDER BY content");
+    List<Row> actualFiles3 = TestHelpers.selectNonDerived(actualFilesDs3).collectAsList();
+
+    assertThat(actualFiles3).as("actualFiles size should be 1").hasSize(1);
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs3), expectedFiles.get(0), actualFiles.get(0));
+    spark.conf().unset(SparkSQLProperties.AS_OF_TIMESTAMP);
 }
 
 @TestTemplate

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 6 additions & 1 deletion
@@ -94,7 +94,12 @@ public Long snapshotId() {
   }
 
   public Long asOfTimestamp() {
-    return confParser.longConf().option(SparkReadOptions.AS_OF_TIMESTAMP).parseOptional();
+    SparkConfParser.LongConfParser longParser =
+        confParser.longConf().option(SparkReadOptions.AS_OF_TIMESTAMP);
+    if (snapshotId() == null && branch == null && tag() == null) {
+      longParser.sessionConf(SparkSQLProperties.AS_OF_TIMESTAMP);
+    }
+    return longParser.parseOptional();
   }
 
   public Long startSnapshotId() {
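
For illustration, the effect of the guard above on a read, in a sketch only (the table name db.events and the snapshot id are placeholders; assumes an existing SparkSession named spark):

    // Session-wide default timestamp for time travel (milliseconds since epoch).
    spark.conf().set("spark.sql.iceberg.read.as-of-timestamp", "1700000000000");

    // Explicit time-travel options still win: this read names a snapshot,
    // so the session-level timestamp is not applied.
    Dataset<Row> pinned =
        spark.read()
            .format("iceberg")
            .option("snapshot-id", 1234567890L)
            .load("db.events");

    // No snapshot-id, as-of-timestamp, branch, or tag here, so the read
    // falls back to the session-level timestamp.
    Dataset<Row> defaulted = spark.read().format("iceberg").load("db.events");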

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 4 additions & 0 deletions
@@ -110,4 +110,8 @@ private SparkSQLProperties() {}
   // Controls whether to report available column statistics to Spark for query optimization.
   public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
   public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
+
+  // Session-level time-travel property; a timestamp in milliseconds.
+  // The snapshot used will be the snapshot current at this time.
+  public static final String AS_OF_TIMESTAMP = "spark.sql.iceberg.read.as-of-timestamp";
 }
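
To illustrate the "snapshot current at this time" semantics of the constant above, a sketch with made-up timestamps (assumes an existing SparkSession named spark):

    // Suppose snapshot A was committed at t = 1_000 ms and snapshot B at t = 2_000 ms.
    // A timestamp between the two commits resolves to the older snapshot A;
    // a timestamp after the last commit resolves to B.
    spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, 1_500L); // reads see snapshot A
    spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, 2_500L); // reads see snapshot B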
