Commit ca89743
[FLINK-38839][runtime] Support custom delimiter for table-options
This commit extends the table-options feature to support custom delimiters, making it more flexible for users.

Changes:
- Add optional 'table-options.delimiter' configuration parameter
- Default delimiter is ',' for backward compatibility
- Support any custom delimiter (e.g., ';', '|', '$', etc.)
- Update the TransformDef, SchemaMetadataTransform, and TransformRule classes
- Update the YAML parser to handle the new configuration
- Add test cases for custom delimiter functionality
- Update documentation with usage examples

Example usage:

```yaml
transform:
  - source-table: mydb.mytable
    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
    table-options.delimiter: ";"
```
1 parent 097aa9a commit ca89743

File tree

11 files changed: +223 −11 lines

docs/content.zh/docs/core-concept/transform.md (9 additions, 2 deletions)

````diff
@@ -39,6 +39,7 @@ under the License.
 | primary-keys | Sink table primary keys, separated by commas | optional |
 | partition-keys | Sink table partition keys, separated by commas | optional |
 | table-options | used to the configure table creation statement when automatically creating tables | optional |
+| table-options.delimiter | delimiter for table-options key-value pairs, default is `,` | optional |
 | converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
 | description | Transform rule description | optional |
 
@@ -311,7 +312,13 @@ transform:
     table-options: comment=web order
     description: auto creating table options example
 ```
-小技巧:table-options 的格式是 `key1=value1,key2=value2`。
+小技巧:table-options 的格式是 `key1=value1,key2=value2`;如果 value 中包含逗号或其他特殊字符,可以使用 `table-options.delimiter` 指定自定义分隔符(如 `;`、`|`、`$` 等):
+
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## Classification mapping
 多个转换规则可以定义为分类映射。
@@ -466,4 +473,4 @@ pipeline:
 |---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
 | openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
-| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
````

docs/content/docs/core-concept/transform.md (9 additions, 1 deletion)

````diff
@@ -39,6 +39,7 @@ To describe a transform rule, the following parameters can be used:
 | primary-keys | Sink table primary keys, separated by commas | optional |
 | partition-keys | Sink table partition keys, separated by commas | optional |
 | table-options | used to the configure table creation statement when automatically creating tables | optional |
+| table-options.delimiter | delimiter for table-options key-value pairs, default is `,` | optional |
 | converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
 | description | Transform rule description | optional |
 
@@ -315,6 +316,13 @@ transform:
     description: auto creating table options example
 ```
 Tips: The format of table-options is `key1=value1,key2=value2`.
+If option values contain commas or other special characters, you can specify a custom delimiter using `table-options.delimiter` (such as `;`, `|`, `$`, etc.):
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## Classification mapping
 Multiple transform rules can be defined to classify input data rows and apply different processing.
@@ -471,4 +479,4 @@ The following built-in models are provided:
 |---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
 | openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
-| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
````

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java (8 additions, 0 deletions)

```diff
@@ -104,6 +104,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";
 
+    public static final String TRANSFORM_TABLE_OPTION_DELIMITER_KEY = "table-options.delimiter";
+
     private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
 
     /** Parse the specified pipeline definition file. */
@@ -324,6 +326,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                         TRANSFORM_PRIMARY_KEY_KEY,
                         TRANSFORM_PARTITION_KEY_KEY,
                         TRANSFORM_TABLE_OPTION_KEY,
+                        TRANSFORM_TABLE_OPTION_DELIMITER_KEY,
                         TRANSFORM_DESCRIPTION_KEY,
                         TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
 
@@ -357,6 +360,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
+        String tableOptionsDelimiter =
+                Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_DELIMITER_KEY))
+                        .map(JsonNode::asText)
+                        .orElse(null);
         String description =
                 Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
@@ -373,6 +380,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 description,
                 postTransformConverter);
     }
```
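The parser reads the new `table-options.delimiter` key with the same null-safe `Optional.ofNullable(...).map(...).orElse(null)` pattern it already uses for the other transform fields, so an absent key simply yields `null`. A minimal stdlib-only sketch of that pattern, using a plain `Map` as an illustrative stand-in for Jackson's `JsonNode` (the stand-in is an assumption, not the actual parser API):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class TransformNodeSketch {
    // Null-safe lookup of an optional key: a missing key yields null
    // instead of throwing, mirroring the Optional chain in the parser.
    static String readOptional(Map<String, String> node, String key) {
        return Optional.ofNullable(node.get(key)).orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> node = Collections.singletonMap("table-options.delimiter", ";");
        System.out.println(readOptional(node, "table-options.delimiter")); // present -> ";"
        System.out.println(readOptional(node, "table-options"));           // absent -> null
    }
}
```

Downstream code then treats that `null` as "use the default delimiter", which keeps pipelines without the new key working unchanged.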

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java (31 additions, 1 deletion)

```diff
@@ -36,6 +36,8 @@
  *       by `,`. Optional for the definition.
  *   <li>tableOptions: a string for table options for matching input table IDs, options are
  *       seperated by `,`, key and value are seperated by `=`. Optional for the definition.
+ *   <li>tableOptionsDelimiter: a string for delimiter of table options, default is `,`. Optional
+ *       for the definition.
  *   <li>description: description for the transformation. Optional for the definition.
  * </ul>
  */
@@ -47,6 +49,7 @@ public class TransformDef {
     private final String primaryKeys;
     private final String partitionKeys;
     private final String tableOptions;
+    private final String tableOptionsDelimiter;
     private final String postTransformConverter;
 
     public TransformDef(
@@ -56,6 +59,7 @@ public TransformDef(
             String primaryKeys,
             String partitionKeys,
             String tableOptions,
+            String tableOptionsDelimiter,
             String description,
             String postTransformConverter) {
         this.sourceTable = sourceTable;
@@ -64,10 +68,30 @@ public TransformDef(
         this.primaryKeys = primaryKeys;
         this.partitionKeys = partitionKeys;
         this.tableOptions = tableOptions;
+        this.tableOptionsDelimiter = tableOptionsDelimiter;
         this.description = description;
         this.postTransformConverter = postTransformConverter;
     }
-
+    public TransformDef(
+            String sourceTable,
+            String projection,
+            String filter,
+            String primaryKeys,
+            String partitionKeys,
+            String tableOptions,
+            String description,
+            String postTransformConverter) {
+        this(
+                sourceTable,
+                projection,
+                filter,
+                primaryKeys,
+                partitionKeys,
+                tableOptions,
+                ",",
+                description,
+                postTransformConverter);
+    }
     public String getSourceTable() {
         return sourceTable;
     }
@@ -96,6 +120,10 @@ public String getTableOptions() {
         return tableOptions;
     }
 
+    public String getTableOptionsDelimiter() {
+        return tableOptionsDelimiter;
+    }
+
     public String getPostTransformConverter() {
         return postTransformConverter;
     }
@@ -137,6 +165,7 @@ public boolean equals(Object o) {
                 && Objects.equals(primaryKeys, that.primaryKeys)
                 && Objects.equals(partitionKeys, that.partitionKeys)
                 && Objects.equals(tableOptions, that.tableOptions)
+                && Objects.equals(tableOptionsDelimiter, that.tableOptionsDelimiter)
                 && Objects.equals(postTransformConverter, that.postTransformConverter);
     }
 
@@ -150,6 +179,7 @@ public int hashCode() {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 postTransformConverter);
     }
 }
```
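The backward-compatibility mechanism in TransformDef is the classic telescoping-constructor pattern: the original eight-argument constructor is kept and delegates to the new nine-argument one, supplying `","` as the delimiter. A minimal sketch of the pattern under illustrative names (`OptionsDef` is hypothetical, not the real class):

```java
public class OptionsDef {
    private final String tableOptions;
    private final String delimiter;

    // New, fully-parameterized constructor.
    public OptionsDef(String tableOptions, String delimiter) {
        this.tableOptions = tableOptions;
        this.delimiter = delimiter;
    }

    // Legacy signature: pre-existing callers keep compiling and keep
    // the old comma-delimited behavior.
    public OptionsDef(String tableOptions) {
        this(tableOptions, ",");
    }

    public String getDelimiter() {
        return delimiter;
    }

    public static void main(String[] args) {
        System.out.println(new OptionsDef("k=v").getDelimiter());      // ","
        System.out.println(new OptionsDef("k=v", ";").getDelimiter()); // ";"
    }
}
```

This is why none of the existing call sites or serialized definitions need to change: only callers that opt into a custom delimiter use the new signature.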

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java (2 additions, 0 deletions)

```diff
@@ -75,6 +75,7 @@ private PreTransformOperator generatePreTransform(
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
+                    transform.getTableOptionsDelimiter(),
                     transform.getPostTransformConverter(),
                     supportedMetadataColumns);
         }
@@ -112,6 +113,7 @@ public DataStream<Event> translatePostTransform(
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
+                    transform.getTableOptionsDelimiter(),
                     transform.getPostTransformConverter(),
                     supportedMetadataColumns);
         }
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java (24 additions, 0 deletions)

```diff
@@ -44,6 +44,28 @@ public PostTransformOperatorBuilder addTransform(
             String tableOptions,
             String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOptions,
+                null,
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PostTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOptions,
+            String tableOptionsDelimiter,
+            String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -52,6 +74,7 @@ public PostTransformOperatorBuilder addTransform(
                         primaryKey,
                         partitionKey,
                         tableOptions,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
@@ -67,6 +90,7 @@ public PostTransformOperatorBuilder addTransform(
                         "",
                         "",
                         "",
+                        "",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java (3 additions, 1 deletion)

```diff
@@ -110,6 +110,7 @@ public void setup(
             String primaryKeys = transformRule.getPrimaryKey();
             String partitionKeys = transformRule.getPartitionKey();
             String tableOptions = transformRule.getTableOption();
+            String tableOptionsDelimiter = transformRule.getTableOptionsDelimiter();
             Selectors selectors =
                     new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
             transforms.add(
@@ -120,7 +121,8 @@ public void setup(
             schemaMetadataTransformers.add(
                     new Tuple2<>(
                             selectors,
-                            new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
+                            new SchemaMetadataTransform(
+                                    primaryKeys, partitionKeys, tableOptions, tableOptionsDelimiter)));
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java (24 additions, 0 deletions)

```diff
@@ -43,6 +43,7 @@ public PreTransformOperatorBuilder addTransform(
                         "",
                         "",
                         "",
+                        "",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
@@ -57,6 +58,28 @@ public PreTransformOperatorBuilder addTransform(
             String tableOption,
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOption,
+                null,
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PreTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOption,
+            String tableOptionsDelimiter,
+            @Nullable String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -65,6 +88,7 @@ public PreTransformOperatorBuilder addTransform(
                         primaryKey,
                         partitionKey,
                         tableOption,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java (15 additions, 6 deletions)

```diff
@@ -42,7 +42,10 @@ public class SchemaMetadataTransform implements Serializable {
     private Map<String, String> options = new HashMap<>();
 
     public SchemaMetadataTransform(
-            String primaryKeyString, String partitionKeyString, String tableOptionString) {
+            String primaryKeyString,
+            String partitionKeyString,
+            String tableOptionString,
+            String tableOptionsDelimiter) {
         if (!StringUtils.isNullOrWhitespaceOnly(primaryKeyString)) {
             String[] primaryKeyArr = primaryKeyString.split(",");
             for (int i = 0; i < primaryKeyArr.length; i++) {
@@ -58,13 +61,19 @@ public SchemaMetadataTransform(
             partitionKeys = Arrays.asList(partitionKeyArr);
         }
         if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
-            for (String tableOption : tableOptionString.split(",")) {
-                String[] kv = tableOption.split("=");
+            // Use custom delimiter if provided, otherwise default to comma for backward
+            // compatibility.
+            String delimiter =
+                    StringUtils.isNullOrWhitespaceOnly(tableOptionsDelimiter)
+                            ? ","
+                            : tableOptionsDelimiter;
+            for (String tableOption : tableOptionString.split(delimiter)) {
+                String[] kv = tableOption.split("=", 2);
                 if (kv.length != 2) {
                     throw new IllegalArgumentException(
-                            "table option format error: "
-                                    + tableOptionString
-                                    + ", it should be like `key1=value1,key2=value2`.");
+                            String.format(
+                                    "table option format error: %s, it should be like `key1=value1%skey2=value2`.",
+                                    tableOptionString, delimiter));
                 }
                 options.put(kv[0].trim(), kv[1].trim());
             }
```
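The parsing change above reduces to three steps: fall back to `,` when no delimiter is configured, split the option string on the delimiter, then split each pair on the first `=` only (the `split("=", 2)` limit lets values themselves contain `=`). A self-contained sketch of that logic follows. Note one assumption it makes that the diff does not: `String.split` treats its argument as a regular expression, so metacharacter delimiters such as `|` or `$` only split literally if quoted with `Pattern.quote`; the sketch quotes the delimiter, the patched code passes it to `split` as-is.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TableOptionsSketch {
    /** Parses "k1=v1<delim>k2=v2" into a map, defaulting the delimiter to ",". */
    static Map<String, String> parse(String optionString, String delimiter) {
        // Blank or null delimiter -> comma, preserving the legacy behavior.
        String delim = (delimiter == null || delimiter.trim().isEmpty()) ? "," : delimiter;
        Map<String, String> options = new HashMap<>();
        // Pattern.quote keeps regex metacharacters like "|" or "$" literal.
        for (String pair : optionString.split(Pattern.quote(delim))) {
            String[] kv = pair.split("=", 2); // limit 2: values may contain '='
            if (kv.length != 2) {
                throw new IllegalArgumentException("table option format error: " + pair);
            }
            options.put(kv[0].trim(), kv[1].trim());
        }
        return options;
    }

    public static void main(String[] args) {
        // A value containing commas survives intact when ';' is the delimiter.
        Map<String, String> opts =
                parse("sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh", ";");
        System.out.println(opts.get("sequence.field")); // gxsj,jjsj
    }
}
```

With the default comma delimiter, the same input would instead be split into `sequence.field=gxsj` and a malformed `jjsj;...` fragment, which is exactly the failure mode the new option addresses.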

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java (7 additions, 0 deletions)

```diff
@@ -34,6 +34,7 @@ public class TransformRule implements Serializable {
     private final String primaryKey;
     private final String partitionKey;
     private final String tableOption;
+    private final String tableOptionsDelimiter;
     private final @Nullable String postTransformConverter;
     private final SupportedMetadataColumn[] supportedMetadataColumns;
 
@@ -44,6 +45,7 @@ public TransformRule(
             String primaryKey,
             String partitionKey,
             String tableOption,
+            String tableOptionsDelimiter,
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
         this.tableInclusions = tableInclusions;
@@ -52,6 +54,7 @@ public TransformRule(
             this.primaryKey = primaryKey;
             this.partitionKey = partitionKey;
             this.tableOption = tableOption;
+            this.tableOptionsDelimiter = tableOptionsDelimiter;
             this.postTransformConverter = postTransformConverter;
             this.supportedMetadataColumns = supportedMetadataColumns;
         }
@@ -82,6 +85,10 @@ public String getTableOption() {
         return tableOption;
     }
 
+    public String getTableOptionsDelimiter() {
+        return tableOptionsDelimiter;
+    }
+
     @Nullable
     public String getPostTransformConverter() {
         return postTransformConverter;
```
