Skip to content

Commit 745fc5c

Browse files
authored
Merge pull request #661 from mspruc/flink-datastream
Source, map, collection, sink for bounded data streams in Flink
2 parents 6172a5f + 65147cf commit 745fc5c

21 files changed

+1281
-145
lines changed

wayang-platforms/wayang-flink/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,16 @@
116116
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
117117
<version>${flink.version}</version>
118118
</dependency>
119+
<dependency>
120+
<groupId>org.apache.flink</groupId>
121+
<artifactId>flink-connector-files</artifactId>
122+
<version>${flink.version}</version>
123+
</dependency>
124+
<dependency>
125+
<groupId>org.apache.flink</groupId>
126+
<artifactId>flink-streaming-java</artifactId>
127+
<version>${flink.version}</version>
128+
</dependency>
119129
<dependency>
120130
<groupId>org.apache.commons</groupId>
121131
<artifactId>commons-math3</artifactId>
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.flink.channels;
20+
21+
import org.apache.flink.streaming.api.datastream.DataStream;
22+
import org.apache.wayang.core.optimizer.OptimizationContext;
23+
import org.apache.wayang.core.plan.executionplan.Channel;
24+
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
25+
import org.apache.wayang.core.platform.AbstractChannelInstance;
26+
import org.apache.wayang.core.platform.ChannelDescriptor;
27+
import org.apache.wayang.core.platform.ChannelInstance;
28+
import org.apache.wayang.core.platform.Executor;
29+
import org.apache.wayang.flink.execution.FlinkExecutor;
30+
31+
import java.util.OptionalLong;
32+
33+
public class DataStreamChannel extends Channel {
34+
35+
/**
36+
* {@link ChannelInstance} implementation for {@link DataStream}s.
37+
*/
38+
public class Instance extends AbstractChannelInstance {
39+
40+
private DataStream<?> dataStream;
41+
42+
// TODO: this.size is currently always 0
43+
private long size;
44+
45+
public Instance(final FlinkExecutor executor,
46+
final OptimizationContext.OperatorContext producerOperatorContext,
47+
final int producerOutputIndex) {
48+
super(executor, producerOperatorContext, producerOutputIndex);
49+
}
50+
51+
public void accept(final DataStream<?> dataStream) {
52+
this.dataStream = dataStream;
53+
}
54+
55+
@SuppressWarnings("unchecked")
56+
public <T> DataStream<T> provideDataStream() {
57+
return (DataStream<T>) this.dataStream;
58+
}
59+
60+
@Override
61+
public OptionalLong getMeasuredCardinality() {
62+
return this.size == 0 ? super.getMeasuredCardinality() : OptionalLong.of(this.size);
63+
}
64+
65+
@Override
66+
public DataStreamChannel getChannel() {
67+
return DataStreamChannel.this;
68+
}
69+
70+
@Override
71+
protected void doDispose() {
72+
this.dataStream = null;
73+
}
74+
}
75+
76+
public static final ChannelDescriptor DESCRIPTOR = new ChannelDescriptor(
77+
DataStreamChannel.class, true, false);
78+
79+
public static final ChannelDescriptor DESCRIPTOR_MANY = new ChannelDescriptor(
80+
DataStreamChannel.class, true, false);
81+
82+
public DataStreamChannel(final ChannelDescriptor descriptor, final OutputSlot<?> outputSlot) {
83+
super(descriptor, outputSlot);
84+
assert descriptor == DESCRIPTOR || descriptor == DESCRIPTOR_MANY;
85+
this.markForInstrumentation();
86+
}
87+
88+
private DataStreamChannel(final DataStreamChannel parent) {
89+
super(parent);
90+
}
91+
92+
@Override
93+
public Channel copy() {
94+
return new DataStreamChannel(this);
95+
}
96+
97+
@Override
98+
public Instance createInstance(final Executor executor,
99+
final OptimizationContext.OperatorContext producerOperatorContext,
100+
final int producerOutputIndex) {
101+
return new Instance((FlinkExecutor) executor, producerOperatorContext, producerOutputIndex);
102+
}
103+
}

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class FunctionCompiler {
5858
* @param <O> output type of the transformation
5959
* @return a compiled function
6060
*/
61-
public <I, O> MapFunction<I, O> compile(TransformationDescriptor<I, O> descriptor) {
61+
public static <I, O> MapFunction<I, O> compile(TransformationDescriptor<I, O> descriptor) {
6262
// This is a dummy method but shows the intention of having something compilable in the descriptors.
6363
Function<I, O> function = descriptor.getJavaImplementation();
6464
return (MapFunction<I, O>) i -> function.apply(i);
@@ -72,7 +72,7 @@ public <I, O> MapFunction<I, O> compile(TransformationDescriptor<I, O> descripto
7272
* @param <O> output type of the transformation
7373
* @return a compiled function
7474
*/
75-
public <I, O> FlatMapFunction<I, O> compile(FunctionDescriptor.SerializableFunction<I, Iterable<O>> flatMapDescriptor) {
75+
public static <I, O> FlatMapFunction<I, O> compile(FunctionDescriptor.SerializableFunction<I, Iterable<O>> flatMapDescriptor) {
7676
return (t, collector) -> flatMapDescriptor.apply(t).forEach(collector::collect);
7777
}
7878

@@ -83,7 +83,7 @@ public <I, O> FlatMapFunction<I, O> compile(FunctionDescriptor.SerializableFunct
8383
* @param <T> input/output type of the transformation
8484
* @return a compiled function
8585
*/
86-
public <T> ReduceFunction<T> compile(ReduceDescriptor<T> descriptor) {
86+
public static <T> ReduceFunction<T> compile(ReduceDescriptor<T> descriptor) {
8787
// This is a dummy method but shows the intention of having something compilable in the descriptors.
8888
BiFunction<T, T, T> reduce_function = descriptor.getJavaImplementation();
8989
return new ReduceFunction<T>() {
@@ -94,26 +94,26 @@ public T reduce(T t, T t1) throws Exception {
9494
};
9595
}
9696

97-
public <T> FilterFunction<T> compile(PredicateDescriptor.SerializablePredicate<T> predicateDescriptor) {
98-
return t -> predicateDescriptor.test(t);
97+
public static <T> FilterFunction<T> compile(PredicateDescriptor.SerializablePredicate<T> predicateDescriptor) {
98+
return predicateDescriptor::test;
9999
}
100100

101101

102-
public <T> OutputFormat<T> compile(ConsumerDescriptor.SerializableConsumer<T> consumerDescriptor) {
102+
public static <T> OutputFormat<T> compile(ConsumerDescriptor.SerializableConsumer<T> consumerDescriptor) {
103103
return new OutputFormatConsumer<T>(consumerDescriptor);
104104
}
105105

106106

107-
public <T, K> KeySelector<T, K> compileKeySelector(TransformationDescriptor<T, K> descriptor){
107+
public static <T, K> KeySelector<T, K> compileKeySelector(TransformationDescriptor<T, K> descriptor){
108108
return new KeySelectorFunction<T, K>(descriptor);
109109
}
110110

111-
public <T0, T1, O> CoGroupFunction<T0, T1, O> compileCoGroup(){
111+
public static <T0, T1, O> CoGroupFunction<T0, T1, O> compileCoGroup(){
112112
return new FlinkCoGroupFunction<T0, T1, O>();
113113
}
114114

115115

116-
public <T> TextOutputFormat.TextFormatter<T> compileOutput(TransformationDescriptor<T, String> formattingDescriptor) {
116+
public static <T> TextOutputFormat.TextFormatter<T> compileOutput(TransformationDescriptor<T, String> formattingDescriptor) {
117117
Function<T, String> format = formattingDescriptor.getJavaImplementation();
118118
return new TextOutputFormat.TextFormatter<T>(){
119119

@@ -132,7 +132,7 @@ public String format(T value) {
132132
* @param <O> output type of the transformation
133133
* @return a compiled function
134134
*/
135-
public <I, O> MapPartitionFunction<I, O> compile(MapPartitionsDescriptor<I, O> descriptor){
135+
public static <I, O> MapPartitionFunction<I, O> compile(MapPartitionsDescriptor<I, O> descriptor){
136136
Function<Iterable<I>, Iterable<O>> function = descriptor.getJavaImplementation();
137137
return new MapPartitionFunction<I, O>() {
138138
@Override
@@ -146,13 +146,12 @@ public void mapPartition(Iterable<I> iterable, Collector<O> collector) throws Ex
146146
};
147147
}
148148

149-
public <T> WayangConvergenceCriterion compile(PredicateDescriptor<Collection<T>> descriptor){
150-
FunctionDescriptor.SerializablePredicate<Collection<T>> predicate = descriptor.getJavaImplementation();
151-
return new WayangConvergenceCriterion(predicate);
149+
public static <T> WayangConvergenceCriterion<T> compile(PredicateDescriptor<Collection<T>> descriptor){
150+
return new WayangConvergenceCriterion<T>(descriptor.getJavaImplementation());
152151
}
153152

154153

155-
public <I, O> RichFlatMapFunction<I, O> compile(FunctionDescriptor.ExtendedSerializableFunction<I, Iterable<O>> flatMapDescriptor, FlinkExecutionContext exe) {
154+
public static <I, O> RichFlatMapFunction<I, O> compile(FunctionDescriptor.ExtendedSerializableFunction<I, Iterable<O>> flatMapDescriptor, FlinkExecutionContext exe) {
156155

157156
return new RichFlatMapFunction<I, O>() {
158157
@Override
@@ -168,7 +167,7 @@ public void flatMap(I value, Collector<O> out) throws Exception {
168167
}
169168

170169

171-
public <I, O> RichMapFunction<I, O> compile(TransformationDescriptor<I, O> mapDescriptor, FlinkExecutionContext fex ) {
170+
public static <I, O> RichMapFunction<I, O> compile(TransformationDescriptor<I, O> mapDescriptor, FlinkExecutionContext fex ) {
172171

173172
FunctionDescriptor.ExtendedSerializableFunction<I, O> map = (FunctionDescriptor.ExtendedSerializableFunction) mapDescriptor.getJavaImplementation();
174173
return new RichMapFunction<I, O>() {
@@ -186,7 +185,7 @@ public void open(Configuration parameters) throws Exception {
186185

187186

188187

189-
public <I, O> RichMapPartitionFunction<I, O> compile(MapPartitionsDescriptor<I, O> descriptor, FlinkExecutionContext fex){
188+
public static <I, O> RichMapPartitionFunction<I, O> compile(MapPartitionsDescriptor<I, O> descriptor, FlinkExecutionContext fex){
190189
FunctionDescriptor.ExtendedSerializableFunction<Iterable<I>, Iterable<O>> function =
191190
(FunctionDescriptor.ExtendedSerializableFunction<Iterable<I>, Iterable<O>>)
192191
descriptor.getJavaImplementation();

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.wayang.flink.execution;
2020

2121
import org.apache.flink.api.java.ExecutionEnvironment;
22+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2223
import org.apache.wayang.core.api.Job;
2324
import org.apache.wayang.core.api.exception.WayangException;
2425
import org.apache.wayang.core.optimizer.OptimizationContext;
@@ -35,7 +36,6 @@
3536
import org.apache.wayang.flink.compiler.FunctionCompiler;
3637
import org.apache.wayang.flink.operators.FlinkExecutionOperator;
3738
import org.apache.wayang.flink.platform.FlinkPlatform;
38-
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
3939

4040
import java.util.Arrays;
4141
import java.util.Collection;
@@ -56,6 +56,12 @@ public class FlinkExecutor extends PushExecutorTemplate {
5656
*/
5757
public ExecutionEnvironment fee;
5858

59+
60+
/**
61+
* {@link StreamExecutionEnvironment} for bounded and continuous streams.
62+
*/
63+
public StreamExecutionEnvironment sEnv;
64+
5965
/**
6066
* Compiler to create flink UDFs.
6167
*/
@@ -76,6 +82,7 @@ public FlinkExecutor(FlinkPlatform flinkPlatform, Job job) {
7682
super(job);
7783
this.platform = flinkPlatform;
7884
this.flinkContextReference = this.platform.getFlinkContext(job);
85+
this.sEnv = flinkPlatform.streamExecutionEnvironment;
7986
this.fee = this.flinkContextReference.get();
8087
this.numDefaultPartitions = (int) this.getConfiguration().getLongProperty("wayang.flink.parallelism");
8188
this.fee.setParallelism(this.numDefaultPartitions);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.flink.mapping;
20+
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
24+
import org.apache.wayang.basic.operators.TextFileSource;
25+
import org.apache.wayang.core.mapping.Mapping;
26+
import org.apache.wayang.core.mapping.OperatorPattern;
27+
import org.apache.wayang.core.mapping.PlanTransformation;
28+
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
29+
import org.apache.wayang.core.mapping.SubplanPattern;
30+
import org.apache.wayang.flink.operators.FlinkBoundedTextFileSource;
31+
import org.apache.wayang.flink.platform.FlinkPlatform;
32+
33+
public class BoundedTextFileSourceMapping implements Mapping {
34+
@Override
35+
public Collection<PlanTransformation> getTransformations() {
36+
return Collections.singleton(new PlanTransformation(
37+
this.createSubplanPattern(),
38+
this.createReplacementSubplanFactory(),
39+
FlinkPlatform.getInstance()
40+
));
41+
}
42+
43+
private SubplanPattern createSubplanPattern() {
44+
final OperatorPattern<?> operatorPattern = new OperatorPattern<>(
45+
"source", new TextFileSource("", null), false
46+
);
47+
return SubplanPattern.createSingleton(operatorPattern);
48+
}
49+
50+
private ReplacementSubplanFactory createReplacementSubplanFactory() {
51+
return new ReplacementSubplanFactory.OfSingleOperators<TextFileSource>(
52+
(matchedOperator, epoch) -> new FlinkBoundedTextFileSource(matchedOperator).at(epoch)
53+
);
54+
}
55+
}

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@
1919
package org.apache.wayang.flink.mapping;
2020

2121
import org.apache.wayang.core.mapping.Mapping;
22+
import org.apache.wayang.flink.plugin.FlinkBasicPlugin;
2223

2324
import java.util.Arrays;
2425
import java.util.Collection;
2526

2627
/**
27-
* Register for {@link Mapping}s for this platform.
28+
* Register for {@link Mapping}s for {@link FlinkBasicPlugin}.
2829
*/
2930
public class Mappings {
3031

31-
public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
32+
/**
33+
* Mappings using Flink's DataSets
34+
* @deprecated The DataSet API in Flink has been deprecated; move over to bounded streams for a 1-to-1 replacement, see {@link #BOUNDED_STREAM_MAPPINGS}.
35+
*/
36+
public static final Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
3237
new CartesianMapping(),
3338
new CoGroupMapping(),
3439
new CollectionSourceMapping(),
@@ -60,6 +65,12 @@ public class Mappings {
6065
new ZipWithIdMapping()
6166
);
6267

68+
public static final Collection<Mapping> BOUNDED_STREAM_MAPPINGS = Arrays.asList(
69+
new BoundedTextFileSourceMapping(),
70+
new StreamedMapMapping(),
71+
new StreamedJoinMapping(),
72+
new StreamedLocalCallbackSinkMapping()
73+
);
6374
}
6475

6576

0 commit comments

Comments
 (0)