
[Bug]: KafkaIO cross-language not working with Go SDK v2.68 on Flink runner #36539

@kishanshukla-2307

Description


What happened?

I’m trying to run a Go → Kafka → Go (KafkaIO) proof of concept using Apache Beam Go SDK v2.68 with the Flink portable runner.
The job starts successfully and launches two SDK harnesses (Go and Java), but the KafkaIO read transform never emits elements — the job stays idle and periodically logs “operation ongoing in bundle … without outputting or completing.”
Messages published to the input topic are not seen in the Go ParDo or written to the output topic.

Environment

Component - Version / Setup

Beam Go SDK - 2.68.0
Flink Runner - 1.19 (MiniCluster via beam-runners-flink-1.19-job-server-2.68.0.jar)
Expansion Service - beam-sdks-java-io-expansion-service-2.68.0.jar
Kafka - v4.0.0 / Local (localhost:9092)
Harness environment - Docker (apache/beam_go_sdk:2.68.0, apache/beam_java17_sdk:2.68.0)
OS - macOS (Docker Desktop)

Runner options used: --runner=portable --endpoint=localhost:8099 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0 --expansion_addr=localhost:8088
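The runner options above are ordinary command-line flags. Assuming the Beam Go SDK reads them through the standard `flag` package (the pipeline calls `flag.Parse()` before `beam.Init()`), here is a minimal, hypothetical sketch of how a flag set parses them. `runnerOptions` and `parseRunnerOptions` are illustrative names, not Beam APIs. It also shows why a space typed after the leading dashes (`-- environment_config=...`) matters: the `flag` package treats a bare `--` as the end of the flags, so everything after it is left unparsed.

```go
package main

import (
	"flag"
	"fmt"
)

// runnerOptions mirrors the runner flags listed above. This struct is a
// hypothetical illustration; the Beam Go SDK defines its own flags.
type runnerOptions struct {
	Runner, Endpoint, EnvironmentType, EnvironmentConfig, ExpansionAddr string
}

// parseRunnerOptions parses args with a fresh FlagSet (no global state) and
// returns the parsed options plus any arguments left after parsing stopped.
func parseRunnerOptions(args []string) (runnerOptions, []string, error) {
	var o runnerOptions
	fs := flag.NewFlagSet("pipeline", flag.ContinueOnError)
	fs.StringVar(&o.Runner, "runner", "", "runner name")
	fs.StringVar(&o.Endpoint, "endpoint", "", "job service endpoint")
	fs.StringVar(&o.EnvironmentType, "environment_type", "", "harness environment type")
	fs.StringVar(&o.EnvironmentConfig, "environment_config", "", "harness container image")
	fs.StringVar(&o.ExpansionAddr, "expansion_addr", "", "expansion service address")
	if err := fs.Parse(args); err != nil {
		return runnerOptions{}, nil, err
	}
	return o, fs.Args(), nil
}

func main() {
	o, rest, err := parseRunnerOptions([]string{
		"--runner=portable",
		"--endpoint=localhost:8099",
		"--environment_type=DOCKER",
		"--environment_config=apache/beam_go_sdk:2.68.0",
		"--expansion_addr=localhost:8088",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v leftover=%v\n", o, rest)

	// The shell splits "-- environment_config=..." into "--" and
	// "environment_config=...". The bare "--" ends flag parsing, so
	// EnvironmentConfig stays empty and the value ends up in the leftovers.
	o2, rest2, _ := parseRunnerOptions([]string{"--runner=portable", "--", "environment_config=apache/beam_go_sdk:2.68.0"})
	fmt.Printf("environment_config=%q leftover=%v\n", o2.EnvironmentConfig, rest2)
}
```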

What works
• Go-only pipelines (beam.Create → ParDo(log)) complete successfully.
• Kafka producer and consumer work fine outside Beam.
• Two SDK harnesses start as expected (Go + Java).
• Java harness logs Kafka producer initialization.

What fails
• KafkaIO.Read emits no elements to the Go ParDo.
• No messages appear on the output topic (when KafkaIO.Write is enabled).
• Go DoFn logs (log.Infof) never appear.
• The job remains active with the “bundle ongoing” warnings shown above.

My pipeline code:

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

const (
    kafkaBroker = "localhost:9092"
    inputTopic  = "input"
    outputTopic = "output"
)

var expansionAddr = flag.String("expansion_addr", "", "Address of Kafka expansion service")

// logRecord logs each Kafka record (raw key and value bytes) and passes it through unchanged.
func logRecord(ctx context.Context, k, v []byte) ([]byte, []byte) {
    log.Infof(ctx, "got record key=%q val=%q", string(k), string(v))
    return k, v
}

func main() {
    flag.Parse()
    beam.Init()
    ctx := context.Background()
    p, s := beam.NewPipelineWithRoot()

    // Cross-language read: KafkaIO.Read is expanded by the Java expansion service.
    input := kafkaio.Read(
        s,
        *expansionAddr,
        kafkaBroker,
        []string{inputTopic},
    )

    beam.ParDo(s, logRecord, input)
    // Cross-language write of the same records to the output topic.
    kafkaio.Write(s, *expansionAddr, kafkaBroker, outputTopic, input)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Pipeline failed: %v", err)
    }
}

Reproduction Steps
1. Start Kafka locally and create input and output topics.
2. Run the Kafka expansion service: java -jar beam-sdks-java-io-expansion-service-2.68.0.jar 8088 --javaClassLookupAllowlistFile='*'
3. Start the Flink job server: java -jar beam-runners-flink-1.19-job-server-2.68.0.jar --job-port=8099 --artifact-port=8098
4. Run the Go pipeline: go run . --runner=portable --endpoint=localhost:8099 --expansion_addr=localhost:8088 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0
5. Produce a few messages to the Kafka input topic.
6. Observe no messages logged and no output produced.

Logs

Job server (log excerpt):

Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: initializing Kafka metrics collector
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka version: 3.9.0
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka commitId: 84caaa6e9da06435
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka startTimeMs: 1760626427960
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_904017666_none-3, groupId=Reader-0_offset_consumer_904017666_none] Cluster ID: 5L6g3nShT-eMCtK--X86sw
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService getProcessBundleDescriptor
INFO: getProcessBundleDescriptor request with id 1-3
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
WARNING: Failed to parse element_processing_timeout: time: invalid duration "", there will be no timeout for processing an element in a PTransform operation
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.data.GrpcDataService data
INFO: Beam Fn Data client connected.
...
WARNING: Operation ongoing in bundle 1 for at least 05m00s without outputting or completing

Go harness:

2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1" value:{string_value:"go-job-1-1760626399474061000"}} fields:{key:"beam:option:experiments:v1" value:{list_value:{values:{string_value:"beam_fn_api"}}}} fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}} fields:{key:"beam:option:go_options:v1" value:{struct_value:{fields:{key:"options" value:{struct_value:{fields:{key:"endpoint" value:{string_value:"localhost:8099"}} fields:{key:"environment_config" value:{string_value:"apache/beam_go_sdk:2.68.0"}} fields:{key:"environment_type" value:{string_value:"DOCKER"}} fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}} fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}} fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}} fields:{key:"runner" value:{string_value:"portable"}}}}}}}} fields:{key:"beam:option:job_name:v1" value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}} fields:{key:"beam:option:options_id:v1" value:{number_value:2}} fields:{key:"beam:option:output_executable_path:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1" value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1" value:{bool_value:false}} fields:{key:"beam:option:runner:v1" value:{null_value:NULL_VALUE}}} retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b" logging_endpoint:{url:"host.docker.internal:61976"} artifact_endpoint:{url:"host.docker.internal:61978"} control_endpoint:{url:"host.docker.internal:61974"} dependencies:{type_urn:"beam:artifact:type:file:v1" type_payload:"\n\xdc\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/1-0:go-/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/worker-1-1760626399474073000" 
role_urn:"beam:artifact:role:go_worker_binary:v1"} runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:40 2025/10/16 14:53:40 Downloaded: /tmp/staged/1-worker-1-1760626399474073000 (sha256: 4e8c52729f472480c02833507b225ea334663bea8426cd623e2afc6993ed630f, size: 67584651)
2025-10-16 20:23:43 Error Setting Rlimit  operation not permitted

Java Harness:

2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1"  value:{string_value:"go-job-1-1760626399474061000"}}  fields:{key:"beam:option:experiments:v1"  value:{list_value:{values:{string_value:"beam_fn_api"}}}}  fields:{key:"beam:option:flink_conf_dir:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:flink_master:v1"  value:{string_value:"[auto]"}}  fields:{key:"beam:option:go_options:v1"  value:{struct_value:{fields:{key:"options"  value:{struct_value:{fields:{key:"endpoint"  value:{string_value:"localhost:8099"}}  fields:{key:"environment_config"  value:{string_value:"apache/beam_go_sdk:2.68.0"}}  fields:{key:"environment_type"  value:{string_value:"DOCKER"}}  fields:{key:"expansion_addr"  value:{string_value:"localhost:8088"}}  fields:{key:"hookOrder"  value:{string_value:"[\"default_remote_logging\"]"}}  fields:{key:"hooks"  value:{string_value:"{\"default_remote_logging\":null}"}}  fields:{key:"runner"  value:{string_value:"portable"}}}}}}}}  fields:{key:"beam:option:job_name:v1"  value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}}  fields:{key:"beam:option:options_id:v1"  value:{number_value:2}}  fields:{key:"beam:option:output_executable_path:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:parallelism:v1"  value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1"  value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  value:{null_value:NULL_VALUE}}}  retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b"  logging_endpoint:{url:"host.docker.internal:61977"}  artifact_endpoint:{url:"host.docker.internal:61979"}  control_endpoint:{url:"host.docker.internal:61975"}  dependencies:{type_urn:"beam:artifact:type:file:v1"  
type_payload:"\n\xec\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/2-0:pAOCMJEhVBbeam:env:dock-beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjo"  role_urn:"beam:artifact:role:staging_to:v1"  role_payload:"\nZbeam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar"}  runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:44 2025/10/16 14:53:44 Downloaded: /tmp/1-1/staged/beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar (sha256: 0e53f1ab978f19568ee0c7e99a49f735138e95b363a2beea48a940174d8e043f, size: 387766088)
2025-10-16 20:23:45 Running JvmInitializer#onStartup for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 Completed JvmInitializer#onStartup for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 SDK Fn Harness started
2025-10-16 20:23:45 Harness ID 1-1
2025-10-16 20:23:45 Logging location url:"host.docker.internal:61977"
2025-10-16 20:23:45 Control location url:"host.docker.internal:61975"
2025-10-16 20:23:45 Status location null
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json exists. Overriding existing options.
2025-10-16 20:23:45 Pipeline options {"beam:option:app_name:v1":"go-job-1-1760626399474061000", "beam:option:experiments:v1":["beam_fn_api"], "beam:option:flink_conf_dir:v1":null, "beam:option:flink_master:v1":"[auto]", "beam:option:go_options:v1":{"options":{"endpoint":"localhost:8099", "environment_config":"apache/beam_go_sdk:2.68.0", "environment_type":"DOCKER", "expansion_addr":"localhost:8088", "hookOrder":"[\"default_remote_logging\"]", "hooks":"{\"default_remote_logging\":null}", "runner":"portable"}}, "beam:option:job_name:v1":"go0job0101760626399474061000-kishan-1016145325-78a374a3", "beam:option:options_id:v1":2, "beam:option:output_executable_path:v1":null, "beam:option:parallelism:v1":-1, "beam:option:retain_docker_containers:v1":false, "beam:option:runner:v1":null}

(The job keeps running after this without failing, but still emits no elements.)

Is KafkaIO fully supported for Go SDK cross-language pipelines in Beam 2.68.0?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
