Skip to content
Open
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8d49e2f
Added initial implementation for writing lineage messages using HDFS …
rkrumins Oct 12, 2025
a35a99c
Added creation of the base customLineagePath directory ensuring linea…
rkrumins Oct 12, 2025
7a824bf
Updated documentation for Hdfs Dispatcher in spline.default.yaml file
rkrumins Oct 12, 2025
96373b4
Added initial integration tests for HDFS Lineage Dispatcher with cent…
rkrumins Oct 12, 2025
2b01b81
Aliging tests to be running same evaluation as when using default mode
rkrumins Oct 12, 2025
6f0ecdf
Minor cleanup in HDFSLineageDispatcher
rkrumins Oct 12, 2025
b064041
Fixed issue with unmatched types for resolveLineagePath in HDFSLineag…
rkrumins Oct 12, 2025
578c876
Fixing issues as per SonarQube for HDFSLineageDispatcher
rkrumins Oct 12, 2025
bebf4c9
Fix for the issue in fsScheme!
rkrumins Oct 12, 2025
04eebe3
Constant for file extension in HDFSLineageDispatcherSpec
rkrumins Oct 12, 2025
5f63f61
Removed outputSource filename when writing file to new location and u…
rkrumins Oct 12, 2025
8bb78a1
Fixing issues as per static code analysis
rkrumins Oct 13, 2025
7a93705
Fixing issues in logic for HDFSLineageDispatcher
rkrumins Oct 13, 2025
32db7ac
Fixing issue with exception handling for mkdirs in HDFSLineageDispatcher
rkrumins Oct 13, 2025
30f2f40
Updated integration test for custom lineage path in HDFSLineageDispat…
rkrumins Oct 13, 2025
87936b8
Ensuring the edgecase with Spark AppName containing non-standard char…
rkrumins Oct 13, 2025
4e699dd
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
8a4b431
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
6bc068d
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
d155a98
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
4c7d723
Fixed integration test and changed the filename to avoid clashed from…
rkrumins Oct 13, 2025
cd535c9
Added more robust check to ensure no _LINEAGE file is created
rkrumins Oct 13, 2025
0e5c69a
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
bebc869
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
7bd5b0a
Updated spline.default.yaml as per up-to-date details for HDFSLineage…
rkrumins Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added getOrElse when obtaining planId in HDFSLineageDispatcher
  • Loading branch information
rkrumins committed Oct 13, 2025
commit 0e5c69a3842bcf26daf08f808aeb6603e6c87bc1
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,24 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
override def name = "HDFS"

override def send(plan: ExecutionPlan): Unit = {
require(plan != null, "Execution plan cannot be null")
require(plan.id.isDefined, "Execution plan must have an ID")
this._lastSeenPlan = plan
}

override def send(event: ExecutionEvent): Unit = {
// check state
if (this._lastSeenPlan == null || this._lastSeenPlan.id.get != event.planId)
throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID")
if (this._lastSeenPlan == null) {
throw new IllegalStateException("send(event) must be called strictly after send(plan) method")
}

val planId = this._lastSeenPlan.id.getOrElse(
throw new IllegalStateException("Execution plan ID is missing")
)

if (planId != event.planId) {
throw new IllegalStateException(s"Plan ID mismatch: expected $planId but event has ${event.planId}")
}

try {
val path = resolveLineagePath()
Expand Down Expand Up @@ -146,7 +157,9 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
*/
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val planId = this._lastSeenPlan.id.get.toString
val planId = this._lastSeenPlan.id.getOrElse(
throw new IllegalStateException("Execution plan ID is missing")
).toString
val appId = sparkContext.applicationId
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
Expand Down