Description
What happened?
I’m trying to run a Go → Kafka → Go (KafkaIO) proof of concept using the Apache Beam Go SDK v2.68.0 with the Flink portable runner.
The job starts successfully and launches two SDK harnesses (Go and Java), but the KafkaIO read transform never emits any elements: the job stays idle and periodically logs “Operation ongoing in bundle … without outputting or completing.”
Messages published to the input topic are never seen by the Go ParDo and never written to the output topic.
Environment
Component - Version / Setup
Beam Go SDK - 2.68.0
Flink Runner - 1.19 (MiniCluster via beam-runners-flink-1.19-job-server-2.68.0.jar)
Expansion Service - beam-sdks-java-io-expansion-service-2.68.0.jar
Kafka - v4.0.0 / Local (localhost:9092)
Harness environment - Docker (apache/beam_go_sdk:2.68.0, apache/beam_java17_sdk:2.68.0)
OS - macOS (Docker Desktop)
Runner options used: --runner=portable --endpoint=localhost:8099 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0 --expansion_addr=localhost:8088
What works
• Go-only pipelines (beam.Create → ParDo(log)) complete successfully; a minimal sketch of such a pipeline follows this list.
• Kafka producer and consumer work fine outside Beam.
• Two SDK harnesses start as expected (Go + Java).
• Java harness logs Kafka producer initialization.
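For reference, a minimal sketch of the kind of Go-only pipeline that completes successfully on this setup (element values and function names here are illustrative, not taken from the failing job):

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// logElement logs each element; enough to confirm the Go harness executes DoFns.
func logElement(ctx context.Context, v string) {
    log.Infof(ctx, "element: %v", v)
}

func main() {
    flag.Parse()
    beam.Init()
    ctx := context.Background()

    p, s := beam.NewPipelineWithRoot()
    col := beam.Create(s, "a", "b", "c")
    beam.ParDo0(s, logElement, col)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Pipeline failed: %v", err)
    }
}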
What fails
• KafkaIO.Read emits no elements to the Go ParDo.
• No messages appear on the output topic (when KafkaIO.Write is enabled).
• Go DoFn logs (log.Infof) never appear.
• The job stays active and keeps emitting the “Operation ongoing in bundle …” warnings shown in the logs below.
My Pipeline code:
package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

const (
    kafkaBroker = "localhost:9092"
    inputTopic  = "input"
    outputTopic = "output"
)

var expansionAddr = flag.String("expansion_addr", "", "Address of Kafka expansion service")

// logRecord logs each Kafka record and passes it through unchanged.
func logRecord(ctx context.Context, k, v []byte) ([]byte, []byte) {
    log.Infof(ctx, "got record key=%q val=%q", string(k), string(v))
    return k, v
}

func main() {
    flag.Parse()
    beam.Init()
    ctx := context.Background()

    p, s := beam.NewPipelineWithRoot()

    // Cross-language KafkaIO read via the Java expansion service.
    input := kafkaio.Read(
        s,
        *expansionAddr,
        kafkaBroker,
        []string{inputTopic},
    )
    beam.ParDo(s, logRecord, input)
    kafkaio.Write(s, *expansionAddr, kafkaBroker, outputTopic, input)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Pipeline failed: %v", err)
    }
}
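For debugging only (not part of the failing pipeline above), a bounded-read variant of the same transform may help separate an offset-position problem from the read never producing output at all. This sketch assumes the ConsumerConfigs and MaxNumRecords read options exposed by the kafkaio package:

    // Drop-in replacement for the kafkaio.Read call above (debugging aid only).
    input := kafkaio.Read(
        s,
        *expansionAddr,
        kafkaBroker,
        []string{inputTopic},
        // Read from the beginning of the topic instead of only new messages.
        kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}),
        // Cap the read so the bundle can complete even with few messages.
        kafkaio.MaxNumRecords(10),
    )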
Reproduction Steps
1. Start Kafka locally and create input and output topics.
2. Run the Kafka expansion service: java -jar beam-sdks-java-io-expansion-service-2.68.0.jar 8088 --javaClassLookupAllowlistFile='*'
3. Start the Flink job server: java -jar beam-runners-flink-1.19-job-server-2.68.0.jar --job-port=8099 --artifact-port=8098
4. Run the Go pipeline: go run . --runner=portable --endpoint=localhost:8099 --expansion_addr=localhost:8088 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0
5. Produce a few messages to the Kafka input topic (example command after this list).
6. Observe no messages logged and no output produced.
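For step 5, any Kafka producer works; the console producer bundled with Kafka is one option:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input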
Logs
Job server (partial log):
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: initializing Kafka metrics collector
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka version: 3.9.0
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka commitId: 84caaa6e9da06435
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka startTimeMs: 1760626427960
Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_904017666_none-3, groupId=Reader-0_offset_consumer_904017666_none] Cluster ID: 5L6g3nShT-eMCtK--X86sw
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService getProcessBundleDescriptor
INFO: getProcessBundleDescriptor request with id 1-3
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
WARNING: Failed to parse element_processing_timeout: time: invalid duration "", there will be no timeout for processing an element in a PTransform operation
Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.data.GrpcDataService data
INFO: Beam Fn Data client connected.
...
WARNING: Operation ongoing in bundle 1 for at least 05m00s without outputting or completing
Go harness:
2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1" value:{string_value:"go-job-1-1760626399474061000"}} fields:{key:"beam:option:experiments:v1" value:{list_value:{values:{string_value:"beam_fn_api"}}}} fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}} fields:{key:"beam:option:go_options:v1" value:{struct_value:{fields:{key:"options" value:{struct_value:{fields:{key:"endpoint" value:{string_value:"localhost:8099"}} fields:{key:"environment_config" value:{string_value:"apache/beam_go_sdk:2.68.0"}} fields:{key:"environment_type" value:{string_value:"DOCKER"}} fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}} fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}} fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}} fields:{key:"runner" value:{string_value:"portable"}}}}}}}} fields:{key:"beam:option:job_name:v1" value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}} fields:{key:"beam:option:options_id:v1" value:{number_value:2}} fields:{key:"beam:option:output_executable_path:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1" value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1" value:{bool_value:false}} fields:{key:"beam:option:runner:v1" value:{null_value:NULL_VALUE}}} retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b" logging_endpoint:{url:"host.docker.internal:61976"} artifact_endpoint:{url:"host.docker.internal:61978"} control_endpoint:{url:"host.docker.internal:61974"} dependencies:{type_urn:"beam:artifact:type:file:v1" type_payload:"\n\xdc\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/1-0:go-/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/worker-1-1760626399474073000" role_urn:"beam:artifact:role:go_worker_binary:v1"} runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:40 2025/10/16 14:53:40 Downloaded: /tmp/staged/1-worker-1-1760626399474073000 (sha256: 4e8c52729f472480c02833507b225ea334663bea8426cd623e2afc6993ed630f, size: 67584651)
2025-10-16 20:23:43 Error Setting Rlimit operation not permitted
Java Harness:
2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1" value:{string_value:"go-job-1-1760626399474061000"}} fields:{key:"beam:option:experiments:v1" value:{list_value:{values:{string_value:"beam_fn_api"}}}} fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}} fields:{key:"beam:option:go_options:v1" value:{struct_value:{fields:{key:"options" value:{struct_value:{fields:{key:"endpoint" value:{string_value:"localhost:8099"}} fields:{key:"environment_config" value:{string_value:"apache/beam_go_sdk:2.68.0"}} fields:{key:"environment_type" value:{string_value:"DOCKER"}} fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}} fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}} fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}} fields:{key:"runner" value:{string_value:"portable"}}}}}}}} fields:{key:"beam:option:job_name:v1" value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}} fields:{key:"beam:option:options_id:v1" value:{number_value:2}} fields:{key:"beam:option:output_executable_path:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1" value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1" value:{bool_value:false}} fields:{key:"beam:option:runner:v1" value:{null_value:NULL_VALUE}}} retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b" logging_endpoint:{url:"host.docker.internal:61977"} artifact_endpoint:{url:"host.docker.internal:61979"} control_endpoint:{url:"host.docker.internal:61975"} dependencies:{type_urn:"beam:artifact:type:file:v1" type_payload:"\n\xec\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/2-0:pAOCMJEhVBbeam:env:dock-beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjo" role_urn:"beam:artifact:role:staging_to:v1" role_payload:"\nZbeam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar"} runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:44 2025/10/16 14:53:44 Downloaded: /tmp/1-1/staged/beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar (sha256: 0e53f1ab978f19568ee0c7e99a49f735138e95b363a2beea48a940174d8e043f, size: 387766088)
2025-10-16 20:23:45 Running JvmInitializer#onStartup for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 Completed JvmInitializer#onStartup for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 SDK Fn Harness started
2025-10-16 20:23:45 Harness ID 1-1
2025-10-16 20:23:45 Logging location url:"host.docker.internal:61977"
2025-10-16 20:23:45 Control location url:"host.docker.internal:61975"
2025-10-16 20:23:45 Status location null
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json exists. Overriding existing options.
2025-10-16 20:23:45 Pipeline options {"beam:option:app_name:v1":"go-job-1-1760626399474061000", "beam:option:experiments:v1":["beam_fn_api"], "beam:option:flink_conf_dir:v1":null, "beam:option:flink_master:v1":"[auto]", "beam:option:go_options:v1":{"options":{"endpoint":"localhost:8099", "environment_config":"apache/beam_go_sdk:2.68.0", "environment_type":"DOCKER", "expansion_addr":"localhost:8088", "hookOrder":"[\"default_remote_logging\"]", "hooks":"{\"default_remote_logging\":null}", "runner":"portable"}}, "beam:option:job_name:v1":"go0job0101760626399474061000-kishan-1016145325-78a374a3", "beam:option:options_id:v1":2, "beam:option:output_executable_path:v1":null, "beam:option:parallelism:v1":-1, "beam:option:retain_docker_containers:v1":false, "beam:option:runner:v1":null}
(the job continues running normally after this)
Is KafkaIO fully supported for Go SDK cross-language pipelines in Beam 2.68.0?
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner