Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
* BaseTable using a {@link StaticTableOperations}. This way no Catalog related calls are needed
* when reading the table data after deserialization.
*/
public class BaseTable implements Table, HasTableOperations, Serializable {
public class BaseTable
implements Table, HasTableOperations, Serializable, SupportsDistributedScanPlanning {
private final TableOperations ops;
private final String name;
private final MetricsReporter reporter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@
*/
package org.apache.iceberg;

/** Marker interface to indicate whether a Table requires remote scan planning */
public interface RequiresRemoteScanPlanning {}
/** Interface to indicate whether a Table supports distributed scan planning */
public interface SupportsDistributedScanPlanning {
default boolean allowDistributedPlanning() {
return true;
}
}
9 changes: 7 additions & 2 deletions core/src/main/java/org/apache/iceberg/rest/RESTTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.BatchScanAdapter;
import org.apache.iceberg.ImmutableTableScanContext;
import org.apache.iceberg.RequiresRemoteScanPlanning;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReporter;

class RESTTable extends BaseTable implements RequiresRemoteScanPlanning {
class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
private final RESTClient client;
private final Supplier<Map<String, String>> headers;
private final MetricsReporter reporter;
Expand Down Expand Up @@ -84,4 +84,9 @@ public TableScan newScan() {
public BatchScan newBatchScan() {
return new BatchScanAdapter(newScan());
}

@Override
public boolean allowDistributedPlanning() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -295,8 +296,15 @@ public int parallelism() {
return Math.max(defaultParallelism, numShufflePartitions);
}

public boolean distributedPlanningDisallowed() {
return table instanceof SupportsDistributedScanPlanning distributed
&& !distributed.allowDistributedPlanning();
}

public boolean distributedPlanningEnabled() {
return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
return table instanceof SupportsDistributedScanPlanning distributed
&& distributed.allowDistributedPlanning()
&& (dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL);
}

public PlanningMode dataPlanningMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RequiresRemoteScanPlanning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SparkDistributedDataScan;
Expand Down Expand Up @@ -761,9 +760,9 @@ public StructType readSchema() {
}

private BatchScan newBatchScan() {
if (table instanceof RequiresRemoteScanPlanning) {
if (readConf.distributedPlanningDisallowed()) {
return table.newBatchScan();
} else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
} else if (readConf.distributedPlanningEnabled()) {
return new SparkDistributedDataScan(spark, table, readConf);
} else {
return table.newBatchScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -291,8 +292,15 @@ public int parallelism() {
return Math.max(defaultParallelism, numShufflePartitions);
}

public boolean distributedPlanningDisallowed() {
return table instanceof SupportsDistributedScanPlanning distributed
&& !distributed.allowDistributedPlanning();
}

public boolean distributedPlanningEnabled() {
return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
return table instanceof SupportsDistributedScanPlanning distributed
&& distributed.allowDistributedPlanning()
&& (dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL);
}

public PlanningMode dataPlanningMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RequiresRemoteScanPlanning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SparkDistributedDataScan;
Expand Down Expand Up @@ -761,9 +760,9 @@ public StructType readSchema() {
}

private BatchScan newBatchScan() {
if (table instanceof RequiresRemoteScanPlanning) {
if (readConf.distributedPlanningDisallowed()) {
return table.newBatchScan();
} else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
} else if (readConf.distributedPlanningEnabled()) {
return new SparkDistributedDataScan(spark, table, readConf);
} else {
return table.newBatchScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -291,8 +292,15 @@ public int parallelism() {
return Math.max(defaultParallelism, numShufflePartitions);
}

public boolean distributedPlanningDisallowed() {
return table instanceof SupportsDistributedScanPlanning distributed
&& !distributed.allowDistributedPlanning();
}
Comment on lines +295 to +298
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: It might be contradictory to see we have distributedPlanningDisallowed not an exact negation of distributedPlanningEnabled ?

How about we name this : underlyingTableSupportsDistributedPlanning() ?


public boolean distributedPlanningEnabled() {
return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
return table instanceof SupportsDistributedScanPlanning distributed
&& distributed.allowDistributedPlanning()
&& (dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL);
Comment on lines +301 to +303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[not scope of this change] should we change the spark docs for this https://iceberg.apache.org/docs/nightly/spark-configuration/#spark-sql-options

}

public PlanningMode dataPlanningMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RequiresRemoteScanPlanning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SparkDistributedDataScan;
Expand Down Expand Up @@ -761,9 +760,9 @@ public StructType readSchema() {
}

private BatchScan newBatchScan() {
if (table instanceof RequiresRemoteScanPlanning) {
if (readConf.distributedPlanningDisallowed()) {
return table.newBatchScan();
} else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
} else if (readConf.distributedPlanningEnabled()) {
return new SparkDistributedDataScan(spark, table, readConf);
} else {
return table.newBatchScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -291,8 +292,15 @@ public int parallelism() {
return Math.max(defaultParallelism, numShufflePartitions);
}

public boolean distributedPlanningDisallowed() {
return table instanceof SupportsDistributedScanPlanning distributed
&& !distributed.allowDistributedPlanning();
}

public boolean distributedPlanningEnabled() {
return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
return table instanceof SupportsDistributedScanPlanning distributed
&& distributed.allowDistributedPlanning()
&& (dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL);
}

public PlanningMode dataPlanningMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RequiresRemoteScanPlanning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SparkDistributedDataScan;
Expand Down Expand Up @@ -761,9 +760,9 @@ public StructType readSchema() {
}

private BatchScan newBatchScan() {
if (table instanceof RequiresRemoteScanPlanning) {
if (readConf.distributedPlanningDisallowed()) {
return table.newBatchScan();
} else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
} else if (readConf.distributedPlanningEnabled()) {
return new SparkDistributedDataScan(spark, table, readConf);
} else {
return table.newBatchScan();
Expand Down