Skip to content

Commit c1e883e

Browse files
committed
[flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors
1 parent e49d964 commit c1e883e

15 files changed

Lines changed: 709 additions & 17 deletions

File tree

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java

Lines changed: 17 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -18,10 +18,14 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.flink.lineage.DataStreamProviderFactory;
22+
import org.apache.paimon.table.Table;
23+
2124
import org.apache.flink.streaming.api.datastream.DataStream;
2225
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2326
import org.apache.flink.table.connector.ProviderContext;
2427
import org.apache.flink.table.connector.source.DataStreamScanProvider;
28+
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
2529
import org.apache.flink.table.data.RowData;
2630

2731
import java.util.function.Function;
@@ -38,6 +42,19 @@ public PaimonDataStreamScanProvider(
3842
this.producer = producer;
3943
}
4044

45+
/**
46+
* Creates a {@link ScanRuntimeProvider} that may be enriched with lineage metadata when running
47+
* on a Flink version that supports it.
48+
*/
49+
public static ScanRuntimeProvider createProvider(
50+
boolean isBounded,
51+
Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
52+
String name,
53+
Table table) {
54+
return DataStreamProviderFactory.getScanProvider(
55+
new PaimonDataStreamScanProvider(isBounded, producer), name, table);
56+
}
57+
4158
@Override
4259
public DataStream<RowData> produceDataStream(
4360
ProviderContext context, StreamExecutionEnvironment env) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.flink.lineage.DataStreamProviderFactory;
22+
import org.apache.paimon.table.Table;
23+
2124
import org.apache.flink.streaming.api.datastream.DataStream;
2225
import org.apache.flink.streaming.api.datastream.DataStreamSink;
2326
import org.apache.flink.table.connector.ProviderContext;
2427
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
28+
import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
2529
import org.apache.flink.table.data.RowData;
2630

2731
import java.util.function.Function;
@@ -35,6 +39,16 @@ public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink
3539
this.producer = producer;
3640
}
3741

42+
/**
43+
* Creates a {@link SinkRuntimeProvider} that may be enriched with lineage metadata when running
44+
* on a Flink version that supports it.
45+
*/
46+
public static SinkRuntimeProvider createProvider(
47+
Function<DataStream<RowData>, DataStreamSink<?>> producer, String name, Table table) {
48+
return DataStreamProviderFactory.getSinkProvider(
49+
new PaimonDataStreamSinkProvider(producer), name, table);
50+
}
51+
3852
@Override
3953
public DataStreamSink<?> consumeDataStream(
4054
ProviderContext providerContext, DataStream<RowData> dataStream) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
5757

5858
@Override
5959
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
60-
return new PaimonDataStreamSinkProvider(
60+
return PaimonDataStreamSinkProvider.createProvider(
6161
(dataStream) ->
6262
new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions)
63-
.sinkFrom(dataStream));
63+
.sinkFrom(dataStream),
64+
tableIdentifier.asSummaryString(),
65+
table);
6466
}
6567

6668
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
104104
throw new UnsupportedOperationException(
105105
"Paimon doesn't support streaming INSERT OVERWRITE.");
106106
}
107+
String name = tableIdentifier.asSummaryString();
108+
107109
if (table instanceof FormatTable) {
108110
FormatTable formatTable = (FormatTable) table;
109-
return new PaimonDataStreamSinkProvider(
111+
return PaimonDataStreamSinkProvider.createProvider(
110112
(dataStream) ->
111113
new FlinkFormatTableDataStreamSink(
112114
formatTable, overwrite, staticPartitions)
113-
.sinkFrom(dataStream));
115+
.sinkFrom(dataStream),
116+
name,
117+
table);
114118
}
115119

116120
Options conf = Options.fromMap(table.options());
117121
// Do not sink to log store when overwrite mode
118-
return new PaimonDataStreamSinkProvider(
122+
return PaimonDataStreamSinkProvider.createProvider(
119123
(dataStream) -> {
120124
FlinkSinkBuilder builder = createSinkBuilder();
121125
builder.forRowData(
@@ -134,7 +138,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
134138
}
135139
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
136140
return builder.build();
137-
});
141+
},
142+
name,
143+
table);
138144
}
139145

