Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions .github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ env:

jobs:
beam_PostCommit_Java_PVR_Flink_Batch:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
name: ${{ matrix.job_name }} (${{ matrix.flink_version }})
strategy:
matrix:
job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
# every major version
flink_version: ['1.20', '2.0']
timeout-minutes: 240
runs-on: [self-hosted, ubuntu-20.04, highmem]
if: |
Expand All @@ -83,13 +85,21 @@ jobs:
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run validatesPortableRunnerBatchDataSet script
uses: ./.github/actions/gradle-command-self-hosted-action
if: startsWith(matrix.flink_version, '1')
with:
gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatchDataSet
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
- name: run validatesPortableRunnerBatch script
uses: ./.github/actions/gradle-command-self-hosted-action
if: startsWith(matrix.flink_version, '2')
with:
gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatch
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
- name: Archive JUnit Test Results
Expand All @@ -98,9 +108,4 @@ jobs:
with:
name: JUnit Test Results
path: "**/build/reports/tests/"
- name: Upload test report
uses: actions/upload-artifact@v4
with:
name: java-code-coverage-report
path: "**/build/test-results/**/*.xml"
# TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2.
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ env:

jobs:
beam_PostCommit_Java_PVR_Flink_Streaming:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
name: ${{ matrix.job_name }} (${{ matrix.flink_version }})
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 120
strategy:
matrix:
job_name: [beam_PostCommit_Java_PVR_Flink_Streaming]
job_phrase: [Run Java Flink PortableValidatesRunner Streaming]
# every major version
flink_version: [ '1.20', '2.0' ]
if: |
github.event_name == 'workflow_dispatch' ||
github.event_name == 'pull_request_target' ||
Expand All @@ -71,13 +73,13 @@ jobs:
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run PostCommit Java Flink PortableValidatesRunner Streaming script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: runners:flink:1.20:job-server:validatesPortableRunnerStreaming
gradle-command: runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerStreaming
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
4 changes: 2 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Flink 2.0 support for Java classic Flink runner ([#36947](https://github.com/apache/beam/issues/36947)).
Also added intial, experimental support for Portable Flink runner since this Beam version.
* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)),
experimental support for other SDK languages including Python.


## I/Os
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,9 @@ private void translateImpulse(
public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
@Override
public boolean test(RunnerApi.PTransform pTransform) {
return STREAMING_IMPULSE_TRANSFORM_URN.equals(
PTransformTranslation.urnForTransformOrNull(pTransform));
String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
return STREAMING_IMPULSE_TRANSFORM_URN.equals(urn)
|| PTransformTranslation.RESHUFFLE_URN.equals(urn);
}
}

Expand Down
1 change: 0 additions & 1 deletion sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ dependencies {
shadowTest library.java.guava_testlib
shadowTest library.java.mockito_core
shadowTest library.java.hamcrest
shadowTest "com.esotericsoftware.kryo:kryo:2.21"
shadowTest library.java.quickcheck_core
shadowTest library.java.quickcheck_generators
shadowTest library.java.zstd_jni
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:avro:vendored-test", configuration: "shadowTest")
testImplementation library.java.junit
testImplementation "org.tukaani:xz:1.9" // marked as optional in avro
testImplementation "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly library.java.slf4j_jdk14
avroVersions.each { k,v ->
Expand All @@ -86,6 +87,7 @@ dependencies {
"avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
"avroVersion$k" library.java.slf4j_jdk14
"avroVersion$k" "org.tukaani:xz:1.9" // marked as optional in avro
"avroVersion$k" "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test
"avroVersion$k" library.java.zstd_jni // marked as optional in avro
"avroVersion$k" "org.apache.avro:avro:$v:tests"
"avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void testKryoSerialization() throws Exception {

// Kryo instantiation
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false);
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.addDefaultSerializer(AvroCoder.SerializableSchemaSupplier.class, JavaSerializer.class);

Expand Down
2 changes: 1 addition & 1 deletion sdks/java/extensions/euphoria/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
testImplementation project(":sdks:java:extensions:kryo")
testImplementation project(path: ":sdks:java:extensions:kryo", configuration: "shadow")
testImplementation library.java.slf4j_api
testImplementation library.java.hamcrest
testImplementation library.java.mockito_core
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/extensions/kryo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
plugins { id 'org.apache.beam.module' }

ext {
kryoVersion = '5.5.0'
kryoVersion = '5.6.2'
}

applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.kryo',
Expand Down
Loading