
Very long records cause Buffer overflow in BoundedPriorityQueue #24

@StrongestNumber9


Describe the bug

Execution fails with the following error message in the UI:

Buffer overflow. Available: 0, required: 80893
Serialization trace:
underlying (org.apache.spark.util.BoundedPriorityQueue)

From the YARN logs:

Logical Plan:
Filter (index#120 RLIKE (?i)^lorem-200$ && (cast(_time#118 as string) >= from_unixtime(1405692376, yyyy-MM-dd HH:mm:ss, Some(Europe/Helsinki))))
+- StreamingExecutionRelation com.teragrep.pth06.ArchiveMicroBatchReader@55aa557, [_time#118, _raw#119, index#120, sourcetype#121, host#122, source#123, partition#124, offset#125L, origin#126]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 10, <snip>, executor 16): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 80893
Serialization trace:
underlying (org.apache.spark.util.BoundedPriorityQueue). To avoid this, increase spark.kryoserializer.buffer.max value.
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:455)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 80893
Serialization trace:
underlying (org.apache.spark.util.BoundedPriorityQueue)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
        at com.esotericsoftware.kryo.io.Output.write(Output.java:214)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.write(UnsafeRow.java:678)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.twitter.chill.java.PriorityQueueSerializer.write(PriorityQueueSerializer.java:61)
        at com.twitter.chill.java.PriorityQueueSerializer.write(PriorityQueueSerializer.java:31)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
        at com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
        ... 4 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2178)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1035)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1017)
        at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1439)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1426)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
        at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2794)
        at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2793)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2793)
        at com.teragrep.functions.dpf_02.BatchCollect.collect(BatchCollect.java:125)
        at com.teragrep.pth10.ast.StepList.sendBatchEvent(StepList.java:275)
        at com.teragrep.pth10.ast.StepList.call(StepList.java:339)
        at com.teragrep.pth10.ast.StepList.call(StepList.java:70)
<snip>

Assuming this is a dpf_02 issue, since BatchCollect is the last part of our stack trace.
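
The error message itself points at spark.kryoserializer.buffer.max ("To avoid this, increase spark.kryoserializer.buffer.max value"). Until dpf_02 handles oversized rows some other way, raising that ceiling wherever the SparkSession is built should work around the overflow. A minimal sketch, assuming the session is configured programmatically; the property names are standard Spark settings, but the 512m value is only an illustrative guess:

    import org.apache.spark.sql.SparkSession;

    public class KryoBufferWorkaround {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("dpf_02-kryo-buffer-workaround")
                    // Kryo is already in use on the failing cluster (see the stack trace);
                    // listed here only for completeness.
                    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    // The failing write needed about 80 KB more than the buffer had left
                    // (Available: 0, required: 80893); a larger ceiling lets Kryo grow.
                    .config("spark.kryoserializer.buffer.max", "512m")
                    .getOrCreate();

            // ... run the query as usual ...
            spark.stop();
        }
    }

The same property can also be passed without code changes, e.g. spark-submit --conf spark.kryoserializer.buffer.max=512m.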

Expected behavior

The query completes without serialization errors, even when the records are very long.

How to reproduce

Query an index with very long records.
QA can use index="lorem-200" earliest=-10y
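
For reference, the same code path can probably be hit outside Teragrep with plain Spark: sort and limit rows whose payload column is larger than the Kryo buffer ceiling, then collect. The plan compiles to TakeOrderedAndProjectExec, which goes through RDD.takeOrdered and serializes a BoundedPriorityQueue of rows as the task result, exactly the path in the trace above. A rough, unverified sketch; the class name, column names, row sizes and the deliberately tiny 128k ceiling are made up for illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class BoundedPriorityQueueOverflowRepro {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .appName("bpq-kryo-overflow-repro")
                    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    // Shrink the ceiling so modest rows already overflow it.
                    .config("spark.kryoserializer.buffer.max", "128k")
                    .getOrCreate();

            StructType schema = new StructType()
                    .add("_time", DataTypes.LongType)
                    .add("_raw", DataTypes.StringType);

            // Rows whose _raw column is well past the 128k ceiling above.
            char[] filler = new char[300_000];
            Arrays.fill(filler, 'x');
            String longRecord = new String(filler);

            List<Row> rows = new ArrayList<>();
            for (long t = 0; t < 50; t++) {
                rows.add(RowFactory.create(t, longRecord));
            }

            Dataset<Row> df = spark.createDataFrame(rows, schema);

            // orderBy + limit + collectAsList is the TakeOrderedAndProjectExec /
            // takeOrdered / BoundedPriorityQueue path from the stack trace.
            df.orderBy("_time").limit(10).collectAsList();

            spark.stop();
        }
    }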

Software version
dpf_02 version: 2.7.0
