Add Kafka SSL/SASL support to event-gateway and restructure Websub #1880
senthuran16 wants to merge 8 commits into main
📝 Walkthrough
Overview
This PR adds TLS/SASL support for Kafka connectivity and restructures WebSub handling in event-gateway to use a unified broker-driver abstraction. The changes enable the runtime to connect to secured Kafka clusters and improve configuration management across multiple Kafka client paths.
Key Changes
Kafka Connection Configuration
Broker-Driver Enhancements
WebSub Restructuring
Development Environment
xDS Integration
Testing
Impact
The changes enable event-gateway to operate against secured Kafka deployments while improving code maintainability through centralized connection configuration and consistent use of the broker-driver abstraction across all Kafka client paths.
Walkthrough
This pull request adds Kafka TLS and SASL authentication support to the event gateway. Changes include local certificate generation via OpenSSL/Keytool, updated configuration schemas with TLS file paths and SASL credentials, and a refactored Kafka connectivity layer using a centralized
Sequence Diagram
sequenceDiagram
participant EG as Event Gateway<br/>(Startup)
participant Cfg as Configuration<br/>Loader
participant CC as ConnectionConfig<br/>Builder
participant TLS as TLS Config<br/>Builder
participant Kafka as Kafka Broker
participant BD as BrokerDriver
EG->>Cfg: Load Kafka settings<br/>(brokers, TLS files, SASL)
Cfg->>CC: ResolveConnectionConfig()<br/>(global + overrides)
CC->>TLS: buildTLSConfig()<br/>(CA, cert, key, serverName)
TLS->>TLS: Read cert files<br/>& build tls.Config
TLS-->>CC: tls.Config
CC->>CC: Validate brokers,<br/>TLS, SASL
CC-->>EG: ConnectionConfig
EG->>BD: NewBrokerDriver(cfg)
BD->>BD: BuildClientOptions()<br/>(brokers, TLS, SASL)
BD->>Kafka: kgo.NewClient()<br/>(with TLS/SASL options)
Kafka-->>BD: Connected
BD-->>EG: BrokerDriver ready
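The TLS step in the diagram (read the CA, certificate, and key files, then build a `tls.Config`) can be sketched with the Go standard library alone. This is a minimal illustration, not the PR's implementation: the `tlsFiles` struct, its field names, and `buildTLSConfigSketch` are hypothetical, and the validation rules shown are assumptions about what such a builder might reasonably check.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsFiles is a hypothetical settings struct; the field names are
// assumptions and do not mirror the PR's configuration schema.
type tlsFiles struct {
	CAFile     string // PEM bundle used to verify the broker certificate
	CertFile   string // client certificate (optional, for mutual TLS)
	KeyFile    string // client private key (optional, for mutual TLS)
	ServerName string // expected broker hostname for verification
}

// buildTLSConfigSketch turns file paths into a *tls.Config.
func buildTLSConfigSketch(f tlsFiles) (*tls.Config, error) {
	// A client certificate without its key (or the reverse) is unusable.
	if (f.CertFile == "") != (f.KeyFile == "") {
		return nil, errors.New("tls: cert file and key file must be set together")
	}
	cfg := &tls.Config{MinVersion: tls.VersionTLS12, ServerName: f.ServerName}
	if f.CAFile != "" {
		pem, err := os.ReadFile(f.CAFile)
		if err != nil {
			return nil, fmt.Errorf("tls: read CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("tls: CA file contains no valid PEM certificates")
		}
		cfg.RootCAs = pool
	}
	if f.CertFile != "" {
		pair, err := tls.LoadX509KeyPair(f.CertFile, f.KeyFile)
		if err != nil {
			return nil, fmt.Errorf("tls: load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{pair}
	}
	return cfg, nil
}
```

With franz-go, a `*tls.Config` built this way is typically handed to the client through the `kgo.DialTLSConfig` option alongside the seed brokers; how the PR's `BuildClientOptions()` does this is not shown on this page.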
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Warning
There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 golangci-lint (2.12.1)
level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain modules listed in go.work or their selected dependencies"
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 1
🧹 Nitpick comments (2)
event-gateway/docker-compose.dev.yaml (1)
167-167: ⚡ Quick win
Pin the `kafka-ui` image to a specific version.
Using `provectuslabs/kafka-ui:latest` makes the dev compose stack non-reproducible: a future image push can change behavior or break the SASL/SSL property names without any code change. Pinning a specific tag aligns with the rest of the stack (bitnamilegacy/kafka:4.0.0-debian-12-r10).
Suggested change
- image: provectuslabs/kafka-ui:latest
+ image: provectuslabs/kafka-ui:v0.7.2
Use whichever tag has been verified locally; the key point is to avoid `:latest`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/docker-compose.dev.yaml` at line 167, Replace the unpinned Kafka UI image reference "provectuslabs/kafka-ui:latest" with a specific validated tag (e.g., the version you verified locally) to make the dev compose stack reproducible; update the "image: provectuslabs/kafka-ui:latest" entry to use that explicit tag (matching the practice used for "bitnamilegacy/kafka:4.0.0-debian-12-r10") rather than :latest.
event-gateway/gateway-runtime/internal/subscription/sync.go (1)
96-98: ⚡ Quick win
Update the `Close` contract to match the new ownership model.
Line 96 still says this flushes and closes producer resources, but the method is now a no-op because `SyncProducer` no longer owns the Kafka client lifecycle. Please update or remove the comment so callers do not rely on cleanup that no longer happens.
Proposed change
-// Close flushes any buffered records and closes the sync producer.
+// Close is a no-op. SyncProducer uses the shared broker driver and does not own its lifecycle.
 func (p *SyncProducer) Close() {
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/subscription/sync.go` around lines 96 - 98, The Close method comment on SyncProducer is stale: update the docstring for func (p *SyncProducer) Close() to state that it is a no-op and does not flush or close Kafka client resources (SyncProducer no longer owns the client lifecycle) so callers don't expect cleanup; alternatively remove the comment entirely or add a brief note pointing callers to the owner of the Kafka client lifecycle (reference SyncProducer.Close and SyncProducer type) so the contract accurately reflects current behavior.
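The ownership model behind this nitpick can be illustrated with a small sketch. The types below (`publisher`, `sharedDriver`, `syncProducerSketch`) are hypothetical stand-ins, not the PR's actual `SyncProducer` or broker driver; the point is only that a component borrowing a shared client should leave shutdown to whoever created that client.

```go
package main

import "errors"

// publisher is a hypothetical stand-in for the shared broker driver API.
type publisher interface {
	Publish(topic string, value []byte) error
}

// sharedDriver is a toy driver that records sends and refuses work once closed.
type sharedDriver struct {
	closed bool
	sent   int
}

func (d *sharedDriver) Publish(topic string, value []byte) error {
	if d.closed {
		return errors.New("driver closed")
	}
	d.sent++
	return nil
}

// Close releases the driver; only the owner of the driver should call it.
func (d *sharedDriver) Close() { d.closed = true }

// syncProducerSketch borrows the driver and therefore does not own its lifecycle.
type syncProducerSketch struct {
	driver publisher
}

func (p *syncProducerSketch) Send(topic string, value []byte) error {
	return p.driver.Publish(topic, value)
}

// Close is a no-op: the shared driver stays usable for other components.
func (p *syncProducerSketch) Close() {}
```

Under this arrangement, closing the producer must not affect other users of the driver, which is why a doc comment promising a flush or close would mislead callers.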
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go`:
- Around line 126-128: The code in endpoint.go currently calls
e.admin.CreateTopics(ctx, 1, 1, ..., topic) which hardcodes 1 partition and 1
replica; change this to read partition and replication factors from
configuration or a shared topic-creation policy and pass those variables into
e.admin.CreateTopics instead of literals. Locate the call to
e.admin.CreateTopics (and the surrounding function that creates the internal
compacted topic), add or inject config values (e.g., compactTopicPartitions and
compactTopicReplication or a TopicCreationPolicy struct) with sensible defaults
and validation, and use those variables in place of the hardcoded 1,1 so topic
creation honors cluster policies and is configurable.
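One way to read the inline comment above is as a small policy type with defaults and validation. The sketch below is an assumption about how that could look, not code from the PR: `topicCreationPolicy`, its fields, and both methods are hypothetical names. The field types are chosen to line up with the partition (`int32`) and replication factor (`int16`) parameters of franz-go's `kadm` `CreateTopics`, assuming `e.admin` is a `kadm` client.

```go
package main

import "fmt"

// topicCreationPolicy is a hypothetical config struct for internal topics.
type topicCreationPolicy struct {
	Partitions        int32
	ReplicationFactor int16
}

// withDefaults fills unset (zero) values with the previously hardcoded 1 and 1,
// so existing deployments keep their current behavior.
func (p topicCreationPolicy) withDefaults() topicCreationPolicy {
	if p.Partitions == 0 {
		p.Partitions = 1
	}
	if p.ReplicationFactor == 0 {
		p.ReplicationFactor = 1
	}
	return p
}

// validate rejects values that can never describe a usable topic.
func (p topicCreationPolicy) validate() error {
	if p.Partitions < 1 {
		return fmt.Errorf("topic policy: partitions must be >= 1, got %d", p.Partitions)
	}
	if p.ReplicationFactor < 1 {
		return fmt.Errorf("topic policy: replication factor must be >= 1, got %d", p.ReplicationFactor)
	}
	return nil
}
```

A caller would resolve the policy once (`policy := cfg.withDefaults()`), validate it at startup, and pass `policy.Partitions` and `policy.ReplicationFactor` in place of the literals `1, 1`.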
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a0c7ef43-a4a0-44c1-945a-b3532097bf39
📒 Files selected for processing (19)
event-gateway/.env.example
event-gateway/docker-compose.dev.yaml
event-gateway/docker/kafka/generate-certs.sh
event-gateway/gateway-runtime/cmd/event-gateway/main.go
event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
event-gateway/gateway-runtime/configs/config.toml
event-gateway/gateway-runtime/internal/config/config.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
event-gateway/gateway-runtime/internal/connectors/types.go
event-gateway/gateway-runtime/internal/subscription/reconciler.go
event-gateway/gateway-runtime/internal/subscription/sync.go
event-gateway/gateway-runtime/internal/xdsclient/handler.go
Purpose
event-gateway could not fully connect to secured Kafka clusters. Kafka TLS/SASL settings existed at the runtime config level, but they were not fully wired through the Kafka broker-driver path, and some WebSub subscription state flows still constructed Kafka clients directly. In addition, the local dev stack only supported plaintext Kafka, so secured connectivity could not be validated end-to-end in the repo.
Resolves #1861
Goals
Approach
User stories
Documentation
N/A. This change is focused on implementation and the local dev stack; product documentation was not updated in this PR.
Automation tests
go test ./... passed in event-gateway/gateway-runtime.
Added focused tests for Kafka connection config resolution and validation.
No dedicated automated integration test suite was added in this PR.
Performed compose-based smoke validation for the secured Kafka services and Kafka UI connectivity.
Security checks
Samples
Related PRs
#1879
Test environment