Skip to content

Spline doesn't register delta overwritePartitions() operation #900

@denstern

Description

@denstern

I'm running spark application with spark == 3.5.5 and delta == 3.3.2 versions.

The code uses methods of Spark's DataFrameWriterV2 class. The error occurs when a Delta table is saved using the overwritePartitions() method.

Used spark-agent: spark-3.5-spline-agent-bundle_2.12-2.2.3.jar

Error message:

java.lang.RuntimeException: Write extraction failed for: class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand
	at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:153)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
	at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onSuccess(QueryExecutionListenerDelegate.scala:28)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:41)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1$adapted(SplineQueryExecutionListener.scala:41)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:173)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:143)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:143)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:155)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1356)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: scala.MatchError: DeltaDynamicPartitionOverwriteCommand RelationV2[date_id#1069, rand_num#1070] spark_catalog.test.test_dataframev2 spark_catalog.test.test_dataframev2, DeltaTableV2(org.apache.spark.sql.SparkSession@2f072633,s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2,Some(CatalogTable(
Catalog: spark_catalog
Database: test
Table: test_dataframev2
Owner: spark
Created Time: Mon Nov 24 12:16:20 UTC 2025
Last Access: UNKNOWN
Created By: Spark 3.5.5
Type: EXTERNAL
Provider: delta
delta.minReaderVersion=1, delta.minWriterVersion=7, numFilesErasureCoded=0]
Statistics: 6075 bytes
Location: s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Schema: root
	|-- date_id: date (nullable = true)
	|-- rand_num: integer (nullable = true)
	)),Some(test.test_dataframev2),None,Map()), true
	+- Project [date_id#2, rand_num#5]
		+- Project [id#0L, date_id#2, cast(((rand(3006909157548589595) * cast(200 as double)) - cast(100 as double)) as int) AS rand_num#5]
			+- Project [id#0L, date_sub(current_date(Some(Etc/UTC)), cast((rand(9151590119353770883) * cast(3 as double)) as int)) AS date_id#2]
				+- Range (0, 100, step=1, splits=Some(4))
	(of class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2WriteCommand(DataSourceV2Plugin.scala:92)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:73)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:56)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
at scala.util.Try$.apply(Try.scala:213)
at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
... 29 more

Code:

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    date_sub,
    rand,
    current_date
)

# Configure root logging for the reproduction script.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Silence noisy py4j gateway chatter from the Spark driver.
logging.getLogger("py4j").setLevel(logging.ERROR)
# Fix: the original had an extra bare `logging.getLogger(__name__)` call
# whose result was discarded — a no-op statement; removed.
logger = logging.getLogger(__name__)

# Build the SparkSession for the reproduction job; Hive support is needed
# so the Delta table is registered in the `test` database catalog.
# NOTE(review): the S3 credentials below are hard-coded placeholders —
# supply real secrets via environment/credential provider, never inline.
_spark_confs = {
    "spark.sql.shuffle.partitions": "300",
    "spark.sql.autoBroadcastJoinThreshold": "-1",
    "spark.driver.maxResultSize": "4g",
    "spark.hadoop.fs.s3a.access.key": "access_key",
    "spark.hadoop.fs.s3a.secret.key": "secret_key",
}
_builder = SparkSession.builder.appName("test_dataframev2_spline")
for _conf_key, _conf_value in _spark_confs.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

target_table_name = "test.test_dataframev2"

# 100 rows: `date_id` is a date within the last 3 days (partition column),
# `rand_num` is a random integer in [-100, 100).
df = spark.range(100).select(
    date_sub(current_date(), (rand() * 3).cast("integer")).alias("date_id"),
    (rand() * 200 - 100).cast("integer").alias("rand_num"),
)

try:
    # createOrReplace() succeeds with the Spline agent attached.
    df.writeTo(target_table_name).using("delta").partitionedBy("date_id").createOrReplace()
    # Fixes: typo "succedeed" and a pointless f-string with no placeholders.
    logger.info("method createOrReplace succeeded")
except Exception:
    # Best-effort guard so the repro still reaches the failing call below;
    # logger.exception records the full traceback (the original logged the
    # bare exception text at INFO level).
    logger.exception("error in createOrReplace")

# This call triggers DeltaDynamicPartitionOverwriteCommand, which the
# Spline agent cannot extract (MatchError in DataSourceV2Plugin — see the
# stack trace in this report).
df.writeTo(target_table_name).using("delta").partitionedBy("date_id").overwritePartitions()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    Status

    New

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions