Metadata is not committed after Kafka cluster recreation (Parquet files are written successfully) #15293

@rolxdaytona

Description

Apache Iceberg version

1.9.2

Query engine

Kafka Connect

Please describe the bug 🐞

After a full Kafka cluster reinitialization (old cluster deleted, new cluster created), the Apache Iceberg Kafka Connect sink stopped committing metadata.

Producers are functioning correctly and continuously writing data to the topic.
Parquet files are successfully created and stored.
However, Iceberg metadata commits are not happening.

Kafka Connect logs repeatedly contain:

2026-02-09 18:16:22,285 INFO [etl-kafkaconnector-iceberg-sink|task-0] Table loaded by catalog: iceberg.etl.qa.job_versions (org.apache.iceberg.BaseMetastoreCatalog) [iceberg-committer-3]
2026-02-09 18:16:22,286 INFO [etl-kafkaconnector-iceberg-sink|task-0] Nothing to commit to table etl.qa.job_versions, skipping (org.apache.iceberg.connect.channel.Coordinator) [iceberg-committer-3]

There are also recovery-related log entries:

2026-02-09 18:16:26,828 INFO [etl-kafkaconnector-iceberg-sink|task-0] Handled event of type: DATA_WRITTEN (org.apache.iceberg.connect.channel.Channel) [iceberg-coord]
2026-02-09 18:16:26,830 WARN [etl-kafkaconnector-iceberg-sink|task-0] Received commit response when no commit in progress, this can happen during recovery. Commit ID: 28e687e7-b97a-42d9-ad16-887729e56669 (org.apache.iceberg.connect.channel.CommitState) [iceberg-coord]
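
One reading of these two messages, hedged, since it is inferred from the class names in the log rather than from the connector source: the coordinator runs a commit cycle on a timer, identified by a commit ID; worker responses arriving for a commit ID the coordinator is not currently tracking are dropped with the "no commit in progress" warning, and a cycle that collected no data-file events ends with "Nothing to commit". A toy Python model of that state machine (all names and return strings here are hypothetical, for illustration only):

```python
import uuid

class CommitState:
    """Toy model of a sink coordinator's commit cycle. Illustration only;
    not the actual org.apache.iceberg.connect.channel.CommitState code."""

    def __init__(self):
        self.current_commit_id = None   # no commit in progress
        self.data_files = []            # files reported for the current cycle

    def start_commit(self):
        self.current_commit_id = uuid.uuid4()
        self.data_files = []
        return self.current_commit_id

    def handle_response(self, commit_id, files):
        if self.current_commit_id is None or commit_id != self.current_commit_id:
            # Matches the WARN in the logs: a worker response arrived for a
            # commit the coordinator is not tracking (e.g. during recovery).
            return "Received commit response when no commit in progress"
        self.data_files.extend(files)
        return "accepted"

    def end_commit(self):
        if not self.data_files:
            # Matches the repeated INFO in the logs.
            return "Nothing to commit, skipping"
        return f"committing {len(self.data_files)} data files"

state = CommitState()
stale = uuid.uuid4()
print(state.handle_response(stale, ["f1.parquet"]))  # stale response is dropped
state.start_commit()
print(state.end_commit())                            # no files collected this cycle
```

In this model, the observed symptom (files written but "Nothing to commit" logged forever) corresponds to worker data events never being attributed to the coordinator's current commit ID.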

Environment:

  • Kafka version: 3.5.1 (strimzi operator driven)
  • Kafka Connect version: 3.5.1 (Iceberg Sink Connector 1.9.2)
  • Deployment: Kubernetes v1.24.6
  • Connector type: Iceberg Sink Connector
  • Storage: MinIO 2023-03-13T19:46:17Z (GNU AGPL v3)
  • Catalog type: jdbc
  • Postgres version: 15 (zalando operator driven)

What changed:
The old Kafka cluster was fully deleted and replaced with a new one.

The only configuration change was updating Kafka bootstrap servers in Kafka Connect.

No other changes were made:

  • topic regex unchanged
  • filters unchanged
  • sink connector configuration unchanged
  • producers unchanged
  • topic structure unchanged

While trying to restore the data flow, the following steps were taken:

  • Kafka Connect was fully redeployed.
  • All related consumer groups were deleted.
  • All status, config, and control topics were deleted.

Current behavior:

  • Producers continuously write data
  • Kafka Connect consumer group subscribes successfully
  • Consumer lag drops to zero
  • Parquet files are created
  • Iceberg metadata is not committed
  • Logs repeatedly show "Nothing to commit"
  • No new table commits appear

Expected behavior:

When incoming data exists and Parquet files are created, Iceberg metadata commits should occur.

Additional debugging performed:

  • Kafka Connect fully redeployed
  • All related internal topics and consumer groups removed
  • Storage accessibility verified
  • Producer data flow verified
  • No infrastructure-level errors detected in Kubernetes
  • Tried changing iceberg.kafka.heartbeat.interval.ms, iceberg.kafka.session.timeout.ms, iceberg.control.commit.interval-ms, iceberg.control.commit.timeout-ms, errors.tolerance, and iceberg.hadoop.fs.s3a.fast.upload
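
One check not on the list above: with a JDBC catalog, the current metadata pointer for each table is a row in a table the catalog manages (named iceberg_tables in Iceberg's JdbcCatalog; treat the exact column set here as an assumption). If commits were happening, metadata_location would keep advancing while producers write. A minimal sketch of that query, using stdlib sqlite3 to stand in for the real Postgres database, with made-up metadata paths:

```python
import sqlite3

# Illustration only: sqlite3 stands in for the real Postgres catalog DB.
# Table/column names follow Iceberg's JdbcCatalog defaults and are assumptions.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE iceberg_tables (
        catalog_name TEXT,
        table_namespace TEXT,
        table_name TEXT,
        metadata_location TEXT,
        previous_metadata_location TEXT
    )
""")
conn.execute(
    "INSERT INTO iceberg_tables VALUES (?, ?, ?, ?, ?)",
    ("iceberg", "etl.qa", "job_versions",
     "s3a://iceberg-etl/warehouse3/etl.qa/job_versions/metadata/00001.metadata.json",
     "s3a://iceberg-etl/warehouse3/etl.qa/job_versions/metadata/00000.metadata.json"),
)

# The check to run against the real catalog: does metadata_location change
# between polls while producers keep writing?
row = conn.execute(
    "SELECT metadata_location FROM iceberg_tables "
    "WHERE table_namespace = ? AND table_name = ?",
    ("etl.qa", "job_versions"),
).fetchone()
print(row[0])
```

If the pointer never changes while Parquet files keep appearing in the warehouse, the failure is confirmed to be at the commit stage, not at the write stage.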

Environment manifests:

Postgres configuration:


apiVersion: acid.zalan.do/v1
kind: postgresql
metadata:
  labels:
    argocd.argoproj.io/instance: data-lake
    k8slens-edit-resource-version: v1
    team: *hidden*
  name: etl-iceberg-catalog-db
  namespace: data-lake
status:
  PostgresClusterStatus: Running
spec:
  databases:
    iceberg_catalog: *hidden*
  enableLogicalBackup: true
  numberOfInstances: 1
  postgresql:
    parameters:
      wal_level: logical
    version: '15'
  resources:
    limits:
      cpu: '4'
      memory: 2048Mi
    requests:
      cpu: 50m
      memory: 160Mi
  teamId: *hidden*
  users:
    *hidden*: []
  volume:
    size: 8Gi

Kafka configuration:


apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  annotations:
  labels:
    argocd.argoproj.io/instance: data-lake
  name: etl-kafka
  namespace: data-lake
status:
  clusterId: *hidden*
  conditions:
    - lastTransitionTime: '2026-02-09T07:59:57.875668452Z'
      status: 'True'
      type: Ready
  kafkaVersion: 3.5.1
  listeners:
    - addresses:
        - host: etl-kafka-kafka-bootstrap.data-lake.svc
          port: 9092
      bootstrapServers: etl-kafka-kafka-bootstrap.data-lake.svc:9092
      name: plain
      type: plain
    - addresses:
        - host: *hidden*
          port: 9094
      bootstrapServers: *hidden*
      name: external
      type: external
    - addresses:
        - host: *hidden*
          port: 31052
      bootstrapServers: *hidden*
      name: externalnp
      type: externalnp
  observedGeneration: 2
  operatorLastSuccessfulVersion: 0.37.0
spec:
  entityOperator:
    template:
      topicOperatorContainer:
        env:
          - name: TZ
            value: Europe/Moscow
      userOperatorContainer:
        env:
          - name: TZ
            value: Europe/Moscow
    topicOperator:
      watchedNamespace: data-lake
    userOperator:
      watchedNamespace: data-lake
  kafka:
    config:
      auto.create.topics.enable: true
      default.replication.factor: 1
      log.retention.bytes: 104857600
      log.retention.hours: 720
      max.request.size: 5242880
      message.max.bytes: 5242880
      min.insync.replicas: 1
      offsets.retention.minutes: 525600
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - configuration:
          bootstrap:
            annotations:
              metallb.universe.tf/address-pool: private
          brokers:
            - annotations:
                metallb.universe.tf/address-pool: private
              broker: 0
            - annotations:
                metallb.universe.tf/address-pool: private
              broker: 1
            - annotations:
                metallb.universe.tf/address-pool: private
              broker: 2
        name: external
        port: 9094
        tls: false
        type: loadbalancer
      - name: externalnp
        port: 9095
        tls: false
        type: nodeport
    replicas: 1
    resources:
      requests:
        cpu: 150m
        memory: 8000Mi
    storage:
      size: 10Gi
      type: persistent-claim
    template:
      kafkaContainer:
        env:
          - name: TZ
            value: Europe/Moscow
  zookeeper:
    replicas: 3
    resources:
      requests:
        cpu: 50m
        memory: 1000Mi
    storage:
      size: 64Mi
      type: persistent-claim
    template:
      zookeeperContainer:
        env:
          - name: TZ
            value: Europe/Moscow

KafkaConnect configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  labels:
    argocd.argoproj.io/instance: data-lake
  name: etl-kafkaconnect-iceberg
  namespace: data-lake
status:
  conditions:
    - lastTransitionTime: '2026-02-09T19:30:35.134910638Z'
      status: 'True'
      type: Ready
  connectorPlugins:
    - class: org.apache.iceberg.connect.IcebergSinkConnector
      type: sink
      version: 1.9.2
    - class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      type: source
      version: 3.5.1
    - class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      type: source
      version: 3.5.1
    - class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      type: source
      version: 3.5.1
  labelSelector: >-
    strimzi.io/cluster=etl-kafkaconnect-iceberg,strimzi.io/name=etl-kafkaconnect-iceberg-connect,strimzi.io/kind=KafkaConnect
  observedGeneration: 1
  replicas: 1
  url: http://etl-kafkaconnect-iceberg-connect-api.data-lake.svc:8083
spec:
  bootstrapServers: etl-kafka-kafka-bootstrap.data-lake.svc:9092
  build:
    output:
      image: *registry*/*registry*/iceberg-connect:1.9.2
      pushSecret: docker-registry-cred
      type: docker
    plugins:
      - artifacts:
          - artifact: iceberg-sink-connector
            group: *registry*
            repository: *registry*
            type: maven
            version: 1.0.4
          - artifact: postgresql
            group: org.postgresql
            repository: https://repo1.maven.org/maven2/
            type: maven
            version: 42.7.3
        name: iceberg-sink
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    config.storage.replication.factor: -1
    config.storage.topic: etl-kafkaconnect-iceberg-configs
    group.id: etl-kafkaconnect-iceberg
    offset.storage.replication.factor: -1
    offset.storage.topic: etl-kafkaconnect-iceberg-offsets
    status.storage.replication.factor: -1
    status.storage.topic: etl-kafkaconnect-iceberg-status
  replicas: 1
  resources:
    limits:
      cpu: '2'
      memory: 8Gi
  version: 3.5.1

Connector configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  annotations:
  creationTimestamp: '2025-12-11T15:05:20Z'
  generation: 7
  labels:
    argocd.argoproj.io/instance: data-lake
    k8slens-edit-resource-version: v1beta2
    strimzi.io/cluster: etl-kafkaconnect-iceberg
  name: etl-kafkaconnector-iceberg-sink
  namespace: data-lake
status:
  conditions:
    - lastTransitionTime: '2026-02-10T08:38:49.020478296Z'
      status: 'True'
      type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: >-
        etl-kafkaconnect-iceberg-connect-0.etl-kafkaconnect-iceberg-connect.data-lake.svc:8083
    name: etl-kafkaconnector-iceberg-sink
    tasks:
      - id: 0
        state: RUNNING
        worker_id: >-
          etl-kafkaconnect-iceberg-connect-0.etl-kafkaconnect-iceberg-connect.data-lake.svc:8083
    type: sink
  observedGeneration: 7
  tasksMax: 1
  topics:
    - etl.cdc.filestorage.test1.public.blob_info
    - etl.cdc.platform.test1.public.app_property
*wrapped*
    - etl.cdc.platform.test1.public.workspace_user
    - etl.clearml.ml_metrics
    - etl.notion.analytics_tasks
    - etl.notion.annotation_tasks
    - etl.platform.jira_metrics
    - etl.qa.job_versions
    - etl.qa.monitoring_metrics
    - etl.scanner.linux_logs
    - etl.scanner.logs
    - etl.scanner.ml_logs
    - etl.scanner.statistics
    - etl.testops.testcase_history_metrics
    - etl.testops.testcase_metrics
spec:
  class: org.apache.iceberg.connect.IcebergSinkConnector
  config:
    consumer.override.auto.offset.reset: earliest
    errors.deadletterqueue.context.headers.enable: true
    errors.deadletterqueue.context.payload.enable: true
    errors.deadletterqueue.topic.name: dlq.etl.iceberg
    errors.deadletterqueue.topic.partitions: 1
    errors.deadletterqueue.topic.replication.factor: 1
    errors.log.enable: true
    errors.log.include.messages: true
    errors.tolerance: none
    iceberg.catalog: iceberg
    iceberg.catalog.jdbc.driver: org.postgresql.Driver
    iceberg.catalog.jdbc.password: >- *secret link*
    iceberg.catalog.jdbc.user: >- *secret link*
    iceberg.catalog.type: jdbc
    iceberg.catalog.uri: jdbc:postgresql://etl-iceberg-catalog-db:5432/iceberg_catalog
    iceberg.catalog.warehouse: s3a://iceberg-etl/warehouse3
    iceberg.control.commit.interval-ms: 20000
    iceberg.control.commit.timeout-ms: 10000
    iceberg.hadoop.fs.s3a.access.key: iceberg-etl
    iceberg.hadoop.fs.s3a.buffer.dir: /tmp
    iceberg.hadoop.fs.s3a.endpoint: http://minio.minio.svc
    iceberg.hadoop.fs.s3a.endpoint.region: us-east-1
    iceberg.hadoop.fs.s3a.fast.upload: false
    iceberg.hadoop.fs.s3a.fast.upload.buffer: bytebuffer
    iceberg.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    iceberg.hadoop.fs.s3a.path.style.access: 'true'
    iceberg.hadoop.fs.s3a.secret.key: hidden
    iceberg.kafka.heartbeat.interval.ms: 30000
    iceberg.kafka.session.timeout.ms: 240000
    iceberg.tables.auto-create-enabled: true
    iceberg.tables.auto-create-props.format-version: 3
    iceberg.tables.dynamic-enabled: true
    iceberg.tables.evolve-schema-enabled: true
    iceberg.tables.route-field: _topic
    key.converter: org.apache.kafka.connect.storage.StringConverter
    topics.regex: ^(?!.*(phe_check|qrtz_check|windows_events))etl\..*
    transforms: AddTopicField,AddKafkaTimestamp
    transforms.AddKafkaTimestamp.timestamp.field: kafka_timestamp
    transforms.AddKafkaTimestamp.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.AddTopicField.topic.field: _topic
    transforms.AddTopicField.type: org.apache.kafka.connect.transforms.InsertField$Value
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
  tasksMax: 1
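
One sanity check on the connector config above: topics.regex relies on a negative lookahead, and a recreated cluster can end up with topic names that no longer match it. Python's re module agrees with Java's regex semantics for this pattern, so it can be verified locally (topic names below are taken from the status.topics list and the DLQ setting):

```python
import re

# topics.regex from the connector config; the negative lookahead excludes any
# topic whose name contains phe_check, qrtz_check, or windows_events.
pattern = re.compile(r"^(?!.*(phe_check|qrtz_check|windows_events))etl\..*")

assert pattern.match("etl.qa.job_versions")
assert pattern.match("etl.scanner.logs")
assert not pattern.match("etl.qa.windows_events")  # excluded by the lookahead
assert not pattern.match("dlq.etl.iceberg")        # must start with "etl."
```

Since the assertions pass for the topics listed in the connector status, the regex itself is not the reason records would fail to reach the sink.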

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Labels: bug (Something isn't working)