-
Notifications
You must be signed in to change notification settings - Fork 103
Open
Description
I'm running spark application with spark == 3.5.5 and delta == 3.3.2 versions.
The code uses methods of the Spark DataFrameWriterV2 class. The error occurs when a Delta table is saved using the overwritePartitions() method.
Used spark-agent: spark-3.5-spline-agent-bundle_2.12-2.2.3.jar
Error message:
java.lang.RuntimeException: Write extraction failed for: class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand
at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:153)
at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onSuccess(QueryExecutionListenerDelegate.scala:28)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:41)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1$adapted(SplineQueryExecutionListener.scala:41)
at scala.Option.foreach(Option.scala:407)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:173)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:143)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:143)
at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:155)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1356)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: scala.MatchError: DeltaDynamicPartitionOverwriteCommand RelationV2[date_id#1069, rand_num#1070] spark_catalog.test.test_dataframev2 spark_catalog.test.test_dataframev2, DeltaTableV2(org.apache.spark.sql.SparkSession@2f072633,s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2,Some(CatalogTable(
Catalog: spark_catalog
Database: test
Table: test_dataframev2
Owner: spark
Created Time: Mon Nov 24 12:16:20 UTC 2025
Last Access: UNKNOWN
Created By: Spark 3.5.5
Type: EXTERNAL
Provider: delta
delta.minReaderVersion=1, delta.minWriterVersion=7, numFilesErasureCoded=0]
Statistics: 6075 bytes
Location: s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Schema: root
|-- date_id: date (nullable = true)
|-- rand_num: integer (nullable = true)
)),Some(test.test_dataframev2),None,Map()), true
+- Project [date_id#2, rand_num#5]
+- Project [id#0L, date_id#2, cast(((rand(3006909157548589595) * cast(200 as double)) - cast(100 as double)) as int) AS rand_num#5]
+- Project [id#0L, date_sub(current_date(Some(Etc/UTC)), cast((rand(9151590119353770883) * cast(3 as double)) as int)) AS date_id#2]
+- Range (0, 100, step=1, splits=Some(4))
(of class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2WriteCommand(DataSourceV2Plugin.scala:92)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:73)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:56)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
at scala.util.Try$.apply(Try.scala:213)
at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
... 29 more
Code:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    date_sub,
    rand,
    current_date,
)

# Configure root logging once; silence the chatty py4j bridge logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.getLogger("py4j").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)

# Hive-enabled session; S3A credentials are placeholders for the report.
spark = (
    SparkSession.builder.appName("test_dataframev2_spline")
    .config("spark.sql.shuffle.partitions", "300")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.hadoop.fs.s3a.access.key", "access_key")
    .config("spark.hadoop.fs.s3a.secret.key", "secret_key")
    .enableHiveSupport()
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

target_table_name = "test.test_dataframev2"

# 100 rows: date_id within the last 3 days, rand_num a random int in [-100, 100).
df = (
    spark.range(100)
    .withColumn("date_id", date_sub(current_date(), (rand() * 3).cast("integer")))
    .withColumn("rand_num", (rand() * 200 - 100).cast("integer"))
    .select("date_id", "rand_num")
)

try:
    # First write creates/replaces the partitioned Delta table.
    df.writeTo(target_table_name).using("delta").partitionedBy("date_id").createOrReplace()
    logger.info("method createOrReplace succeeded")
except Exception as e:
    logger.info(f"error createOrReplace - {e}")

# This dynamic partition overwrite triggers DeltaDynamicPartitionOverwriteCommand,
# which the Spline agent fails to extract (see stack trace above).
df.writeTo(target_table_name).using("delta").partitionedBy("date_id").overwritePartitions()
Metadata
Metadata
Assignees
Labels
No labels
Type
Projects
Status
New