Add secure Kafka support and WebSub lifecycle fixes (plus topic normalization) for event-gateway#1822
AnujaKalahara99 wants to merge 10 commits into wso2:main.
Commits (messages truncated):
- …ependecy on kafka broker (Co-authored-by: Copilot <copilot@github.com>)
- Co-authored-by: Copilot <copilot@github.com>
- …ice layer (Co-authored-by: Copilot <copilot@github.com>)
📝 Walkthrough

Summary

This PR implements secure Kafka support, establishes a broker-driver abstraction layer to decouple WebSub from direct Kafka dependencies, introduces WebSub topic normalization, and fixes WebSub lifecycle management in the API gateway.

Kafka Security & Configuration
Broker-Driver Abstraction
Topic Normalization
WebSub Gateway Handler Refactoring
WebSub API Service Layer
WebSub Lifecycle & Orphan Cleanup
Testing Additions
Walkthrough

This pull request introduces a unified Kafka configuration system with TLS and SASL support, refactors topic name handling to normalize special characters, and abstracts broker-specific dependencies behind a broker-driver interface.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain modules listed in go.work or their selected dependencies"
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go (1)
101-127: ⚠️ Potential issue | 🟠 Major

`EnsureStateTopics` does not correct existing topic configs. Lines 102-106 only pass `cleanup.policy=compact` during topic creation, and lines 117-120 treat `TOPIC_ALREADY_EXISTS` as success. If a state topic already exists without compaction, this method leaves it unchanged, so it does not actually ensure state-topic semantics after an upgrade.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go` around lines 101 - 127, EnsureStateTopics only sets cleanup.policy=compact on creation and treats existing topics as success, so topics already existing without compaction are not corrected; update createTopics/EnsureStateTopics to, for each topic that already exists (detected via isTopicAlreadyExistsErr or by querying the admin), fetch its current config and if cleanup.policy != "compact" call the admin config-alter API to set cleanup.policy="compact" (use the Kafka admin client's config/AlterConfigs or equivalent in the admin field) before returning success; keep existing logging (slog.Debug/slog.Info) and ensure errors from the config-update path are returned similarly to creation errors.

gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go (1)
195-212: ⚠️ Potential issue | 🟠 Major

Do not collapse service/storage failures into the generic 400 response.

After moving the update flow behind `webSubAPIService.Update`, server-side failures from `gateway/gateway-controller/pkg/service/websubapi/service.go:122-128` now fall through to this branch and are returned as `400 Bad Request`. Please reserve 400 for request/validation errors and map storage/runtime failures to a 5xx response instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go` around lines 195 - 212, The handler currently returns 400 for all non-not-found/conflict errors from webSubAPIService.Update; change the fallback so service/storage/runtime failures (i.e., any error that is not errors.Is(err, websubapi.ErrNotFound) and not storage.IsConflictError(err)) return a 5xx (use http.StatusInternalServerError) with the error message and proper logging; update the error handling block in websub_api_handler.go (the branch handling err from webSubAPIService.Update) to map validation/consumer errors to 400 only, keep ErrNotFound -> 404 and storage.IsConflictError -> 409, and return 500 for all other service/storage/runtime errors.
🧹 Nitpick comments (7)
event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf (1)
3-6: Avoid committing concrete credential values in shared JAAS defaults.

Lines 3-6 hard-code broker/user passwords. Prefer a local override mechanism (or untracked template) so repository defaults don’t carry reusable credentials.
As per coding guidelines: “validate safety without exposing sensitive context.”
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf` around lines 3 - 6, Replace the hard-coded credential values in the JAAS defaults (the entries username, password, user_broker, user_egw) with non-sensitive placeholders or references to a local/untracked override (e.g., template variables or an env-based lookup) and add/update an untracked template file (e.g., kafka_server_jaas.conf.template) plus a note in README explaining how to populate the real credentials locally; ensure the production/CI loader uses the real secret source (env/secret manager) rather than the committed file and remove any concrete passwords from the committed kafka_server_jaas.conf.

event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties (1)
15-18: Externalize keystore/truststore password values from committed config defaults.

Even for dev configs, avoid fixed password literals in tracked files; use environment-driven or local override values.
As per coding guidelines: “validate safety without exposing sensitive context.”
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties` around lines 15 - 18, The file contains hardcoded keystore/truststore passwords (ssl.keystore.password, ssl.key.password, ssl.truststore.password) which must be externalized; replace the literal values with environment-driven placeholders (e.g. use ${ENV_SSL_KEYSTORE_PASSWORD:-} or a config templating reference) and ensure your application or startup scripts read those env vars or a local override file, validate presence at startup, and remove/rotate any committed secrets; update any README or example env file to show the required variables without embedding real passwords.

event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties (1)
1-18: Add an explicit DEV-ONLY header comment in this plaintext profile.

A short banner at the top helps prevent accidental reuse of this profile outside local development.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties` around lines 1 - 18, Add a prominent DEV-ONLY banner as a multi-line properties comment (using #) at the very top of this properties file (before the existing process.roles entry) that clearly states this is for local development only, must not be used in production, and warns that this profile uses plaintext/SASL_PLAINTEXT (see keys like listeners and sasl.enabled.mechanisms) so it’s insecure; keep it only as a comment and do not modify any existing property keys or values.

event-gateway/gateway-runtime/internal/subscription/sync_test.go (1)
142-152: Align the replay fake with production behavior.

This test double returns on the first handler error, while the Kafka replay path logs handler errors and continues. Keeping those semantics aligned will make future reconciler tests exercise the same control flow as runtime code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/gateway-runtime/internal/subscription/sync_test.go` around lines 142 - 152, The test fake recordingBrokerDriver.Replay currently returns immediately on the first handler error; change it to mirror production Kafka replay semantics by invoking handler for every message in r.replayMessages and, on error, log the error and continue to the next message instead of returning; ensure the method still sets r.replayTopic from topics[0] and ultimately returns nil (or an aggregated/non-fatal result consistent with production) so tests exercise the same control flow as the runtime code.

event-gateway/gateway-runtime/internal/connectors/types.go (1)
56-56: Narrow or document the `Replay` contract.

`BrokerDriver.Replay` accepts multiple topics, but the Kafka implementation currently errors unless exactly one topic is passed. That mismatch makes the abstraction easy to misuse. Consider changing this to `topic string` or documenting the single-topic constraint on the interface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/gateway-runtime/internal/connectors/types.go` at line 56, The interface BrokerDriver.Replay currently accepts topics []string but KafkaDriver.Replay only works for a single topic; either change the contract to accept a single topic (replace BrokerDriver.Replay(ctx context.Context, topics []string, handler MessageHandler) error with Replay(ctx context.Context, topic string, handler MessageHandler) error) and update all implementations (e.g., KafkaDriver.Replay) and callers accordingly, or if you must keep multiple topics, add a clear doc comment on BrokerDriver.Replay stating the single-topic constraint and update KafkaDriver.Replay to return a descriptive error when len(topics) != 1; reference BrokerDriver.Replay and KafkaDriver.Replay when making the change.

event-gateway/docker-compose.dev.yaml (1)
72-80: Avoid duplicating the SASL client settings across services.

These lines repeat the same mechanism and credentials in both `event-gateway` and `kafka-ui`. Moving them to shared Compose variables or an env file will reduce drift when the dev auth settings change.

Also applies to: 149-155
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@event-gateway/docker-compose.dev.yaml` around lines 72 - 80, Remove duplicated SASL client env vars (APIP_EGW_KAFKA_TLS, APIP_EGW_KAFKA_SASL_MECHANISM, APIP_EGW_KAFKA_SASL_USERNAME, APIP_EGW_KAFKA_SASL_PASSWORD) from the individual service blocks (event-gateway and kafka-ui) and centralize them into a single shared source—either an env_file (e.g., .env.dev) referenced by both services or a top-level reusable anchor (x-common-environment) that both event-gateway and kafka-ui include; update the service definitions to reference the shared variables instead of repeating them so credential/mechanism changes are made in one place.

gateway/gateway-controller/pkg/service/websubapi/service.go (1)
107-108: Use the service logger as a fallback.

`Update` dereferences `params.Logger` later, but `UpdateParams` does not enforce it and the constructor already stores `s.logger`. Falling back avoids a nil panic and makes the injected logger useful.

Suggested change:

```diff
 func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) {
-	log := params.Logger
+	log := params.Logger
+	if log == nil {
+		log = s.logger
+	}
+	if log == nil {
+		log = slog.Default()
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@gateway/gateway-controller/pkg/service/websubapi/service.go` around lines 107 - 108, The Update method currently assigns log := params.Logger and later dereferences it, risking a nil panic because UpdateParams.Logger is optional; change Update to use params.Logger if non-nil otherwise fall back to the service-level logger s.logger (i.e., set log to params.Logger when present else s.logger) so all log calls use a safe logger instance; update references in Update to use this resolved log variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@event-gateway/gateway-runtime/configs/config.toml`:
- Around line 31-33: Replace the hard-coded SASL defaults by removing the
concrete values for sasl_mechanism, sasl_username and sasl_password in the
runtime config and leave them blank (e.g., empty strings or omitted) so
credentials must be injected at deployment time; also ensure any code that reads
these keys (the config loader/initializer that references sasl_mechanism,
sasl_username, sasl_password) treats empty values as “not configured” and falls
back to secure injection (environment vars or secret manager) and validates
presence at startup.
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go`:
- Around line 52-86: The loop over overrides currently only handles known keys
and silently ignores unknown ones; update the code in the overrides handling
(the loop that references overrides, parseBrokerList, and sets fields on cfg /
ConnectionConfig like Brokers, TLS, SASLMechanism, SASLUsername, SASLPassword)
to return an error for any unrecognized key instead of ignoring it—after the
switch, add a default case (or explicit check) that formats and returns an error
indicating an unknown kafka broker-driver override key so typos or unsupported
overrides fail validation.
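The default-case rejection the prompt asks for can be sketched as follows; the `ConnectionConfig` fields mirror the ones named in the review, but the key names and the boolean parsing are simplified assumptions:

```go
package main

import (
	"fmt"
	"strings"
)

type ConnectionConfig struct {
	Brokers       []string
	TLS           bool
	SASLMechanism string
	SASLUsername  string
	SASLPassword  string
}

// applyOverrides returns an error for any unrecognized key instead of
// silently ignoring it, so typos fail validation.
func applyOverrides(cfg *ConnectionConfig, overrides map[string]string) error {
	for key, val := range overrides {
		switch key {
		case "brokers":
			cfg.Brokers = strings.Split(val, ",")
		case "tls":
			cfg.TLS = val == "true" // sketch; real code would use strconv.ParseBool
		case "sasl_mechanism":
			cfg.SASLMechanism = val
		case "sasl_username":
			cfg.SASLUsername = val
		case "sasl_password":
			cfg.SASLPassword = val
		default:
			return fmt.Errorf("unknown kafka broker-driver override key %q", key)
		}
	}
	return nil
}

func main() {
	var cfg ConnectionConfig
	err := applyOverrides(&cfg, map[string]string{"sasl_mechanismm": "PLAIN"}) // typo
	fmt.Println(err != nil) // true: the typo now fails validation
}
```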
- Around line 156-170: The normalization function normalizeConnectionConfig
currently trims SASLUsername and SASLPassword which can mutate valid opaque
credentials; stop trimming those fields and only trim and normalize
non-credential fields (e.g., keep cfg.SASLUsername and cfg.SASLPassword exactly
as provided), while still trimming/sanitizing cfg.Brokers and cfg.SASLMechanism
as shown; update the function to remove the strings.TrimSpace calls for
cfg.SASLUsername and cfg.SASLPassword so credentials are preserved.
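A sketch of the normalization shape the prompt describes, trimming only non-credential fields (`connCfg` is a simplified stand-in for the driver's config type):

```go
package main

import (
	"fmt"
	"strings"
)

type connCfg struct {
	Brokers       []string
	SASLMechanism string
	SASLUsername  string
	SASLPassword  string
}

// normalize trims broker addresses and the mechanism, but leaves
// credentials byte-for-byte intact, since passwords may legitimately
// contain leading or trailing whitespace.
func normalize(cfg connCfg) connCfg {
	for i, b := range cfg.Brokers {
		cfg.Brokers[i] = strings.TrimSpace(b)
	}
	cfg.SASLMechanism = strings.TrimSpace(cfg.SASLMechanism)
	// SASLUsername and SASLPassword are intentionally untouched.
	return cfg
}

func main() {
	out := normalize(connCfg{
		Brokers:       []string{" kafka:9092 "},
		SASLMechanism: " PLAIN ",
		SASLPassword:  "secret ",
	})
	fmt.Printf("%q %q %q\n", out.Brokers[0], out.SASLMechanism, out.SASLPassword)
}
```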
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go`:
- Around line 50-92: The loop currently sums ListedOffset.Offset into totalEnd
and compares replayed < totalEnd, which is wrong; instead build a per-partition
end map from endOffsets.Each (mapping topic+partition -> endOffset) and track
per-partition progress (e.g., current offset or a completed flag) as you consume
records in the PollFetches loop; for each record use
record.Topic/record.Partition to look up that partition's endOffset and
skip/mark-complete when record.Offset >= endOffset for that partition, and stop
the replay when all partitions are marked complete. Update the code that uses
totalEnd/replayed to use this per-partition end map and completion check (refer
to endOffsets, ListedOffset, recordToMessage, handler, and the PollFetches loop)
so on compacted topics and concurrent appends we stop correctly per partition.
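The per-partition completion check can be sketched with plain maps. The `record` and `tp` types are simplified stand-ins for the client library's record and partition types, and this assumes the usual Kafka convention that a partition's end offset is one past its last record:

```go
package main

import "fmt"

// record is a minimal stand-in for a consumed Kafka record.
type record struct {
	Topic     string
	Partition int32
	Offset    int64
}

type tp struct {
	topic     string
	partition int32
}

// replayDone tracks per-partition completion. Summing offsets across
// partitions, as the old code did, miscounts on compacted topics because
// fewer records survive than the end offsets imply.
type replayDone struct {
	end  map[tp]int64 // end offset per partition, e.g. from ListEndOffsets
	done map[tp]bool
}

func newReplayDone(end map[tp]int64) *replayDone {
	d := &replayDone{end: end, done: make(map[tp]bool)}
	for p, off := range end {
		if off == 0 { // empty partition: nothing to replay
			d.done[p] = true
		}
	}
	return d
}

// observe marks a partition complete once the record at (or past)
// endOffset-1 has been seen.
func (d *replayDone) observe(r record) {
	p := tp{r.Topic, r.Partition}
	if r.Offset >= d.end[p]-1 {
		d.done[p] = true
	}
}

func (d *replayDone) allDone() bool {
	for p := range d.end {
		if !d.done[p] {
			return false
		}
	}
	return true
}

func main() {
	d := newReplayDone(map[tp]int64{{"t", 0}: 2, {"t", 1}: 1})
	d.observe(record{"t", 0, 1}) // last record of partition 0
	fmt.Println(d.allDone())     // false: partition 1 still pending
	d.observe(record{"t", 1, 0})
	fmt.Println(d.allDone()) // true
}
```

The real replay loop would call `observe` for each fetched record and stop polling when `allDone` reports true.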
In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go`:
- Around line 75-79: The JSON unmarshal error for Subscription during replay
should abort reconciliation instead of returning nil; in the block where
json.Unmarshal(msg.Value, &sub) is called (and slog.Error logs the failure),
propagate and return the error (or a wrapped error) so the caller/replay loop
can stop rather than silently continue with a malformed record—update the
handler around Subscription/json.Unmarshal to return the error instead of nil.
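A minimal sketch of propagating the decode failure instead of swallowing it; `Subscription` and `handleSyncRecord` are illustrative stand-ins for the reconciler's types:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log/slog"
)

// Subscription is a minimal stand-in for the reconciler's type.
type Subscription struct {
	ID    string `json:"id"`
	Topic string `json:"topic"`
}

// handleSyncRecord aborts reconciliation on a malformed record instead of
// logging and returning nil, so the replay loop stops on corrupt state.
func handleSyncRecord(value []byte, apply func(*Subscription) error) error {
	var sub Subscription
	if err := json.Unmarshal(value, &sub); err != nil {
		slog.Error("failed to decode subscription record", "error", err)
		return fmt.Errorf("decode subscription: %w", err)
	}
	return apply(&sub)
}

func main() {
	err := handleSyncRecord([]byte("{not json"), func(*Subscription) error { return nil })
	fmt.Println(err != nil) // true: reconciliation aborts
}
```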
In `@gateway/gateway-controller/pkg/controlplane/client.go`:
- Around line 2689-2691: The orphan-cleanup path currently calls
cleanupOrphanedResources(apiID, deletedEvent.CorrelationID) but that helper
deletes subscriptions directly and does not rebuild the xDS snapshot; update the
flow so it triggers refreshSubscriptionSnapshot for the API after cleanup
succeeds: either modify cleanupOrphanedResources to return success/error and
call refreshSubscriptionSnapshot(apiID) on success, or keep the call signature
and invoke c.refreshSubscriptionSnapshot(apiID) immediately after
c.cleanupOrphanedResources(...) completes successfully (ensure errors from
cleanupOrphanedResources are handled and logged); reference functions:
cleanupOrphanedResources(), refreshSubscriptionSnapshot(),
performFullAPIDeletion(), and event type websub.deleted / identifiers apiID and
deletedEvent.CorrelationID.
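The ordering this prompt describes (refresh only after cleanup succeeds) can be sketched with hypothetical method signatures standing in for the controller's real ones:

```go
package main

import (
	"fmt"
	"log/slog"
)

// controller is a toy stand-in; only the call ordering matters here.
type controller struct {
	cleaned   []string
	refreshed []string
}

func (c *controller) cleanupOrphanedResources(apiID, correlationID string) error {
	c.cleaned = append(c.cleaned, apiID)
	return nil
}

func (c *controller) refreshSubscriptionSnapshot(apiID string) {
	c.refreshed = append(c.refreshed, apiID)
}

// handleAPIDeleted rebuilds the subscription xDS snapshot only after the
// orphan cleanup succeeded, mirroring performFullAPIDeletion.
func (c *controller) handleAPIDeleted(apiID, correlationID string) {
	if err := c.cleanupOrphanedResources(apiID, correlationID); err != nil {
		slog.Error("orphan cleanup failed",
			"api_id", apiID, "correlation_id", correlationID, "error", err)
		return
	}
	c.refreshSubscriptionSnapshot(apiID)
}

func main() {
	c := &controller{}
	c.handleAPIDeleted("api-1", "corr-1")
	fmt.Println(len(c.refreshed)) // 1
}
```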
In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 152-189: The undeploy path updates existing.Configuration and
existing.SourceConfiguration but then calls s.db.UpdateConfig(existing), and
UpdateConfig (storage/sql_store.go) only updates metadata/state so the
configuration payload is not persisted; change the undeploy flow in service.go
to persist the full config payload by either invoking the same
upsert/full-config persistence used by DeployAPIConfiguration (reuse that upsert
function) or extend s.db.UpdateConfig to also write the configuration/spec
fields (ensure it accepts and persists api.WebSubAPI payloads); update the call
site in the method where existing.Configuration/SourceConfiguration are set so
it calls the full-config upsert (or the enhanced UpdateConfig) instead of the
current UpdateConfig to ensure spec changes survive reloads.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 23591ab2-39f9-4f83-a2b6-6142572b27a6
📒 Files selected for processing (32)
- event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf
- event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties
- event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties
- event-gateway/docker-compose.dev.yaml
- event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/binding/loader_test.go
- event-gateway/gateway-runtime/internal/binding/types.go
- event-gateway/gateway-runtime/internal/config/config.go
- event-gateway/gateway-runtime/internal/config/config_test.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager_test.go
- event-gateway/gateway-runtime/internal/connectors/types.go
- event-gateway/gateway-runtime/internal/runtime/runtime.go
- event-gateway/gateway-runtime/internal/runtime/runtime_test.go
- event-gateway/gateway-runtime/internal/subscription/reconciler.go
- event-gateway/gateway-runtime/internal/subscription/sync.go
- event-gateway/gateway-runtime/internal/subscription/sync_test.go
- gateway/gateway-controller/pkg/api/handlers/handlers.go
- gateway/gateway-controller/pkg/api/handlers/handlers_test.go
- gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
- gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
- gateway/gateway-controller/pkg/controlplane/client.go
- gateway/gateway-controller/pkg/service/websubapi/errors.go
- gateway/gateway-controller/pkg/service/websubapi/service.go
Co-authored-by: Copilot <copilot@github.com>
Actionable comments posted: 2
♻️ Duplicate comments (1)
gateway/gateway-controller/pkg/controlplane/client.go (1)
1810-1816: ⚠️ Potential issue | 🟠 Major

Refresh the subscription snapshot on the missing-config `api.deleted` path.

`cleanupOrphanedResources()` deletes stale subscriptions, but this branch returns without rebuilding the subscription snapshot. That leaves local subscription xDS stale until some later refresh. Please mirror `performFullAPIDeletion()` here, or move the refresh into the shared helper so both delete paths stay consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@gateway/gateway-controller/pkg/controlplane/client.go` around lines 1810 - 1816, The branch handling the missing-config api.deleted path calls cleanupOrphanedResources(apiID, deletedEvent.CorrelationID) but returns without rebuilding the subscription snapshot, leaving xDS stale; update this branch to mirror performFullAPIDeletion() by invoking the same subscription snapshot refresh logic after cleanup (or move the refresh into the shared helper used by both paths) so both delete flows rebuild the subscription snapshot consistently; ensure you reference and call the same snapshot-refreshing function used by performFullAPIDeletion (or refactor refresh logic into cleanupOrphanedResources or a new helper) and preserve the existing logging of api_id and correlation_id.
🧹 Nitpick comments (2)
event-gateway/gateway-runtime/internal/subscription/reconciler.go (1)
61-61: Use broker-agnostic reconciliation log wording.

Line 61 logs “from Kafka”, but this reconciler now depends on `connectors.BrokerDriver`. Using generic wording will keep logs accurate across broker implementations.
Verify each finding against the current code and only fix it if needed. In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go` at line 61, The log message in the reconciler uses broker-specific wording ("Starting subscription reconciliation from Kafka") which is inaccurate now that the reconciler depends on connectors.BrokerDriver; update the slog.Info call in reconciler.go (the call to slog.Info that emits "Starting subscription reconciliation from Kafka") to a broker-agnostic string such as "Starting subscription reconciliation" or include the broker driver name dynamically via connectors.BrokerDriver where available so logs remain correct across broker implementations.event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go (1)
130-131: Explicitly set the TLS minimum version on the config.

The zero-value `tls.Config` defaults to TLS 1.2 in current Go versions, but explicitly setting `MinVersion` (e.g., `tls.VersionTLS12` or `tls.VersionTLS13`) is recommended for outbound connections. This approach clarifies intent and avoids reliance on defaults that may evolve with future Go releases.
Verify each finding against the current code and only fix it if needed. In `@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go` around lines 130 - 131, When cfg.TLS is true and you append kgo.DialTLSConfig(new(tls.Config)), replace the zero-value tls.Config with an explicit config that sets MinVersion (e.g., tls.VersionTLS12 or tls.VersionTLS13); update the kgo.DialTLSConfig call to use &tls.Config{MinVersion: tls.VersionTLS12} (or VersionTLS13) so the TLS minimum version is explicitly defined rather than relying on the tls.Config zero-value.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go`:
- Around line 64-83: Reconcile currently ignores errors from r.store.Remove and
r.store.Add and calls r.callback regardless; change the logic in reconciler.go
so that after parseSyncKey(...) when msg.Value==nil you capture and check the
error returned by r.store.Remove(parts[0], parts[1]) and return a wrapped error
if it fails (do not call r.callback on failure), and similarly when
unmarshalling succeeds call r.store.Add(&sub), check its error and return a
wrapped error if it fails before invoking r.callback(&sub, false); ensure all
callback invocations only happen after the corresponding store operation
succeeded and include contextual error messages that reference the failing
operation (Remove or Add).
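The check-before-callback shape this prompt describes can be sketched with a toy store (the `store` type and `applyAdd` helper are illustrative, not the reconciler's real API):

```go
package main

import (
	"errors"
	"fmt"
)

// store is a minimal stand-in for the reconciler's subscription store.
type store struct{ failAdd bool }

func (s *store) Add(id string) error {
	if s.failAdd {
		return errors.New("store full")
	}
	return nil
}

// applyAdd fires the callback only after the store operation succeeded,
// wrapping failures with context about the failing operation.
func applyAdd(s *store, id string, callback func(string)) error {
	if err := s.Add(id); err != nil {
		return fmt.Errorf("reconcile: store.Add(%s): %w", id, err)
	}
	callback(id)
	return nil
}

func main() {
	fired := false
	err := applyAdd(&store{failAdd: true}, "sub-1", func(string) { fired = true })
	fmt.Println(err != nil, fired) // true false: no callback on failure
}
```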
In `@gateway/gateway-controller/pkg/controlplane/client.go`:
- Around line 1694-1697: The publish call to
c.eventHub.PublishEvent(c.gatewayID, evt) should not abort the orphan cleanup on
failure; instead log the error and continue so the DB cleanup is considered
successful. Replace the current pattern that logs with c.logger.Error(... ) and
returns err by logging the failure (include gatewayID and err) via
c.logger.Error and then proceed without returning; ensure callers like
websub.deleted still run refreshSubscriptionSnapshot() after local removal. Keep
the event publish attempted but make its failure non-fatal to the cleanup flow.
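The non-fatal-publish pattern from this prompt looks like the following sketch; `publishEvent` and `cleanupAndNotify` are hypothetical stand-ins for `c.eventHub.PublishEvent` and the cleanup flow:

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// publishEvent is a stand-in for c.eventHub.PublishEvent; here it always
// fails to demonstrate the non-fatal handling.
func publishEvent(gatewayID, evt string) error {
	return errors.New("event hub unreachable")
}

// cleanupAndNotify treats a failed event publish as non-fatal: it logs the
// error and still reports the local cleanup as successful.
func cleanupAndNotify(gatewayID, evt string) error {
	// ... database cleanup happens first; its errors would stay fatal ...
	if err := publishEvent(gatewayID, evt); err != nil {
		slog.Error("failed to publish cleanup event",
			"gateway_id", gatewayID, "error", err)
		// intentionally not returned: local cleanup already succeeded
	}
	return nil
}

func main() {
	fmt.Println(cleanupAndNotify("gw-1", "websub.deleted") == nil) // true
}
```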
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b2733fde-7b2e-402c-b9ee-98530b907674
📒 Files selected for processing (5)
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
- event-gateway/gateway-runtime/internal/subscription/reconciler.go
- gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
- gateway/gateway-controller/pkg/controlplane/client.go
🚧 Files skipped from review as they are similar to previous changes (2)
- event-gateway/gateway-runtime/configs/config.toml
- gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
Purpose
This change is needed to make the event-gateway usable with secured Kafka setups and to fix WebSub lifecycle gaps across update, undeploy, and delete flows. It also removes direct Kafka coupling from WebSub
subscription handling so receiver logic can work through the broker-driver abstraction consistently.
Resolves N/A.
Goals
Approach
User stories
Documentation
N/A — this diff is runtime/controller implementation focused. If needed, follow-up docs should cover secure Kafka dev setup and WebSub lifecycle behavior.
Automation tests
Added/updated unit coverage for Kafka config validation and merge behavior, config env/file loading, broker-driver-backed sync/replay, consumer manager behavior, WebSub undeploy path, and orphan deletion
cleanup.
N/A in this diff review. I only verified branch changes from the git diff; no integration suite results were attached here.
Security checks
Local Kafka dev auth/sample credentials are included in the added dev config files.
Samples
Related PRs
N/A
Test environment