Changes from 1 commit
Commits
25 commits
8d49e2f
Added initial implementation for writing lineage messages using HDFS …
rkrumins Oct 12, 2025
a35a99c
Added creation of the base customLineagePath directory ensuring linea…
rkrumins Oct 12, 2025
7a824bf
Updated documentation for Hdfs Dispatcher in spline.default.yaml file
rkrumins Oct 12, 2025
96373b4
Added initial integration tests for HDFS Lineage Dispatcher with cent…
rkrumins Oct 12, 2025
2b01b81
Aligning tests to be running same evaluation as when using default mode
rkrumins Oct 12, 2025
6f0ecdf
Minor cleanup in HDFSLineageDispatcher
rkrumins Oct 12, 2025
b064041
Fixed issue with unmatched types for resolveLineagePath in HDFSLineag…
rkrumins Oct 12, 2025
578c876
Fixing issues as per SonarQube for HDFSLineageDispatcher
rkrumins Oct 12, 2025
bebf4c9
Fix for the issue in fsScheme!
rkrumins Oct 12, 2025
04eebe3
Constant for file extension in HDFSLineageDispatcherSpec
rkrumins Oct 12, 2025
5f63f61
Removed outputSource filename when writing file to new location and u…
rkrumins Oct 12, 2025
8bb78a1
Fixing issues as per static code analysis
rkrumins Oct 13, 2025
7a93705
Fixing issues in logic for HDFSLineageDispatcher
rkrumins Oct 13, 2025
32db7ac
Fixing issue with exception handling for mkdirs in HDFSLineageDispatcher
rkrumins Oct 13, 2025
30f2f40
Updated integration test for custom lineage path in HDFSLineageDispat…
rkrumins Oct 13, 2025
87936b8
Ensuring the edgecase with Spark AppName containing non-standard char…
rkrumins Oct 13, 2025
4e699dd
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
8a4b431
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
6bc068d
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
d155a98
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
4c7d723
Fixed integration test and changed the filename to avoid clashed from…
rkrumins Oct 13, 2025
cd535c9
Added more robust check to ensure no _LINEAGE file is created
rkrumins Oct 13, 2025
0e5c69a
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
bebc869
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
7bd5b0a
Updated spline.default.yaml as per up-to-date details for HDFSLineage…
rkrumins Oct 13, 2025
Removed outputSource filename when writing file to new location and using internal SparkContext values
rkrumins committed Oct 12, 2025
commit 5f63f61e8032c408615819cb36e4657b83cb2aa6
15 changes: 8 additions & 7 deletions core/src/main/resources/spline.default.yaml
Original file line number Diff line number Diff line change
@@ -151,22 +151,23 @@ spline:
# If left empty, null, or not specified → DEFAULT MODE: lineage written alongside target data files
# If set to a path → CENTRALIZED MODE: all lineage written to this location with unique filenames
#
# CENTRALIZED MODE filename format: {timestamp}_{fileName}_{appId}
# CENTRALIZED MODE filename format: {timestamp}_{appName}_{appId}
# - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for natural chronological sorting and easy filtering
# Example: 2025-10-12_14-30-45-123
# - fileName: The configured fileName value (e.g., "my_file.parq_LINEAGE")
# - appName: Spark application name for easy identification of which job generated the lineage
# Example: MySparkJob
# - appId: Spark application ID for traceability to specific runs
# Example: app-20251012143045-0001
#
# More examples:
# Examples (assuming app name is "MySparkJob"):
# - Local: customLineagePath: /my/centralized/lineage
# Output: /my/centralized/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# Output: /my/centralized/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - S3: customLineagePath: s3://my-bucket/lineage
# Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - GCS: customLineagePath: gs://my-bucket/lineage
# Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - HDFS: customLineagePath: hdfs://cluster/lineage
# Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# -------------------------------------------
# Open Lineage HTTP dispatcher
# -------------------------------------------
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala
@@ -51,9 +51,9 @@ import scala.concurrent.blocking
*
* 2. CENTRALIZED MODE (customLineagePath set to a valid path):
* All lineage files are written to a single centralized location with unique filenames.
* Filename format: {timestamp}_{fileName}_{appId}
* Filename format: {timestamp}_{appName}_{appId}
* - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for chronological sorting and filtering
* - fileName: The configured fileName value (e.g., "my_file.parq_LINEAGE")
* - appName: Spark application name for easy identification
* - appId: Spark application ID for traceability
*
* The timestamp-first format ensures natural chronological sorting and easy date-based filtering.
@@ -87,7 +87,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID")

try {
val path = resolveLineagePath(event.planId.toString)
val path = resolveLineagePath()
val planWithEvent = Map(
"executionPlan" -> this._lastSeenPlan,
"executionEvent" -> event
@@ -108,6 +108,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
* @return The full path where the lineage file should be written
*/
private def resolveLineagePath(): String = {
val outputSource = s"${this._lastSeenPlan.operations.write.outputSource}"
customLineagePath match {
case Some(customPath) =>
// Centralized mode: write to custom path with unique filename
@@ -116,28 +117,30 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
s"$cleanCustomPath/$uniqueFilename"
case None =>
// Default mode: write alongside target data file
s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename"
s"${outputSource.stripSuffix("/")}/$filename"
}
}

/**
* Generates a unique filename for centralized lineage storage.
*
* Format: {timestamp}_{fileName}_{appId}
* Example: 2025-10-12_14-30-45-123_lineage_app-20251012143045-0001
* Format: {timestamp}_{appName}_{appId}
* Example: 2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
*
* This format optimizes for operational debugging use cases:
* - Timestamp FIRST: Ensures natural chronological sorting (most recent files appear together)
* - Application Name: Easy identification of which job generated the lineage
* - Application ID: Full traceability to specific Spark application run
*
* @return A unique filename optimized for filtering and sorting
*/
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val appName = sparkContext.appName
val appId = sparkContext.applicationId
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
s"${timestamp}_${filename}_${appId}"
s"${timestamp}_${appName}_${appId}"
}
Comment on lines +147 to +156
⚠️ Potential issue | 🟠 Major

Sanitize appName and appId to prevent filesystem path issues.

The Spark application name can contain spaces, slashes, or special characters that may cause filesystem path issues. While the filename format is well-designed for sorting, unsanitized names can break path construction.

Apply this diff to sanitize the application metadata:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
-    val appName = sparkContext.appName
-    val appId = sparkContext.applicationId
+    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${appName}_${appId}"
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val appName = sparkContext.appName
val appId = sparkContext.applicationId
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
s"${timestamp}_${appName}_${appId}"
}
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
val dateFormatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
.withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
s"${timestamp}_${appName}_${appId}"
}
🤖 Prompt for AI Agents
In
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala
around lines 137 to 144, the generated filename uses raw sparkContext.appName
and sparkContext.applicationId which may contain spaces, slashes or special
characters that break filesystem paths; sanitize both values before composing
the filename by replacing or removing unsafe characters (e.g., replace
non-alphanumeric, dash, underscore characters with underscore), trim length if
necessary, and then build the filename using the sanitized appName and appId so
the resulting string is safe for use in HDFS paths.
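For illustration, the regex suggested in the review behaves as follows on a couple of hypothetical app names (a standalone sketch, not part of the PR):

```scala
// Sketch of the suggested sanitization, applied to made-up inputs.
object SanitizeDemo extends App {
  def sanitize(s: String): String = s.replaceAll("[^a-zA-Z0-9_-]", "_")

  // Spaces, slashes and punctuation all collapse to underscores,
  // so the result is safe to use as a single HDFS path segment.
  println(sanitize("My Spark Job!")) // My_Spark_Job_
  println(sanitize("etl/daily run")) // etl_daily_run
}
```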


/**
@@ -43,7 +43,7 @@ class HDFSLineageDispatcherSpec
val lineageDispatcherConfigCustomLineagePathKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.customLineagePath"
val destFilePathExtension = ".parquet"

it should "save lineage file to a filesystem in DEFAULT mode (no customLineagePath)" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
it should "save lineage file to a filesystem in DEFAULT mode" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
withIsolatedSparkSession(_
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
@@ -98,10 +98,10 @@ class HDFSLineageDispatcherSpec
val lineageFile = lineageFiles(0)
lineageFile.length should be > 0L

// Verify filename format: {timestamp}_{fileName}_{appId}
// Verify filename format: {timestamp}_{appName}_{appId}
val filename = lineageFile.getName
// Should match pattern: yyyy-MM-dd_HH-mm-ss-SSS__LINEAGE_app-...
filename should include("_LINEAGE_")
// Should match pattern: yyyy-MM-dd_HH-mm-ss-SSS_{appName}_app-...
// AppName and AppId are part of the filename
filename should startWith regex """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}"""

val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
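Taken together, the two dispatcher modes changed in this PR resolve the lineage path roughly as sketched below. This is a self-contained approximation with explicit parameters; the real dispatcher reads `outputSource` from the last seen plan and the app metadata from the live `SparkContext`:

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object LineagePathDemo extends App {
  // Standalone sketch of resolveLineagePath/generateUniqueFilename from the diff.
  def resolveLineagePath(customLineagePath: Option[String],
                         outputSource: String,
                         filename: String,
                         appName: String,
                         appId: String,
                         now: Instant): String = {
    val fmt = DateTimeFormatter
      .ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
      .withZone(ZoneId.of("UTC"))
    customLineagePath match {
      // Centralized mode: unique, chronologically sortable filename
      case Some(customPath) =>
        s"${customPath.stripSuffix("/")}/${fmt.format(now)}_${appName}_${appId}"
      // Default mode: lineage file written alongside the target data
      case None =>
        s"${outputSource.stripSuffix("/")}/$filename"
    }
  }

  val ts = Instant.parse("2025-10-12T14:30:45.123Z")
  println(resolveLineagePath(Some("s3://my-bucket/lineage"), "s3://data/out",
    "_LINEAGE", "MySparkJob", "app-20251012143045-0001", ts))
  // s3://my-bucket/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
  println(resolveLineagePath(None, "hdfs://cluster/data/out/",
    "_LINEAGE", "MySparkJob", "app-20251012143045-0001", ts))
  // hdfs://cluster/data/out/_LINEAGE
}
```

The timestamp-first layout is what makes a plain directory listing come out in chronological order.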