[FLINK-39001][doc][Flink-source] Supplement the Scan Newly Added Tables docs for the MongoDB, Postgres, and Oracle connectors #4247

@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all data changes on a single

By the way, Debezium's MongoDB change streams exploration, mentioned in [DBZ-435](https://issues.redhat.com/browse/DBZ-435), is on the roadmap.
If it's done, we can consider integrating the two kinds of source connector for users to choose from.

### Scan Newly Added Tables

**Note:** This feature is available since Flink CDC 3.1.0.

The Scan Newly Added Tables feature enables you to add new collections to monitor for an existing running pipeline. The newly added collections first read their snapshot data and then read their change stream automatically.

Imagine this scenario: at the beginning, a Flink job monitors the collections `[product, user, address]`, but after some days we would like the job to also monitor the collections `[order, custom]`, which contain historical data, and we need the job to keep reusing its existing state. This feature resolves this case gracefully.

The following steps show how to use this feature in the above scenario. Suppose an existing Flink job uses the MongoDB CDC source like this:

```java
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
    .hosts("yourHostname:27017")
    .databaseList("db") // set captured database
    .collectionList("db.product", "db.user", "db.address") // set captured collections
    .username("yourUsername")
    .password("yourPassword")
    .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    .build();
// your business code
```
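
For context, here is a minimal sketch of how such a source is typically wired into a DataStream job. The checkpoint interval, sink, and job name are illustrative placeholders, not requirements of the connector:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Periodic checkpointing keeps source state recoverable; savepoints themselves
// are triggered manually when stopping the job.
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
    .print(); // placeholder sink: replace with your business logic
env.execute("MongoDB CDC Job");
```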

If we would like to add the new collections `[order, custom]` to this job, we just need to update the `collectionList()` value of the job to include them and restore the job from the previous savepoint.

_Step 1_: Stop the existing Flink job with a savepoint.
```shell
$ ./bin/flink stop $Existing_Flink_JOB_ID
```
```shell
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
```
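
If `state.savepoints.dir` is not configured on your cluster, you can pass the savepoint target directory explicitly via the CLI's `--savepointPath` option; the directory below is just an example:

```shell
$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
```
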
_Step 2_: Update the collection list option of the existing Flink job.
1. Update the `collectionList()` value.
2. Build the jar of the updated job.
```java
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
    .hosts("yourHostname:27017")
    .databaseList("db")
    .collectionList("db.product", "db.user", "db.address", "db.order", "db.custom") // set captured collections [product, user, address, order, custom]
    .username("yourUsername")
    .password("yourPassword")
    .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    .build();
// your business code
```
_Step 3_: Restore the updated Flink job from the savepoint.
```shell
$ ./bin/flink run \
      --detached \
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./FlinkCDCExample.jar
```
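
Optionally, you can confirm that the restored job is running; the exact output depends on your cluster:

```shell
$ ./bin/flink list --running
```

After the restore, the newly added collections `order` and `custom` are snapshotted first and then switch to reading the change stream, while the previously captured collections continue from the restored state.
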
**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.

@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapshot.mode`

The Oracle CDC source can't work in parallel reading, because only one task can receive change events.

### Scan Newly Added Tables

**Note:** This feature is available since Flink CDC 3.1.0.

The Scan Newly Added Tables feature enables you to add new tables to monitor for an existing running pipeline. The newly added tables first read their snapshot data and then read their redo log automatically.

Imagine this scenario: at the beginning, a Flink job monitors the tables `[product, user, address]`, but after some days we would like the job to also monitor the tables `[order, custom]`, which contain historical data, and we need the job to keep reusing its existing state. This feature resolves this case gracefully.

The following steps show how to use this feature in the above scenario. Suppose an existing Flink job uses the Oracle CDC source like this:

```java
JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
    .hostname("yourHostname")
    .port(1521)
    .databaseList("ORCLCDB") // set captured database
    .schemaList("INVENTORY") // set captured schema
    .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") // set captured tables
    .username("yourUsername")
    .password("yourPassword")
    .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    .build();
// your business code
```
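
As with the MongoDB example, here is a minimal, illustrative sketch of the surrounding job wiring. The stable `uid` on the source operator is an assumption of this sketch: it helps Flink map the source state back when the job is later restored from a savepoint:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000); // illustrative checkpoint interval
env.fromSource(oracleSource, WatermarkStrategy.noWatermarks(), "Oracle CDC Source")
    .uid("oracle-cdc-source") // stable uid so source state maps cleanly on restore
    .print(); // placeholder sink: replace with your business logic
env.execute("Oracle CDC Job");
```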

If we would like to add the new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to this job, we just need to update the `tableList()` value of the job to include them and restore the job from the previous savepoint.

_Step 1_: Stop the existing Flink job with a savepoint.
```shell
$ ./bin/flink stop $Existing_Flink_JOB_ID
```
```shell
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
```
_Step 2_: Update the table list option of the existing Flink job.
1. Update the `tableList()` value.
2. Build the jar of the updated job.
```java
JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
    .hostname("yourHostname")
    .port(1521)
    .databaseList("ORCLCDB")
    .schemaList("INVENTORY")
    .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS", "INVENTORY.ORDER", "INVENTORY.CUSTOM") // set captured tables [PRODUCT, USER, ADDRESS, ORDER, CUSTOM]
    .username("yourUsername")
    .password("yourPassword")
    .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    .build();
// your business code
```
_Step 3_: Restore the updated Flink job from the savepoint.
```shell
$ ./bin/flink run \
      --detached \
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./FlinkCDCExample.jar
```
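
Equivalently, the CLI's short flags can be used, assuming the same savepoint path:

```shell
$ ./bin/flink run -d -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab ./FlinkCDCExample.jar
```
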
**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.