Add Kafka SSL/SASL support to event-gateway and restructure Websub#1880

Open
senthuran16 wants to merge 8 commits into main from websub-kafka

Conversation

@senthuran16
Member

@senthuran16 senthuran16 commented May 5, 2026

Purpose

event-gateway could not fully connect to secured Kafka clusters. Kafka TLS/SASL settings existed at the runtime config level, but they were not fully wired through the Kafka broker-driver path, and some WebSub subscription state flows still constructed Kafka clients directly. In addition, the local dev stack only supported plaintext Kafka, so secured connectivity could not be validated end-to-end in the repo.

Resolves #1861

Goals

  • Enable Kafka TLS/SASL connectivity in event-gateway.
  • Keep receiver and broker-driver responsibilities properly separated.
  • Make WebSub subscription sync and replay use broker-driver capabilities instead of direct Kafka client construction.
  • Provide a secured local Kafka path in event-gateway/docker-compose.dev.yaml for validation.

Approach

  • Added resolved Kafka connection config handling for brokers, TLS, CA/client certs, server name, and SASL credentials/mechanisms.
  • Reused the resolved Kafka config across producer, admin, shared consumer, manual-commit consumer, and replay paths in the Kafka broker-driver.
  • Extended broker-driver capabilities to support manual subscribe, replay, and compacted-topic creation needed by WebSub subscription state handling.
  • Refactored WebSub subscription sync/reconcile paths to go through the broker-driver instead of Kafka-specific constructors.
  • Updated xDS fallback handling so dynamically deployed bindings inherit the runtime Kafka security defaults when broker config is omitted.
  • Added a secured dev Kafka compose flow with generated certs, SASL/SSL listener config, and matching Kafka UI trust/auth settings.

User stories

  • As an event-gateway operator, I can connect the runtime to secured Kafka clusters using TLS and SASL.
  • As a developer, I can validate secured Kafka connectivity locally using the dev compose stack.
  • As a maintainer, I can keep WebSub receiver code broker-driver agnostic while preserving subscription sync/replay behavior.

Documentation

N/A — this change is implementation and local-dev-stack focused; product documentation was not updated in this PR.

Automation tests

  • Unit tests
    go test ./... passed in event-gateway/gateway-runtime.
    Added focused tests for Kafka connection config resolution and validation.
  • Integration tests
    No dedicated automated integration test suite was added in this PR.
    Performed compose-based smoke validation for the secured Kafka services and Kafka UI connectivity.

Security checks

Samples

  • Updated the local dev compose setup to include a secured Kafka listener, generated local cert material, and Kafka UI connectivity for smoke validation.
  • No product sample artifacts were added.

Related PRs

#1879

Test environment

  • OS: Ubuntu 24.04.4 LTS
  • Go runtime tests: local Linux environment
  • Containers: Docker Compose local environment
  • Kafka smoke validation: secured local Kafka listener with Kafka UI
  • Browsers/DB/JDK: N/A for this PR

@coderabbitai
Contributor

coderabbitai Bot commented May 5, 2026

📝 Walkthrough

Overview

This PR adds TLS/SASL support for Kafka connectivity and restructures WebSub handling in event-gateway to use a unified broker-driver abstraction. The changes enable the runtime to connect to secured Kafka clusters and improve configuration management across multiple Kafka client paths.

Key Changes

Kafka Connection Configuration

  • Introduced a new ConnectionConfig structure that encapsulates all Kafka connection settings (brokers, TLS parameters, SASL credentials)
  • Added ResolveConnectionConfig function to merge global Kafka configuration with per-binding overrides
  • Implemented BuildClientOptions to generate franz-go client options from connection config, handling TLS certificate loading and SASL mechanism setup
  • Extended runtime configuration to support TLS settings (tls_ca_file, tls_cert_file, tls_key_file, tls_server_name) and SASL parameters (sasl_mechanism, sasl_username, sasl_password)
  • Added validation for Kafka configuration including TLS file readability and SASL credential requirements
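
The merge-then-validate behavior described above could look roughly like the following. This is a minimal sketch, not the PR's code: the field names mirror the config keys listed above, but the struct layout, the "non-empty override wins" rule, and the exact validation rules are assumptions. It also skips the TLS file readability check that the PR is said to perform.

```go
package main

import (
	"errors"
	"fmt"
)

// ConnectionConfig is a hypothetical stand-in for the PR's structure.
// Field names follow the config keys above; the real layout may differ.
type ConnectionConfig struct {
	Brokers       []string
	TLSCAFile     string
	TLSCertFile   string
	TLSKeyFile    string
	TLSServerName string
	SASLMechanism string
	SASLUsername  string
	SASLPassword  string
}

// ResolveConnectionConfig overlays non-empty override fields on the global
// config and validates the result. The overlay rule is an assumption.
func ResolveConnectionConfig(global, override ConnectionConfig) (ConnectionConfig, error) {
	out := global
	// Copy the slice so the result does not alias the caller's brokers.
	out.Brokers = append([]string(nil), global.Brokers...)
	if len(override.Brokers) > 0 {
		out.Brokers = append([]string(nil), override.Brokers...)
	}
	overlay := func(dst *string, v string) {
		if v != "" {
			*dst = v
		}
	}
	overlay(&out.TLSCAFile, override.TLSCAFile)
	overlay(&out.TLSCertFile, override.TLSCertFile)
	overlay(&out.TLSKeyFile, override.TLSKeyFile)
	overlay(&out.TLSServerName, override.TLSServerName)
	overlay(&out.SASLMechanism, override.SASLMechanism)
	overlay(&out.SASLUsername, override.SASLUsername)
	overlay(&out.SASLPassword, override.SASLPassword)

	if len(out.Brokers) == 0 {
		return ConnectionConfig{}, errors.New("kafka: at least one broker is required")
	}
	// A client certificate is only usable together with its key.
	if (out.TLSCertFile == "") != (out.TLSKeyFile == "") {
		return ConnectionConfig{}, errors.New("kafka: tls_cert_file and tls_key_file must be set together")
	}
	if out.SASLMechanism != "" && (out.SASLUsername == "" || out.SASLPassword == "") {
		return ConnectionConfig{}, fmt.Errorf("kafka: sasl_mechanism %q requires sasl_username and sasl_password", out.SASLMechanism)
	}
	return out, nil
}

func main() {
	global := ConnectionConfig{
		Brokers:       []string{"kafka:9093"},
		SASLMechanism: "PLAIN",
		SASLUsername:  "gateway",
		SASLPassword:  "example-only",
	}
	// The binding overrides only the TLS server name; credentials are inherited.
	cfg, err := ResolveConnectionConfig(global, ConnectionConfig{TLSServerName: "kafka.local"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.Brokers, cfg.TLSServerName, cfg.SASLUsername)
}
```

Keeping validation inside the resolver means every client path that reuses the resolved config sees the same errors, which appears to be the intent of centralizing it.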

Broker-Driver Enhancements

  • Updated KafkaBrokerDriver to use ConnectionConfig instead of raw broker lists
  • Extended BrokerDriver interface with three new capabilities:
    • SubscribeManual: manual consumer subscription
    • Replay: topic replay functionality
    • EnsureCompactedTopic: compacted topic creation and verification
  • Modified producer, consumer, and publisher implementations to use connection config

WebSub Restructuring

  • Refactored SyncProducer and SyncConsumer to delegate Kafka operations to BrokerDriver
  • Updated ConsumerManager to use broker-driver's SubscribeManual instead of direct Kafka client instantiation
  • Modified Reconciler to use broker-driver's Replay for topic replay operations
  • Removed direct Kafka client dependencies from WebSub subscription sync/reconcile paths

Development Environment

  • Added secured Kafka setup to docker-compose.dev.yaml with TLS and SASL support
  • Created certificate generation script (generate-certs.sh) for local development with self-signed TLS artifacts
  • Added new environment variables for Kafka credentials in .env.example
  • Configured Kafka UI service with appropriate trust settings for secured connections

xDS Integration

  • Updated xDS handler's KafkaConfig to include TLS and SASL settings
  • Modified resolveBrokerDriver to propagate runtime Kafka security defaults to dynamically deployed bindings when broker configuration is omitted

Testing

  • Added unit tests for ResolveConnectionConfig covering config merging, credential preservation, TLS validation, and SASL requirements
  • Verified connection config resolution and validation through focused test suite

Impact

The changes enable event-gateway to operate against secured Kafka deployments while improving code maintainability through centralized connection configuration and consistent use of the broker-driver abstraction across all Kafka client paths.

Walkthrough

This pull request adds Kafka TLS and SASL authentication support to the event gateway. Changes include local certificate generation via OpenSSL/Keytool, updated configuration schemas with TLS file paths and SASL credentials, and a refactored Kafka connectivity layer using a centralized ConnectionConfig abstraction. The broker driver interface is extended with manual subscription, topic replay, and compacted topic management capabilities. Docker Compose infrastructure is updated to generate certificates, configure the Kafka broker with SASL_SSL, and include Kafka UI for development debugging.

Sequence Diagram

sequenceDiagram
    participant EG as Event Gateway<br/>(Startup)
    participant Cfg as Configuration<br/>Loader
    participant CC as ConnectionConfig<br/>Builder
    participant TLS as TLS Config<br/>Builder
    participant Kafka as Kafka Broker
    participant BD as BrokerDriver

    EG->>Cfg: Load Kafka settings<br/>(brokers, TLS files, SASL)
    Cfg->>CC: ResolveConnectionConfig()<br/>(global + overrides)
    CC->>TLS: buildTLSConfig()<br/>(CA, cert, key, serverName)
    TLS->>TLS: Read cert files<br/>& build tls.Config
    TLS-->>CC: tls.Config
    CC->>CC: Validate brokers,<br/>TLS, SASL
    CC-->>EG: ConnectionConfig
    EG->>BD: NewBrokerDriver(cfg)
    BD->>BD: BuildClientOptions()<br/>(brokers, TLS, SASL)
    BD->>Kafka: kgo.NewClient()<br/>(with TLS/SASL options)
    Kafka-->>BD: Connected
    BD-->>EG: BrokerDriver ready
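
The `buildTLSConfig()` step in the diagram can be sketched with the Go standard library alone. The function name and its four inputs come from the diagram; the signature, the error messages, and the TLS 1.2 minimum are assumptions made for illustration.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig assembles a client-side tls.Config from optional file paths.
// Empty paths are skipped; the signature is an assumption, not the PR's.
func buildTLSConfig(caFile, certFile, keyFile, serverName string) (*tls.Config, error) {
	cfg := &tls.Config{
		MinVersion: tls.VersionTLS12, // assumed floor, not taken from the PR
		ServerName: serverName,
	}
	if caFile != "" {
		pem, err := os.ReadFile(caFile)
		if err != nil {
			return nil, fmt.Errorf("read CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("CA file contains no valid PEM certificates")
		}
		cfg.RootCAs = pool
	}
	if certFile != "" || keyFile != "" {
		if certFile == "" || keyFile == "" {
			return nil, errors.New("client cert and key must be set together")
		}
		pair, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{pair}
	}
	return cfg, nil
}

func main() {
	// With no files given, the system trust store is used and only the
	// server name is pinned.
	cfg, err := buildTLSConfig("", "", "", "kafka.local")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("server name:", cfg.ServerName)
}
```

With franz-go, a config like this would typically be handed to the client through the `kgo.DialTLSConfig` option; whether the PR does exactly that is not shown in this conversation.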
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.42% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

Check name | Status | Explanation
Title check | ✅ Passed | The title concisely and directly summarizes the two main objectives: adding Kafka SSL/SASL support and restructuring WebSub, matching the core changes throughout the PR.
Description check | ✅ Passed | The description provides comprehensive coverage of Purpose, Goals, Approach, User Stories, Documentation, Automation Tests, Security Checks, Samples, Related PRs, and Test Environment, fully aligned with the repository template.
Linked Issues check | ✅ Passed | The PR fully implements Kafka SSL/SASL support requested in issue #1861 through connection config resolution, broker-driver integration, TLS/SASL validation, certificate generation for dev environment, and unit tests validating the implementation.
Out of Scope Changes check | ✅ Passed | All changes directly support Kafka SSL/SASL connectivity and WebSub restructuring: environment configuration, certificate generation, connection config handling, broker-driver extensions, WebSub refactoring, and dev stack updates align with stated objectives.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.12.1)

level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain modules listed in go.work or their selected dependencies"


@senthuran16
Member Author

@coderabbitai review

@coderabbitai
Contributor

coderabbitai Bot commented May 5, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
event-gateway/docker-compose.dev.yaml (1)

167-167: ⚡ Quick win

Pin the kafka-ui image to a specific version.

Using provectuslabs/kafka-ui:latest makes the dev compose stack non-reproducible — a future image push can change behavior or break the SASL/SSL property names without any code change. Pinning a specific tag aligns with the rest of the stack (bitnamilegacy/kafka:4.0.0-debian-12-r10).

Suggested change
-    image: provectuslabs/kafka-ui:latest
+    image: provectuslabs/kafka-ui:v0.7.2

Use whichever tag has been verified locally; the key point is to avoid :latest.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/docker-compose.dev.yaml` at line 167, Replace the unpinned
Kafka UI image reference "provectuslabs/kafka-ui:latest" with a specific
validated tag (e.g., the version you verified locally) to make the dev compose
stack reproducible; update the "image: provectuslabs/kafka-ui:latest" entry to
use that explicit tag (matching the practice used for
"bitnamilegacy/kafka:4.0.0-debian-12-r10") rather than :latest.
event-gateway/gateway-runtime/internal/subscription/sync.go (1)

96-98: ⚡ Quick win

Update the Close contract to match the new ownership model.

Line 96 still says this flushes and closes producer resources, but the method is now a no-op because SyncProducer no longer owns the Kafka client lifecycle. Please update or remove the comment so callers do not rely on cleanup that no longer happens.

Proposed change
-// Close flushes any buffered records and closes the sync producer.
+// Close is a no-op. SyncProducer uses the shared broker driver and does not own its lifecycle.
 func (p *SyncProducer) Close() {
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/subscription/sync.go` around lines 96
- 98, The Close method comment on SyncProducer is stale: update the docstring
for func (p *SyncProducer) Close() to state that it is a no-op and does not
flush or close Kafka client resources (SyncProducer no longer owns the client
lifecycle) so callers don't expect cleanup; alternatively remove the comment
entirely or add a brief note pointing callers to the owner of the Kafka client
lifecycle (reference SyncProducer.Close and SyncProducer type) so the contract
accurately reflects current behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go`:
- Around line 126-128: The code in endpoint.go currently calls
e.admin.CreateTopics(ctx, 1, 1, ..., topic) which hardcodes 1 partition and 1
replica; change this to read partition and replication factors from
configuration or a shared topic-creation policy and pass those variables into
e.admin.CreateTopics instead of literals. Locate the call to
e.admin.CreateTopics (and the surrounding function that creates the internal
compacted topic), add or inject config values (e.g., compactTopicPartitions and
compactTopicReplication or a TopicCreationPolicy struct) with sensible defaults
and validation, and use those variables in place of the hardcoded 1,1 so topic
creation honors cluster policies and is configurable.
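
One possible shape for the configurable policy suggested here is sketched below. The `TopicCreationPolicy` name is taken from the comment; the fields, the defaults of 1, and the validation rules are assumptions. The field types are chosen as `int32` and `int16` on the assumption that these match what the admin client's `CreateTopics` call takes for partitions and replication factor.

```go
package main

import "fmt"

// TopicCreationPolicy is a hypothetical holder for the values that are
// currently hardcoded as 1, 1 in the CreateTopics call.
type TopicCreationPolicy struct {
	Partitions        int32
	ReplicationFactor int16
}

// WithDefaults fills unset (zero) fields with 1, matching today's behavior.
func (p TopicCreationPolicy) WithDefaults() TopicCreationPolicy {
	if p.Partitions == 0 {
		p.Partitions = 1
	}
	if p.ReplicationFactor == 0 {
		p.ReplicationFactor = 1
	}
	return p
}

// Validate rejects values a broker could not satisfy.
func (p TopicCreationPolicy) Validate() error {
	if p.Partitions < 1 {
		return fmt.Errorf("partitions must be >= 1, got %d", p.Partitions)
	}
	if p.ReplicationFactor < 1 {
		return fmt.Errorf("replication factor must be >= 1, got %d", p.ReplicationFactor)
	}
	return nil
}

func main() {
	// Only partitions is configured; replication falls back to the default.
	p := TopicCreationPolicy{Partitions: 3}.WithDefaults()
	if err := p.Validate(); err != nil {
		fmt.Println("invalid policy:", err)
		return
	}
	// These two values would replace the literals in the CreateTopics call.
	fmt.Printf("partitions=%d replication=%d\n", p.Partitions, p.ReplicationFactor)
}
```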

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a0c7ef43-a4a0-44c1-945a-b3532097bf39

📥 Commits

Reviewing files that changed from the base of the PR and between 0508cfb and 6e1b505.

📒 Files selected for processing (19)
  • event-gateway/.env.example
  • event-gateway/docker-compose.dev.yaml
  • event-gateway/docker/kafka/generate-certs.sh
  • event-gateway/gateway-runtime/cmd/event-gateway/main.go
  • event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/internal/config/config.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
  • event-gateway/gateway-runtime/internal/connectors/types.go
  • event-gateway/gateway-runtime/internal/subscription/reconciler.go
  • event-gateway/gateway-runtime/internal/subscription/sync.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler.go

Development

Successfully merging this pull request may close these issues.

[Bug]: Support Kafka SSL and authenticated server

2 participants