140146
protected FlinkSinkBuilder createSinkBuilder() {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
195195
.limit(limit)
196196
.watermarkStrategy(watermarkStrategy)
197197
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
198-
199-
return new PaimonDataStreamScanProvider(
198+
return PaimonDataStreamScanProvider.createProvider(
200199
!unbounded,
201200
env ->
202201
sourceBuilder
203202
.sourceParallelism(inferSourceParallelism(env))
204203
.env(env)
205-
.build());
204+
.build(),
205+
tableIdentifier.asSummaryString(),
206+
table);
206207
}
207208

208209
private ScanRuntimeProvider createCountStarScan() {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
117117
CoreOptions.BLOB_AS_DESCRIPTOR.key(),
118118
"false")));
119119
}
120-
return new PaimonDataStreamScanProvider(
120+
return PaimonDataStreamScanProvider.createProvider(
121121
source.getBoundedness() == Boundedness.BOUNDED,
122122
env -> {
123123
Integer parallelism = inferSourceParallelism(env);
@@ -130,7 +130,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
130130
dataStreamSource.setParallelism(parallelism);
131131
}
132132
return dataStreamSource;
133-
});
133+
},
134+
tableIdentifier.asSummaryString(),
135+
table);
134136
}
135137

136138
@Override

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.paimon.data.GenericRow;
2222
import org.apache.paimon.flink.FlinkConnectorOptions;
23-
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
2423
import org.apache.paimon.fs.FileIO;
2524
import org.apache.paimon.fs.Path;
2625
import org.apache.paimon.fs.local.LocalFileIO;
@@ -43,6 +42,7 @@
4342
import org.apache.flink.streaming.api.datastream.DataStream;
4443
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4544
import org.apache.flink.table.catalog.ObjectIdentifier;
45+
import org.apache.flink.table.connector.source.DataStreamScanProvider;
4646
import org.apache.flink.table.connector.source.DynamicTableSource;
4747
import org.apache.flink.table.connector.source.ScanTableSource;
4848
import org.apache.flink.table.data.RowData;
@@ -75,7 +75,7 @@ void testInferScanParallelism() throws Exception {
7575
DataTableSource tableSource =
7676
new DataTableSource(
7777
ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null);
78-
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
78+
DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
7979
StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
8080
sEnv1.setParallelism(-1);
8181
DataStream<RowData> sourceStream1 =
@@ -105,7 +105,7 @@ public void testInferStreamParallelism() throws Exception {
105105
DataTableSource tableSource =
106106
new DataTableSource(
107107
ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null);
108-
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
108+
DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
109109

110110
StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
111111
DataStream<RowData> sourceStream1 =
@@ -123,7 +123,7 @@ public void testSystemTableParallelism() throws Exception {
123123

124124
SystemTableSource tableSource =
125125
new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table"));
126-
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
126+
DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
127127

128128
Configuration configuration = new Configuration();
129129
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
@@ -133,8 +133,8 @@ public void testSystemTableParallelism() throws Exception {
133133
assertThat(sourceStream1.getParallelism()).isEqualTo(3);
134134
}
135135

136-
private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
137-
return (PaimonDataStreamScanProvider)
136+
private DataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
137+
return (DataStreamScanProvider)
138138
tableSource.getScanRuntimeProvider(
139139
new ScanTableSource.ScanContext() {
140140
@Override
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.lineage;
20+
21+
import org.apache.paimon.table.Table;
22+
23+
import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
24+
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
25+
26+
/** Stub factory for Flink 1.x. Returns providers unchanged since lineage is not supported. */
27+
public class DataStreamProviderFactory {
28+
29+
/** Returns the provider unchanged. Flink 1.x does not support lineage. */
30+
public static ScanRuntimeProvider getScanProvider(
31+
ScanRuntimeProvider provider, String name, Table table) {
32+
return provider;
33+
}
34+
35+
/** Returns the provider unchanged. Flink 1.x does not support lineage. */
36+
public static SinkRuntimeProvider getSinkProvider(
37+
SinkRuntimeProvider provider, String name, Table table) {
38+
return provider;
39+
}
40+
}

paimon-flink/paimon-flink2-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ under the License.
5555
<version>${flink.version}</version>
5656
<scope>provided</scope>
5757
</dependency>
58+
<dependency>
59+
<groupId>org.apache.flink</groupId>
60+
<artifactId>flink-table-api-java-bridge</artifactId>
61+
<version>${flink.version}</version>
62+
<scope>provided</scope>
63+
</dependency>
5864
<dependency>
5965
<groupId>org.apache.flink</groupId>
6066
<artifactId>flink-runtime</artifactId>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.lineage;
20+
21+
import org.apache.paimon.table.Table;
22+
23+
import org.apache.flink.streaming.api.datastream.DataStream;
24+
import org.apache.flink.streaming.api.datastream.DataStreamSink;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.streaming.api.lineage.LineageVertex;
27+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
28+
import org.apache.flink.table.connector.ProviderContext;
29+
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
30+
import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
31+
import org.apache.flink.table.connector.source.DataStreamScanProvider;
32+
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
33+
import org.apache.flink.table.data.RowData;
34+
35+
/**
36+
* Factory that wraps {@link DataStreamScanProvider} and {@link DataStreamSinkProvider} with {@link
37+
* LineageVertexProvider} support for Flink 2.0+.
38+
*/
39+
public class DataStreamProviderFactory {
40+
41+
/**
42+
* Returns a {@link ScanRuntimeProvider} that also implements {@link LineageVertexProvider} so
43+
* Flink's lineage graph discovers the Paimon source table.
44+
*/
45+
public static ScanRuntimeProvider getScanProvider(
46+
ScanRuntimeProvider provider, String name, Table table) {
47+
return new LineageAwarePaimonDataStreamScanProvider(
48+
(DataStreamScanProvider) provider, name, table);
49+
}
50+
51+
/**
52+
* Returns a {@link SinkRuntimeProvider} that also implements {@link LineageVertexProvider} so
53+
* Flink's lineage graph discovers the Paimon sink table.
54+
*/
55+
public static SinkRuntimeProvider getSinkProvider(
56+
SinkRuntimeProvider provider, String name, Table table) {
57+
return new LineageAwarePaimonDataStreamSinkProvider(
58+
(DataStreamSinkProvider) provider, name, table);
59+
}
60+
61+
private static class LineageAwarePaimonDataStreamScanProvider
62+
implements DataStreamScanProvider, LineageVertexProvider {
63+
64+
private final DataStreamScanProvider delegate;
65+
private final String name;
66+
private final Table table;
67+
68+
LineageAwarePaimonDataStreamScanProvider(
69+
DataStreamScanProvider delegate, String name, Table table) {
70+
this.delegate = delegate;
71+
this.name = name;
72+
this.table = table;
73+
}
74+
75+
@Override
76+
public DataStream<RowData> produceDataStream(
77+
ProviderContext context, StreamExecutionEnvironment env) {
78+
return delegate.produceDataStream(context, env);
79+
}
80+
81+
@Override
82+
public boolean isBounded() {
83+
return delegate.isBounded();
84+
}
85+
86+
@Override
87+
public LineageVertex getLineageVertex() {
88+
return LineageUtils.sourceLineageVertex(name, delegate.isBounded(), table);
89+
}
90+
}
91+
92+
private static class LineageAwarePaimonDataStreamSinkProvider
93+
implements DataStreamSinkProvider, LineageVertexProvider {
94+
95+
private final DataStreamSinkProvider delegate;
96+
private final String name;
97+
private final Table table;
98+
99+
LineageAwarePaimonDataStreamSinkProvider(
100+
DataStreamSinkProvider delegate, String name, Table table) {
101+
this.delegate = delegate;
102+
this.name = name;
103+
this.table = table;
104+
}
105+
106+
@Override
107+
public DataStreamSink<?> consumeDataStream(
108+
ProviderContext providerContext, DataStream<RowData> dataStream) {
109+
return delegate.consumeDataStream(providerContext, dataStream);
110+
}
111+
112+
@Override
113+
public LineageVertex getLineageVertex() {
114+
return LineageUtils.sinkLineageVertex(name, table);
115+
}
116+
}
117+
}

0 commit comments

Comments (0)