Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,44 @@ pipeline:

注意这里的 `classpath` 必须是全限定名,并且对应的 `jar` 文件必须包含在 Flink `/lib` 文件夹中,或者通过 `flink-cdc.sh --jar` 选项传递。

### UDF 配置选项

你可以通过添加 `options` 块来向 UDF 传递额外的配置选项。这些选项可以在 `open` 方法中通过 `UserDefinedFunctionContext.configuration()` 获取:

```yaml
pipeline:
user-defined-function:
- name: query_redis
classpath: com.example.flink.cdc.udf.RedisQueryFunction
options:
hostname: localhost
port: "6379"
cache.enabled: "true"
```

在你的 UDF 实现中,可以这样访问这些配置选项:

```java
public class RedisQueryFunction implements UserDefinedFunction {
private String hostname;
private int port;

@Override
public void open(UserDefinedFunctionContext context) throws Exception {
hostname = context.configuration().get("hostname");
port = Integer.parseInt(context.configuration().get("port"));
// 在这里初始化你的连接...

}

public Object eval(String key) {
// 使用 hostname 和 port 查询 Redis...
}
}
```

`options` 字段是可选的。如果未指定,将会传递一个空的配置给 UDF。

在正确注册后,UDF 可以在 `projection` 和 `filter` 表达式中使用,就像内置函数一样:

```yaml
Expand Down
37 changes: 37 additions & 0 deletions docs/content/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,43 @@ pipeline:

Notice that given classpath must be fully-qualified, and corresponding `jar` files must be included in Flink `/lib` folder, or be passed with `flink-cdc.sh --jar` option.

### UDF Options

You can pass extra options to UDFs by adding an `options` block. These options will be available in the `open` method through `UserDefinedFunctionContext.configuration()`:

```yaml
pipeline:
user-defined-function:
- name: query_redis
classpath: com.example.flink.cdc.udf.RedisQueryFunction
options:
hostname: localhost
port: "6379"
cache.enabled: "true"
```

And in your UDF implementation, you can access these options:

```java
public class RedisQueryFunction implements UserDefinedFunction {
private String hostname;
private int port;

@Override
public void open(UserDefinedFunctionContext context) throws Exception {
hostname = context.configuration().get("hostname");
port = Integer.parseInt(context.configuration().get("port"));
// Initialize your connection here...
}

public Object eval(String key) {
// Query Redis using hostname and port...
}
}
```

The `options` field is optional. If not specified, an empty configuration will be passed to the UDF.

After being correctly registered, UDFs could be used in both `projection` and `filter` expressions, just like built-in functions:

```yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String UDF_KEY = "user-defined-function";
private static final String UDF_FUNCTION_NAME_KEY = "name";
private static final String UDF_CLASSPATH_KEY = "classpath";
private static final String UDF_OPTIONS_KEY = "options";

// Model related keys
private static final String MODEL_NAME_KEY = "model-name";
Expand Down Expand Up @@ -295,7 +296,7 @@ private UdfDef toUdfDef(JsonNode udfNode) {
"UDF",
udfNode,
Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
Collections.emptyList());
Collections.singletonList(UDF_OPTIONS_KEY));

String functionName =
checkNotNull(
Expand All @@ -310,7 +311,15 @@ private UdfDef toUdfDef(JsonNode udfNode) {
UDF_CLASSPATH_KEY)
.asText();

return new UdfDef(functionName, classpath);
Map<String, String> options =
Optional.ofNullable(udfNode.get(UDF_OPTIONS_KEY))
.map(
node ->
mapper.convertValue(
node, new TypeReference<Map<String, String>>() {}))
.orElse(null);

return new UdfDef(functionName, classpath, options);
}

private TransformDef toTransformDef(JsonNode transformNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ void testUdfDefinition() throws Exception {
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
}

@Test
void testUdfDefinitionWithOptions() throws Exception {
URL resource =
Resources.getResource("definitions/pipeline-definition-with-udf-options.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
}

@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
Expand Down Expand Up @@ -669,4 +678,42 @@ void testParsingFullDefinitionFromString() throws Exception {
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.build()));

private final PipelineDef pipelineDefWithUdfOptions =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
new SinkDef(
"values",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"mydb.web_order",
"*, query_redis(id) as redis_value",
"id > 0",
null,
null,
null,
null,
null)),
Collections.singletonList(
new UdfDef(
"query_redis",
"org.apache.flink.cdc.udf.examples.java.RedisQueryFunction",
ImmutableMap.<String, String>builder()
.put("hostname", "localhost")
.put("port", "6379")
.put("cache.enabled", "true")
.build())),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.build()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: values

sink:
type: values

transform:
- source-table: mydb.web_order
projection: "*, query_redis(id) as redis_value"
filter: id > 0

pipeline:
parallelism: 1
user-defined-function:
- name: query_redis
classpath: org.apache.flink.cdc.udf.examples.java.RedisQueryFunction
options:
hostname: localhost
port: "6379"
cache.enabled: "true"
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.cdc.composer.definition;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -27,15 +29,22 @@
* <ul>
* <li>name: Static method name of user-defined functions.
* <li>classpath: Fully-qualified class path of package containing given function.
* <li>options: Configuration options for the user-defined function.
* </ul>
*/
public class UdfDef {
private final String name;
private final String classpath;
private final Map<String, String> options;

public UdfDef(String name, String classpath) {
this(name, classpath, new HashMap<>());
}

public UdfDef(String name, String classpath, Map<String, String> options) {
this.name = name;
this.classpath = classpath;
this.options = options != null ? options : new HashMap<>();
}

public String getName() {
Expand All @@ -46,6 +55,10 @@ public String getClasspath() {
return classpath;
}

public Map<String, String> getOptions() {
return options;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -56,16 +69,27 @@ public boolean equals(Object o) {
}

UdfDef udfDef = (UdfDef) o;
return Objects.equals(name, udfDef.name) && Objects.equals(classpath, udfDef.classpath);
return Objects.equals(name, udfDef.name)
&& Objects.equals(classpath, udfDef.classpath)
&& Objects.equals(options, udfDef.options);
}

@Override
public int hashCode() {
return Objects.hash(name, classpath);
return Objects.hash(name, classpath, options);
}

@Override
public String toString() {
return "UdfDef{" + "name='" + name + '\'' + ", classpath='" + classpath + '\'' + '}';
return "UdfDef{"
+ "name='"
+ name
+ '\''
+ ", classpath='"
+ classpath
+ '\''
+ ", options="
+ options
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,6 +132,6 @@ private Tuple3<String, String, Map<String, String>> modelToUDFTuple(ModelDef mod
}

private Tuple3<String, String, Map<String, String>> udfDefToUDFTuple(UdfDef udf) {
return Tuple3.of(udf.getName(), udf.getClasspath(), new HashMap<>());
return Tuple3.of(udf.getName(), udf.getClasspath(), udf.getOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ steps:
language: clojure
error: |
Unexpected key `language` in YAML UDF block.
Allowed keys in this context are: [name, classpath]
Allowed keys in this context are: [name, classpath, options]
Note: option language: "clojure" is unexpected. It was silently ignored in previous versions, and probably should be removed.
Loading