Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ airbyteBulkConnector {

dependencies {
implementation 'com.azure:azure-identity:1.17.0'
// version specified by the debezium bom dependency in the cdc toolkit

api platform('io.debezium:debezium-bom:3.4.1.Final')

implementation 'io.debezium:debezium-connector-postgres'
// Use Postgres JDBC driver version specified by Debezium
implementation 'org.postgresql:postgresql'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PostgresSourceMetadataQuerier(
override fun extraChecks() {
base.extraChecks()
if (postgresSourceConfig.incrementalConfiguration is XminIncrementalConfiguration) {
base.conn.use { conn ->
base.conn.use {
if (dbNumWraparound(base.conn) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a little weird. Can we use conn or it on line 31 instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️
Fixed

throw ConfigErrorException(xminWraparoundError)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PostgresSourceDebeziumOperations(
check(stateNode.size() == 1) { "State value has unexpected format: $opaqueStateValue" }
val offsetMap: Map<JsonNode, JsonNode> =
stateNode
.fields()
.properties()
.asSequence()
.map { (k, v) -> Jsons.readTree(k) to Jsons.readTree(v.textValue()) }
.toMap()
Expand Down
Loading