Skip to content

Commit 1bc1dd7

Browse files
committed
Core: Move decision about remote scan planning from Spark to Core
1 parent 696acfa commit 1bc1dd7

File tree

5 files changed

+21
-9
lines changed

5 files changed

+21
-9
lines changed

core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ protected PlanningMode deletePlanningMode() {
144144

145145
@Override
146146
protected CloseableIterable<ScanTask> doPlanFiles() {
147+
if (table() instanceof SupportsDistributedScanPlanning
148+
&& !((SupportsDistributedScanPlanning) table()).allowDistributedPlanning()) {
149+
return table().newBatchScan().planFiles();
150+
}
151+
147152
Snapshot snapshot = snapshot();
148153

149154
List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);

core/src/main/java/org/apache/iceberg/BaseTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
* BaseTable using a {@link StaticTableOperations}. This way no Catalog related calls are needed
3939
* when reading the table data after deserialization.
4040
*/
41-
public class BaseTable implements Table, HasTableOperations, Serializable {
41+
public class BaseTable
42+
implements Table, HasTableOperations, Serializable, SupportsDistributedScanPlanning {
4243
private final TableOperations ops;
4344
private final String name;
4445
private final MetricsReporter reporter;

core/src/main/java/org/apache/iceberg/RequiresRemoteScanPlanning.java renamed to core/src/main/java/org/apache/iceberg/SupportsDistributedScanPlanning.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,9 @@
1818
*/
1919
package org.apache.iceberg;
2020

21-
/** Marker interface to indicate whether a Table requires remote scan planning */
22-
public interface RequiresRemoteScanPlanning {}
21+
/** Marker interface to indicate whether a Table supports distributed scan planning */
22+
public interface SupportsDistributedScanPlanning {
23+
default boolean allowDistributedPlanning() {
24+
return true;
25+
}
26+
}

core/src/main/java/org/apache/iceberg/rest/RESTTable.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
import org.apache.iceberg.BatchScan;
2626
import org.apache.iceberg.BatchScanAdapter;
2727
import org.apache.iceberg.ImmutableTableScanContext;
28-
import org.apache.iceberg.RequiresRemoteScanPlanning;
28+
import org.apache.iceberg.SupportsDistributedScanPlanning;
2929
import org.apache.iceberg.TableOperations;
3030
import org.apache.iceberg.TableScan;
3131
import org.apache.iceberg.catalog.TableIdentifier;
3232
import org.apache.iceberg.metrics.MetricsReporter;
3333

34-
class RESTTable extends BaseTable implements RequiresRemoteScanPlanning {
34+
class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
3535
private final RESTClient client;
3636
private final Supplier<Map<String, String>> headers;
3737
private final MetricsReporter reporter;
@@ -84,4 +84,9 @@ public TableScan newScan() {
8484
public BatchScan newBatchScan() {
8585
return new BatchScanAdapter(newScan());
8686
}
87+
88+
@Override
89+
public boolean allowDistributedPlanning() {
90+
return false;
91+
}
8792
}

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.iceberg.MetricsConfig;
3535
import org.apache.iceberg.MetricsModes;
3636
import org.apache.iceberg.PartitionSpec;
37-
import org.apache.iceberg.RequiresRemoteScanPlanning;
3837
import org.apache.iceberg.Schema;
3938
import org.apache.iceberg.Snapshot;
4039
import org.apache.iceberg.SparkDistributedDataScan;
@@ -761,9 +760,7 @@ public StructType readSchema() {
761760
}
762761

763762
private BatchScan newBatchScan() {
764-
if (table instanceof RequiresRemoteScanPlanning) {
765-
return table.newBatchScan();
766-
} else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
763+
if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
767764
return new SparkDistributedDataScan(spark, table, readConf);
768765
} else {
769766
return table.newBatchScan();

0 commit comments

Comments
 (0)