Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,21 @@ Flink SQL> SELECT * FROM orders;
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>在 binlog 读取阶段，是否捕获与表名匹配模式相匹配的新增表。启用后，新增表不会进行快照，仅输出其 binlog 事件。不能与 <code>scan.newly-added-table.enabled</code> 同时启用。<br>
<b><code>table-name</code> 参数匹配模式示例：</b>
<ul>
<li><code>db\.*</code> - 捕获数据库 'db' 下的所有表</li>
<li><code>db\.user_\.*</code> - 捕获数据库 'db' 下类似 'user_orders'、'user_profiles' 的表</li>
<li><code>db\.order_[0-9]+</code> - 捕获数据库 'db' 下类似 'order_1'、'order_2' 的表</li>
<li><code>db1\.*,db2\.user_\.*</code> - 捕获 'db1' 下所有表和 'db2' 下 'user_*' 开头的表</li>
</ul>
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
Expand Down
15 changes: 15 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,21 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
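<td>Whether to capture newly added tables that match the table patterns during the binlog reading phase. When enabled, new tables are captured without a snapshot; only their binlog events are emitted. Cannot be enabled together with <code>scan.newly-added-table.enabled</code>.<br>
<b><code>table-name</code> pattern examples:</b>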
<ul>
<li><code>db\.*</code> - captures all tables in database 'db'</li>
<li><code>db\.user_\.*</code> - captures tables like 'user_orders', 'user_profiles' in database 'db'</li>
<li><code>db\.order_[0-9]+</code> - captures tables like 'order_1', 'order_2' in database 'db'</li>
<li><code>db1\.*,db2\.user_\.*</code> - captures all tables in 'db1' and 'user_*' tables in 'db2'</li>
</ul>
</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
Expand Down Expand Up @@ -211,6 +212,36 @@ public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTa
return this;
}

/**
 * Enables or disables binlog-only capture of newly added tables, i.e. capturing them during
 * the binlog reading phase without taking a snapshot first. This option is mutually exclusive
 * with {@link #scanNewlyAddedTableEnabled(boolean)}.
 *
 * <p>How the two options differ:
 *
 * <ul>
 *   <li>{@code scanNewlyAddedTableEnabled}: for tables added when the job is restored from a
 *       checkpoint/savepoint, runs a full snapshot followed by incremental binlog reading
 *   <li>{@code scanBinlogNewlyAddedTableEnabled}: for tables added during the binlog reading
 *       phase, emits binlog events only and never snapshots them
 * </ul>
 *
 * <p>Examples of table-name patterns to pass to {@link #tableList(String...)}:
 *
 * <ul>
 *   <li>"db\\.*" - captures all tables in database 'db'
 *   <li>"db\\.user_\\.*" - captures tables like 'user_orders', 'user_profiles' in database 'db'
 *   <li>"db\\.order_[0-9]+" - captures tables like 'order_1', 'order_2' in database 'db'
 *   <li>"db1\\.*", "db2\\.user_\\.*" - captures all tables in 'db1' and 'user_*' tables in
 *       'db2'
 * </ul>
 *
 * @param scanBinlogNewlyAddedTableEnabled {@code true} to turn binlog-only capture on
 * @return this builder, for call chaining
 */
@Experimental
public MySqlSourceBuilder<T> scanBinlogNewlyAddedTableEnabled(
        boolean scanBinlogNewlyAddedTableEnabled) {
    // Pure delegation: the flag is stored on the underlying config factory.
    configFactory.scanBinlogNewlyAddedTableEnabled(scanBinlogNewlyAddedTableEnabled);
    return this;
}

/** Specifies the startup options. */
public MySqlSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
this.configFactory.startupOptions(startupOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ else if (!isRemainingTablesCheckpointed
}

private void captureNewlyAddedTables() {
// Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured "
+ "in binlog phase without snapshot.");
// No action needed here, BinlogSplitReader will handle the auto-capture
return;
}

// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()
&& !sourceConfig.getStartupOptions().isSnapshotOnly()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean includeHeartbeatEvents;
private final boolean includeTransactionMetadataEvents;
private final boolean scanNewlyAddedTableEnabled;
private final boolean scanBinlogNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
private final Map<ObjectPath, String> chunkKeyColumns;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class MySqlSourceConfig implements Serializable {
boolean includeHeartbeatEvents,
boolean includeTransactionMetadataEvents,
boolean scanNewlyAddedTableEnabled,
boolean scanBinlogNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties dbzProperties,
Properties jdbcProperties,
Expand Down Expand Up @@ -135,6 +137,7 @@ public class MySqlSourceConfig implements Serializable {
this.includeHeartbeatEvents = includeHeartbeatEvents;
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.scanBinlogNewlyAddedTableEnabled = scanBinlogNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
Expand Down Expand Up @@ -245,6 +248,10 @@ public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}

/**
 * Returns the binlog-only newly-added-table flag exactly as it was passed to the constructor.
 */
public boolean isScanBinlogNewlyAddedTableEnabled() {
    return this.scanBinlogNewlyAddedTableEnabled;
}

/** Returns the close-idle-readers flag exactly as it was passed to the constructor. */
public boolean isCloseIdleReaders() {
    return this.closeIdleReaders;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.config.CommonConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Duration;
Expand All @@ -34,6 +36,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.source.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -43,6 +46,7 @@
public class MySqlSourceConfigFactory implements Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceConfigFactory.class);

private int port = 3306; // default 3306 port
private String hostname;
Expand All @@ -68,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean includeHeartbeatEvents = false;
private boolean includeTransactionMetadataEvents = false;
private boolean scanNewlyAddedTableEnabled = false;
private boolean scanBinlogNewlyAddedTableEnabled = false;
private boolean closeIdleReaders = false;
private Properties jdbcProperties;
private Duration heartbeatInterval = MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue();
Expand Down Expand Up @@ -258,6 +263,17 @@ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdde
return this;
}

/**
 * Sets whether newly added tables are captured during the binlog reading phase without a
 * snapshot.
 *
 * <p>The option is intended for stream-only startup modes and must not be combined with {@link
 * #scanNewlyAddedTableEnabled(boolean)}. This setter only records the value and performs no
 * validation itself; the mutual-exclusion check happens when the config is created.
 *
 * @param scanBinlogNewlyAddedTableEnabled {@code true} to enable binlog-only capture
 * @return this factory, for call chaining
 */
public MySqlSourceConfigFactory scanBinlogNewlyAddedTableEnabled(
        boolean scanBinlogNewlyAddedTableEnabled) {
    this.scanBinlogNewlyAddedTableEnabled = scanBinlogNewlyAddedTableEnabled;
    return this;
}

/** Custom properties that will overwrite the default JDBC connection URL. */
public MySqlSourceConfigFactory jdbcProperties(Properties jdbcProperties) {
this.jdbcProperties = jdbcProperties;
Expand Down Expand Up @@ -397,8 +413,26 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
if (databaseList != null) {
props.setProperty("database.include.list", String.join(",", databaseList));
}
// Validate: Two modes are mutually exclusive
if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
throw new IllegalArgumentException(
"Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
+ "'scan.newly-added-table.enabled' as they may cause duplicate data");
}

if (tableList != null) {
props.setProperty("table.include.list", String.join(",", tableList));
// Convert table patterns to Debezium style if binlog auto-capture is enabled
if (scanBinlogNewlyAddedTableEnabled) {
String originalPattern = String.join(",", tableList);
String debeziumPattern = convertToDebeziumStyle(originalPattern);
props.setProperty("table.include.list", debeziumPattern);
LOG.info(
"Converted table pattern from '{}' to Debezium style '{}' for binlog auto-capture",
originalPattern,
debeziumPattern);
} else {
props.setProperty("table.include.list", String.join(",", tableList));
}
}
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
Expand Down Expand Up @@ -436,6 +470,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
includeHeartbeatEvents,
includeTransactionMetadataEvents,
scanNewlyAddedTableEnabled,
scanBinlogNewlyAddedTableEnabled,
closeIdleReaders,
props,
jdbcProperties,
Expand All @@ -446,4 +481,46 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
useLegacyJsonFormat,
assignUnboundedChunkFirst);
}

/**
 * Converts a Flink CDC style table pattern list into a Debezium style regular expression.
 *
 * <p>In CDC style, entries are separated by commas, an unescaped {@code .} is the
 * database/table separator, and {@code \.} stands for the regex "any character" token. In
 * Debezium style, entries are alternatives separated by {@code |}, {@code \.} is the literal
 * separator dot, and {@code .} is the regex meta-character. The conversion therefore joins the
 * trimmed entries with {@code |} and swaps {@code \.} with {@code .}.
 *
 * <p>The swap is done in one left-to-right scan rather than through a temporary placeholder
 * character, so characters already present in the pattern (for example a {@code $} end anchor
 * or a {@code $} inside an identifier) are passed through unchanged.
 *
 * <p>Examples:
 *
 * <ul>
 *   <li>{@code "db1.table_\.*,db2.user_\.*"} -> {@code "db1\.table_.*|db2\.user_.*"}
 *   <li>{@code "test_db.orders"} -> {@code "test_db\.orders"}
 *   <li>{@code "db.order_[0-9]+$"} -> {@code "db\.order_[0-9]+$"}
 * </ul>
 *
 * @param tables Flink CDC style table pattern (comma separated), must not be null
 * @return Debezium style table pattern
 */
private static String convertToDebeziumStyle(String tables) {
    LOG.debug("Converting table pattern to Debezium style: {}", tables);

    // Commas separate entries in CDC style; Debezium expects regex alternation instead.
    String alternatives =
            Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
    LOG.debug("After replacing comma with pipe separator: {}", alternatives);

    int length = alternatives.length();
    StringBuilder converted = new StringBuilder(length + 8);
    for (int i = 0; i < length; i++) {
        char current = alternatives.charAt(i);
        if (current == '\\' && i + 1 < length && alternatives.charAt(i + 1) == '.') {
            // CDC `\.` is the regex "any character" token -> Debezium `.`
            converted.append('.');
            i++; // the dot has been consumed together with the backslash
        } else if (current == '.') {
            // CDC unescaped `.` is the database.table separator -> Debezium `\.`
            converted.append("\\.");
        } else {
            converted.append(current);
        }
    }

    String debeziumStyle = converted.toString();
    LOG.debug("Final Debezium-style table pattern: {}", debeziumStyle);
    return debeziumStyle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,22 @@ public class MySqlSourceOptions {
.withDescription(
"Whether capture the scan the newly added tables or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.");

/**
 * Boolean option {@code scan.binlog.newly-added-table.enabled}, {@code false} by default. Its
 * description states that it captures newly added tables matching the table patterns during
 * the binlog reading stage without a snapshot, and that it cannot be combined with {@code
 * scan.newly-added-table.enabled}.
 */
@Experimental
public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.binlog.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"In binlog reading stage, whether to capture newly added tables "
+ "that match the table patterns. When enabled, new tables will be "
+ "captured without snapshot, only binlog events will be emitted. "
+ "Cannot be enabled together with 'scan.newly-added-table.enabled'. "
+ "table-name pattern examples: "
+ "'db\\.*' (all tables in database 'db'), "
+ "'db\\.user_\\.*' (tables like 'user_orders', 'user_profiles'), "
+ "'db\\.order_[0-9]+' (tables like 'order_1', 'order_2'), "
+ "'db1\\.*,db2\\.user_\\.*' (all tables in 'db1' and 'user_*' tables in 'db2').");

@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean scanNewlyAddedTableEnabled;
private final boolean scanBinlogNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
Expand Down Expand Up @@ -136,6 +137,7 @@ public MySqlTableSource(
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
boolean scanBinlogNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties jdbcProperties,
Duration heartbeatInterval,
Expand Down Expand Up @@ -166,6 +168,7 @@ public MySqlTableSource(
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.scanBinlogNewlyAddedTableEnabled = scanBinlogNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.jdbcProperties = jdbcProperties;
this.parseOnlineSchemaChanges = parseOnlineSchemaChanges;
Expand Down Expand Up @@ -233,6 +236,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.scanBinlogNewlyAddedTableEnabled(scanBinlogNewlyAddedTableEnabled)
.closeIdleReaders(closeIdleReaders)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
Expand Down Expand Up @@ -322,6 +326,7 @@ public DynamicTableSource copy() {
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
scanBinlogNewlyAddedTableEnabled,
closeIdleReaders,
jdbcProperties,
heartbeatInterval,
Expand Down Expand Up @@ -353,6 +358,7 @@ public boolean equals(Object o) {
&& distributionFactorUpper == that.distributionFactorUpper
&& distributionFactorLower == that.distributionFactorLower
&& scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
&& scanBinlogNewlyAddedTableEnabled == that.scanBinlogNewlyAddedTableEnabled
&& closeIdleReaders == that.closeIdleReaders
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
Expand Down Expand Up @@ -405,6 +411,7 @@ public int hashCode() {
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled,
scanBinlogNewlyAddedTableEnabled,
closeIdleReaders,
jdbcProperties,
heartbeatInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
config.get(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean scanNewlyAddedTableEnabled =
config.get(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(MySqlSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
Duration heartbeatInterval = config.get(MySqlSourceOptions.HEARTBEAT_INTERVAL);
String chunkKeyColumn =
config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN)
Expand Down Expand Up @@ -148,6 +150,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
scanBinlogNewlyAddedTableEnabled,
closeIdleReaders,
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
Expand Down Expand Up @@ -198,6 +201,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(MySqlSourceOptions.CONNECT_MAX_RETRIES);
options.add(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(MySqlSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
Expand Down
Loading