Conversation
wajda
left a comment
@sethjones348 Thank you for your PR! We'll happily accept it, but there are a couple of things that need to be polished first. Please read the comments below.
@@ -0,0 +1,58 @@
package za.co.absa.spline.harvester.plugin.embedded
Missing copyright/license header. That's the reason why all test builds fail.
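For reference, a sketch of the kind of header the new file would need at the top, assuming the project uses the standard ASF-style header found in its other sources (copy the exact year/owner from an existing file rather than from this sketch):

```scala
/*
 * Copyright 2023 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
```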
val schema: String = cmd.options("sfSchema")
val table: String = cmd.options("dbtable")

WriteNodeInfo(asSourceId(url, warehouse, database, schema, table), cmd.mode, cmd.query, cmd.options)
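For readers following the thread, the write-side capture in this snippet boils down to roughly the following. This is a simplified sketch, not the PR's actual code: the provider-class check is an assumption, and the option keys are the Snowflake connector's documented options that feed asSourceId above.

```scala
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand

// Spark represents df.write.format("net.snowflake.spark.snowflake")...save()
// as a SaveIntoDataSourceCommand; the Snowflake connection properties travel in its options.
def snowflakeWriteOptions(cmd: SaveIntoDataSourceCommand): Option[Map[String, String]] =
  // React only when the provider is the Snowflake connector; checking the class name
  // avoids a hard compile-time dependency on the connector jar.
  if (cmd.dataSource.getClass.getName.startsWith("net.snowflake.spark.snowflake"))
    Some(cmd.options) // sfURL, sfWarehouse, sfDatabase, sfSchema, dbtable, ... feed asSourceId(...)
  else
    None
```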
This captures a WRITE operation, but what about a READ one?
Is DataSourceV2 supported by the Snowflake connector?
I don't have an explicit answer to your question. However, I did some digging on capturing READ, and I wasn't able to find a way to capture the data source URI from the underlying net.snowflake.spark.snowflake.SnowflakeRelation that gets matched when the logical plan reads from a Snowflake source.
This is what was available in that SnowflakeRelation object on a READ test I executed:
{
"schema": {
"fields": [
{
"name": "WORKORDERID",
"type": "StringType",
"nullable": true
},
{
"name": "PRODUCTID",
"type": "StringType",
"nullable": true
},
{
"name": "ORDERQTY",
"type": "StringType",
"nullable": true
},
{
"name": "SCRAPPEDQTY",
"type": "StringType",
"nullable": true
},
{
"name": "STARTDATE",
"type": "StringType",
"nullable": true
},
{
"name": "ENDDATE",
"type": "StringType",
"nullable": true
},
{
"name": "MODIFIEDDATE",
"type": "StringType",
"nullable": true
},
{
"name": "BONUS_PERCENT",
"type": "DoubleType",
"nullable": true
}
]
},
"scanMetrics": {
"numBytesRead": {
"id": 1,
"name": "number of bytes read",
"value": 7197975
},
"numRecordsRead": {
"id": 2,
"name": "number of records read",
"value": 72591
}
},
"writeMetrics": {
"numBytesWritten": {
"id": 3,
"name": "number of bytes written",
"value": 0
},
"numRecordsWritten": {
"id": 4,
"name": "number of records written",
"value": 0
}
},
"jdbcWrapper": "net.snowflake.spark.snowflake.DefaultJDBCWrapper$@2da464e2",
"params": "Snowflake Data Source",
"userSchema": "None",
"sqlContext": "org.apache.spark.sql.SQLContext@415f672d",
"log": "org.apache.logging.slf4j.Log4jLogger@10cc51c0",
"pushdownStatement": "SELECT * FROM ( workorder_data_gold ) AS \"SF_CONNECTOR_QUERY_ALIAS\"",
"bitmap": 7
}
The only data specific to the source table is that pushdownStatement, which could be parsed to get the table name, but that's not enough to build the data source URI. With that in mind I decided to leave READ capturing out of this PR.
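(To make that concrete: the table name could indeed be lifted out of that statement with something like the illustrative regex below, but that still yields no account/database/schema to build a URI from.)

```scala
// Illustration only: extract the table name from the pushdown statement shown above.
val PushdownTable = """(?s).*FROM\s*\(\s*(\S+)\s*\).*""".r

"SELECT * FROM ( workorder_data_gold ) AS \"SF_CONNECTOR_QUERY_ALIAS\"" match {
  case PushdownTable(table) => println(table) // prints: workorder_data_gold
  case _                    => // not a simple single-table scan
}
```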
Hm, this is interesting:
"jdbcWrapper": "net.snowflake.spark.snowflake.DefaultJDBCWrapper$@2da464e2"
Is the Snowflake connection just a wrapper over JDBC? Can you introspect that wrapper object? I believe there will be IPs, connection strings, etc. in there.
Basically, what would be ideal to achieve (though unfortunately not always possible, depending on the technology) is for the data source URI that comes out of the WRITE operation to exactly match the URI that comes out of the respective READ operation (the one that reads the logically same thing that was written by the write op).
The structure of that URI doesn't really matter as long as it resembles a URL and reliably identifies the data entity being read or written. That helps Spline connect execution plans based on the data source URI match and build the end-to-end lineage view.
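An illustrative sketch of what that could look like, prompted by the comments above. The field name "params" comes from the object dump earlier in the thread; the URI layout itself is an assumption — the only hard requirement is that the READ and WRITE code paths emit identical URIs for the same table.

```scala
import org.apache.spark.sql.sources.BaseRelation

object SnowflakeUriSketch {

  // Reflectively pull the connector's parameter object off the relation, so the plugin
  // needs no compile-time dependency on the Snowflake connector jar.
  def connectorParams(rel: BaseRelation): Option[AnyRef] =
    Option(rel)
      .filter(_.getClass.getName == "net.snowflake.spark.snowflake.SnowflakeRelation")
      .flatMap { r =>
        r.getClass.getDeclaredFields.find(_.getName.endsWith("params")).map { f =>
          f.setAccessible(true)
          f.get(r) // the connector's merged parameters; sfURL/sfDatabase/sfSchema should live here
        }
      }

  // An assumed URI layout; the warehouse is deliberately left out because it identifies
  // compute rather than the data entity itself.
  def sourceUri(sfUrl: String, database: String, schema: String, table: String): String =
    s"snowflake://${sfUrl.toLowerCase}/$database/$schema/$table"
}
```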
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>spark-snowflake_${scala.binary.version}</artifactId>
  <version>2.16.0-spark_3.3</version>
The connector version 2.16.0-spark_3.3 specifically says it's compiled for Spark 3.3. What about the other Spark versions? Will the plugin still be binary compatible?
import za.co.absa.spline.harvester.builder.SourceIdentifier
import org.mockito.Mockito._

class SnowflakePluginSpec extends AnyFlatSpec with Matchers with MockitoSugar {
Is there any possibility to add end-to-end integration tests to verify compatibility with real Spark?
For example, a Snowflake docker container or maybe an official mock/test library?
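One common shape for such a test, sketched below under the assumption that no local Snowflake container is available and real credentials arrive via environment variables (all class, table, and variable names here are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class SnowflakePluginIntegrationSpec extends AnyFlatSpec with Matchers {

  // Credentials are read from the environment; when absent the test is cancelled, not failed.
  private val sfOptions: Option[Map[String, String]] =
    for {
      url  <- sys.env.get("SF_URL")
      user <- sys.env.get("SF_USER")
      pwd  <- sys.env.get("SF_PASSWORD")
    } yield Map(
      "sfURL" -> url, "sfUser" -> user, "sfPassword" -> pwd,
      "sfDatabase" -> "TEST_DB", "sfSchema" -> "PUBLIC", "dbtable" -> "LINEAGE_TEST")

  "SnowflakePlugin" should "capture lineage for a round trip through Snowflake" in {
    assume(sfOptions.isDefined, "Snowflake credentials not configured - skipping")
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // ... enable the Spline agent, write a small DataFrame via
    // format("net.snowflake.spark.snowflake"), read it back, and assert on the captured plans ...
  }
}
```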
The branch was force-pushed from 133f2c1 to 4495e93.
@wajda I implemented some of your suggestions. Turns out the read operation was not hard to capture after all! I also added an integration test, but I've marked the PR as a draft since it's not ready to merge yet. I have a couple of questions for ya.
I thought this was a little strange, given that I see other tests tagged as ignored if < Spark 3.0 or > Spark 3.3, which tells me there is a way to run the integration tests at a Spark 3.3.x version. Any insight into why this is happening, or instructions on how to run an integration test at a specific Spark version other than updating the
Running the Databricks runtime associated with Scala 2.12 and Spark 3.3.x. Any insight would be greatly appreciated! Thanks!
To run integration tests at a specific Spark version, activate the corresponding Maven profile, e.g. $ mvn ... -Pspark-3.3
That's interesting, but it's difficult to tell without looking at detailed logs.
Did you check them? The ones for The builds for
The branch was force-pushed from 4495e93 to 03e5e6d.



Overview
This PR adds support for Snowflake target sources when using the spline-spark-agent in conjunction with the Snowflake Spark connector.
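For context, this is the kind of job the plugin is meant to track. The example below is illustrative only: the account, credentials, and table names are placeholders, and the Spline agent itself is assumed to be enabled separately (e.g. via spark.sql.queryExecutionListeners).

```scala
import org.apache.spark.sql.SparkSession

object SnowflakeWriteExample extends App {
  val spark = SparkSession.builder().appName("snowflake-lineage-demo").getOrCreate()

  val parts = spark.createDataFrame(Seq((1, "bolt"), (2, "nut"))).toDF("id", "part")

  // A plain write through the Snowflake Spark connector; with this PR the agent should
  // record it as a WRITE operation with a Snowflake source identifier.
  parts.write
    .format("net.snowflake.spark.snowflake")
    .option("sfURL", "myaccount.snowflakecomputing.com")
    .option("sfUser", "demo_user")
    .option("sfPassword", sys.env("SF_PASSWORD"))
    .option("sfWarehouse", "COMPUTE_WH")
    .option("sfDatabase", "DEMO_DB")
    .option("sfSchema", "PUBLIC")
    .option("dbtable", "PARTS")
    .mode("append")
    .save()
}
```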