Add secure Kafka support and WebSub lifecycle fixes (plus topic normalization) for event-gateway#1822

Open
AnujaKalahara99 wants to merge 10 commits into wso2:main from AnujaKalahara99:egw/merge/fix

Conversation

@AnujaKalahara99
Contributor

Purpose

This change is needed to make the event-gateway usable with secured Kafka setups and to fix WebSub lifecycle gaps across update, undeploy, and delete flows. It also removes direct Kafka coupling from WebSub
subscription handling so receiver logic can work through the broker-driver abstraction consistently.

Resolves N/A.

Goals

  • Add Kafka TLS/SASL support with global config plus per-binding overrides.
  • Move WebSub subscription sync/replay and callback consumer management behind the broker-driver interface.
  • Normalize broker-facing topic names safely.
  • Fix WebSub undeploy/delete/update handling so gateway-controller and control-plane cleanup are consistent.

Approach

  • Added typed Kafka connection config resolution/validation and wired it through the event-gateway Kafka broker-driver factory.
  • Reworked WebSub subscription sync and reconciliation to use broker-driver Publish, Replay, and EnsureStateTopics instead of direct Kafka clients.
  • Updated WebSub consumer management to subscribe through the broker-driver abstraction.
  • Normalized broker topic segments for WebSub channel and subscription topics.
  • Added a dedicated websubapi service in gateway-controller for update/undeploy handling and improved orphan cleanup on WebSub delete events.

User stories

  • As an operator, I can run event-gateway against secured Kafka using TLS/SASL.
  • As a developer, I can use WebSub receivers without hard Kafka dependencies leaking into receiver/subscription code.
  • As a platform user, WebSub undeploy/delete flows clean up correctly and do not leave stale state behind.

Documentation

N/A — this diff is runtime/controller implementation focused. If needed, follow-up docs should cover secure Kafka dev setup and WebSub lifecycle behavior.

Automation tests

  • Unit tests
    Added/updated unit coverage for Kafka config validation and merge behavior, config env/file loading, broker-driver-backed sync/replay, consumer manager behavior, WebSub undeploy path, and orphan deletion
    cleanup.
  • Integration tests
    N/A in this diff review. I only verified branch changes from the git diff; no integration suite results were attached here.

Security checks

  • Followed secure coding standards in http://wso2.com/technical-reports/wso2-secure-engineering-guidelines? yes
  • Ran FindSecurityBugs plugin and verified report? no
  • Confirmed that this PR doesn't commit any keys, passwords, tokens, usernames, or other secrets? yes
    Local Kafka dev auth/sample credentials are included in the added dev config files.

Samples

  • Added local Kafka dev configuration for SASL_PLAINTEXT and SASL_SSL testing under event-gateway/configs/kafka-dev-config/.
  • Updated event-gateway/docker-compose.dev.yaml to support the secured Kafka dev setup.

Related PRs

N/A

Test environment

@coderabbitai
Contributor

coderabbitai Bot commented Apr 29, 2026

📝 Walkthrough

Summary

This PR implements secure Kafka support, establishes a broker-driver abstraction layer to decouple WebSub from direct Kafka dependencies, introduces WebSub topic normalization, and fixes WebSub lifecycle management in the API gateway.

Kafka Security & Configuration

  • Added typed Kafka connection configuration (ConnectionConfig) supporting TLS and SASL authentication mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
  • Implemented configuration resolution (ResolveConnectionConfig) that merges global Kafka settings with per-binding overrides
  • Added validation (ValidateConnectionConfig) enforcing broker presence and SASL credential consistency
  • Extended runtime configuration to load and validate TLS and SASL settings from environment variables and config files
  • Updated Kafka client initialization to build franz-go options with TLS/SASL configuration via BuildClientOptions
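The merge-then-validate flow described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the field names come from the review comments on config.go, but the field types, the function signatures, and the override key names (`brokers`, `tls`, `sasl_mechanism`, and so on) are assumptions. Rejecting unknown override keys is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ConnectionConfig uses the field names mentioned in the review
// (Brokers, TLS, SASLMechanism, SASLUsername, SASLPassword); types are assumed.
type ConnectionConfig struct {
	Brokers       []string
	TLS           bool
	SASLMechanism string
	SASLUsername  string
	SASLPassword  string
}

// ResolveConnectionConfig copies the global settings, applies per-binding
// overrides on top, and validates the result. Override keys are hypothetical.
func ResolveConnectionConfig(global ConnectionConfig, overrides map[string]string) (ConnectionConfig, error) {
	cfg := global
	cfg.Brokers = append([]string(nil), global.Brokers...) // avoid aliasing the global slice
	for key, value := range overrides {
		switch key {
		case "brokers":
			cfg.Brokers = splitBrokers(value)
		case "tls":
			cfg.TLS = strings.EqualFold(strings.TrimSpace(value), "true")
		case "sasl_mechanism":
			cfg.SASLMechanism = strings.ToUpper(strings.TrimSpace(value))
		case "sasl_username":
			cfg.SASLUsername = value // credentials are kept exactly as provided
		case "sasl_password":
			cfg.SASLPassword = value
		default:
			return ConnectionConfig{}, fmt.Errorf("unknown kafka override key %q", key)
		}
	}
	if err := ValidateConnectionConfig(cfg); err != nil {
		return ConnectionConfig{}, err
	}
	return cfg, nil
}

// ValidateConnectionConfig enforces broker presence and SASL credential consistency.
func ValidateConnectionConfig(cfg ConnectionConfig) error {
	if len(cfg.Brokers) == 0 {
		return errors.New("kafka: at least one broker is required")
	}
	switch cfg.SASLMechanism {
	case "":
		if cfg.SASLUsername != "" || cfg.SASLPassword != "" {
			return errors.New("kafka: SASL credentials set without a mechanism")
		}
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		if cfg.SASLUsername == "" || cfg.SASLPassword == "" {
			return errors.New("kafka: SASL mechanism requires username and password")
		}
	default:
		return fmt.Errorf("kafka: unsupported SASL mechanism %q", cfg.SASLMechanism)
	}
	return nil
}

// splitBrokers parses a comma-separated broker list, dropping empty entries.
func splitBrokers(list string) []string {
	var out []string
	for _, b := range strings.Split(list, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	global := ConnectionConfig{Brokers: []string{"kafka:9092"}}
	cfg, err := ResolveConnectionConfig(global, map[string]string{
		"tls":            "true",
		"sasl_mechanism": "scram-sha-512",
		"sasl_username":  "egw",
		"sasl_password":  "example-only",
	})
	fmt.Println(cfg.TLS, cfg.SASLMechanism, err)
}
```

Validating after the merge, rather than validating the global and override layers separately, lets a binding supply credentials for a mechanism that is only set globally.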

Broker-Driver Abstraction

  • Refactored WebSub subscription synchronization, reconciliation, and replay to use the BrokerDriver interface instead of direct Kafka clients
  • Replaced Kafka imports and manual client construction in subscription sync/replay logic with broker-driver method calls (Replay, Publish, EnsureStateTopics)
  • Updated ConsumerManager to create subscriptions through brokerDriver.Subscribe() instead of directly instantiating Kafka consumers
  • Modified Replayer from a stateful background polling mechanism to a bounded ReplayTopic function delegating to the broker driver
  • Added SyncProducer and Reconciler to accept BrokerDriver for message publishing and topic replay
  • Expanded BrokerDriver interface with Replay and EnsureStateTopics methods to support topic state management
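To illustrate the shape of this abstraction, here is a hedged sketch with an in-memory driver standing in for Kafka. Only the `Replay` signature is quoted elsewhere in this review; the `Publish` and `EnsureStateTopics` signatures, the `Message` type, the `ReplayTopic` helper signature, the `memoryDriver` type, and the topic name are all assumptions made for the example. `Subscribe` is omitted for brevity.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
)

// Message and MessageHandler are simplified stand-ins for the gateway's types.
type Message struct {
	Topic string
	Key   []byte
	Value []byte
}

type MessageHandler func(ctx context.Context, msg Message) error

// BrokerDriver lists three of the methods named in the summary.
// Signatures other than Replay are assumed.
type BrokerDriver interface {
	Publish(ctx context.Context, topic string, msg Message) error
	Replay(ctx context.Context, topics []string, handler MessageHandler) error
	EnsureStateTopics(ctx context.Context, topics []string) error
}

// memoryDriver is a hypothetical in-memory implementation used only here.
type memoryDriver struct {
	mu     sync.Mutex
	topics map[string][]Message
}

func newMemoryDriver() *memoryDriver {
	return &memoryDriver{topics: map[string][]Message{}}
}

func (d *memoryDriver) EnsureStateTopics(_ context.Context, topics []string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, t := range topics {
		if _, ok := d.topics[t]; !ok {
			d.topics[t] = nil
		}
	}
	return nil
}

func (d *memoryDriver) Publish(_ context.Context, topic string, msg Message) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.topics[topic]; !ok {
		return fmt.Errorf("topic %q does not exist", topic)
	}
	msg.Topic = topic
	d.topics[topic] = append(d.topics[topic], msg)
	return nil
}

// Replay delivers a snapshot of one topic; handler errors are logged and
// skipped, mirroring the behavior the review attributes to the Kafka path.
func (d *memoryDriver) Replay(ctx context.Context, topics []string, handler MessageHandler) error {
	if len(topics) != 1 {
		return errors.New("replay expects exactly one topic")
	}
	d.mu.Lock()
	snapshot := append([]Message(nil), d.topics[topics[0]]...)
	d.mu.Unlock()
	for _, msg := range snapshot {
		if err := handler(ctx, msg); err != nil {
			log.Printf("replay handler error on %s: %v", msg.Topic, err)
		}
	}
	return nil
}

// ReplayTopic is a bounded, stateless helper that delegates to the driver.
func ReplayTopic(ctx context.Context, driver BrokerDriver, topic string, handler MessageHandler) error {
	return driver.Replay(ctx, []string{topic}, handler)
}

// runDemo publishes two subscription states and replays them in order.
func runDemo() ([]string, error) {
	ctx := context.Background()
	var driver BrokerDriver = newMemoryDriver()
	topic := "websub.subscriptions" // hypothetical state topic name
	if err := driver.EnsureStateTopics(ctx, []string{topic}); err != nil {
		return nil, err
	}
	for _, state := range []string{"active", "inactive"} {
		msg := Message{Key: []byte("sub-1"), Value: []byte(state)}
		if err := driver.Publish(ctx, topic, msg); err != nil {
			return nil, err
		}
	}
	var seen []string
	err := ReplayTopic(ctx, driver, topic, func(_ context.Context, msg Message) error {
		seen = append(seen, string(msg.Value))
		return nil
	})
	return seen, err
}

func main() {
	fmt.Println(runDemo())
}
```

Because subscription code depends only on the interface, a test double like this can replace the Kafka driver in unit tests without any client imports.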

Topic Normalization

  • Introduced NormalizeTopicSegment function to normalize topic segment names by trimming whitespace, preserving alphanumerics and dot/dash characters, and escaping underscores and all other characters into hex-encoded forms
  • Updated WebSubApiTopicName and WebSubApiSubscriptionTopic to normalize all segments before concatenation
  • Applied normalization in runtime topic qualification to handle special characters in context/version/topic identifiers
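A minimal sketch of the normalization rule as summarized above. The summary does not state the exact escape format, so the `_XX` form (underscore followed by two uppercase hex digits per byte) is an assumption of this example; it would explain why underscores themselves must be escaped, since they serve as the escape prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// NormalizeTopicSegment trims surrounding whitespace, keeps ASCII letters,
// digits, '.' and '-', and escapes every other byte (including '_') as
// "_XX". The "_XX" format is assumed, not taken from the PR.
func NormalizeTopicSegment(segment string) string {
	var b strings.Builder
	for _, c := range []byte(strings.TrimSpace(segment)) {
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9', c == '.', c == '-':
			b.WriteByte(c)
		default:
			fmt.Fprintf(&b, "_%02X", c)
		}
	}
	return b.String()
}

func main() {
	for _, s := range []string{"v1.0-beta", "order_events", " /orders v1 "} {
		fmt.Printf("%q -> %q\n", s, NormalizeTopicSegment(s))
	}
}
```

Escaping byte by byte keeps the mapping injective for the trimmed input, so two distinct context, version, or topic identifiers cannot collapse to the same broker topic segment.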

WebSub Gateway Handler Refactoring

  • Removed Brokers field from WebSub receiver Options structure
  • Decoupled WebSub receiver from direct Kafka broker configuration, routing all topic operations through the configured BrokerDriver
  • Simplified producer creation logic by always instantiating sync producer when internal subscription topic is present
  • Updated subscription reconciliation control flow to depend on broker driver availability

WebSub API Service Layer

  • Created dedicated WebSubAPIService in gateway-controller to handle WebSub API updates with proper state management
  • Implemented Update method that handles deployment state transitions (deploy/undeploy) with validation, conflict checking, and event publishing
  • Added error types for structured error handling: ValidationError, ParseError, HandleMismatchError, and ErrNotFound sentinel
  • Integrated WebSub API service into the API handler, delegating update logic from the HTTP handler to the service layer

WebSub Lifecycle & Orphan Cleanup

  • Enhanced cleanupOrphanedResources to return errors and propagate failures from API key and subscription cleanup operations
  • Implemented dedicated WebSub deletion event handling with orphan cleanup and subscription snapshot refresh
  • Added error logging and early exit on cleanup failures in deletion paths
  • Updated undeploy flow to properly manage gateway-controller and control-plane cleanup

Testing Additions

  • Added unit tests for Kafka configuration validation, resolution, and option building covering TLS, SASL mechanisms, and credential handling
  • Added environment variable and file-based configuration loading tests for Kafka security settings
  • Introduced table-driven tests for topic name normalization validating special character escaping
  • Added consumer manager tests verifying broker subscription calls and group ID generation
  • Created subscription sync/reconciliation tests validating sync topic creation, publish behavior, replay message handling, and callback invocation for active/inactive/tombstone subscriptions
  • Added WebSub API undeploy path test validating deployment state transitions and event publication
  • Added API deletion tests covering orphan cleanup and subscription snapshot refresh

Walkthrough

This pull request introduces a unified Kafka configuration system with TLS and SASL support, refactors topic name handling to normalize special characters, and abstracts broker-specific dependencies behind a BrokerDriver interface throughout the event gateway. The changes consolidate Kafka connection initialization through a ConnectionConfig struct, update WebSub topic derivation to escape unsupported characters using NormalizeTopicSegment, simplify the subscription reconciliation flow by delegating replay operations to the broker driver, and introduce a new WebSubAPIService to manage WebSub API state transitions. Configuration validation is added at startup, the stateful Replayer is replaced with a functional ReplayTopic approach, and WebSub connectors are decoupled from direct broker management through interface-based dependencies.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 48.21% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the three main objectives: secure Kafka support via TLS/SASL, WebSub lifecycle fixes, and topic normalization. It is concise and specific enough to convey the primary changes. |
| Description check | ✅ Passed | The description covers Purpose, Goals, Approach, User stories, Documentation, Automation tests, Security checks, Samples, and Test environment sections. All major sections are present with substantive content, though Test environment section is incomplete. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain modules listed in go.work or their selected dependencies"


Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go (1)

101-127: ⚠️ Potential issue | 🟠 Major

EnsureStateTopics does not correct existing topic configs.

Lines 102-106 only pass cleanup.policy=compact during topic creation, and Lines 117-120 treat TOPIC_ALREADY_EXISTS as success. If a state topic already exists without compaction, this method leaves it unchanged, so it does not actually ensure state-topic semantics after an upgrade.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go`
around lines 101 - 127, EnsureStateTopics only sets cleanup.policy=compact on
creation and treats existing topics as success, so topics already existing
without compaction are not corrected; update createTopics/EnsureStateTopics to,
for each topic that already exists (detected via isTopicAlreadyExistsErr or by
querying the admin), fetch its current config and if cleanup.policy != "compact"
call the admin config-alter API to set cleanup.policy="compact" (use the Kafka
admin client's config/AlterConfigs or equivalent in the admin field) before
returning success; keep existing logging (slog.Debug/slog.Info) and ensure
errors from the config-update path are returned similarly to creation errors.
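One way the suggested correction could look, sketched against a hypothetical `topicAdmin` interface rather than the real Kafka admin client. The interface, its method names, the `errTopicExists` sentinel, and the fake used for the demo are all assumptions; the actual driver would map these steps onto whatever describe/alter-config calls its admin client provides.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errTopicExists stands in for the broker's TOPIC_ALREADY_EXISTS error.
var errTopicExists = errors.New("TOPIC_ALREADY_EXISTS")

// topicAdmin is a hypothetical abstraction over the admin client.
type topicAdmin interface {
	CreateTopic(ctx context.Context, name string, configs map[string]string) error
	DescribeTopicConfig(ctx context.Context, name string) (map[string]string, error)
	AlterTopicConfig(ctx context.Context, name, key, value string) error
}

// EnsureStateTopics creates missing topics as compacted and, for topics that
// already exist, corrects cleanup.policy when it is not "compact".
func EnsureStateTopics(ctx context.Context, admin topicAdmin, topics []string) error {
	for _, topic := range topics {
		err := admin.CreateTopic(ctx, topic, map[string]string{"cleanup.policy": "compact"})
		if err == nil {
			continue
		}
		if !errors.Is(err, errTopicExists) {
			return fmt.Errorf("create topic %s: %w", topic, err)
		}
		current, err := admin.DescribeTopicConfig(ctx, topic)
		if err != nil {
			return fmt.Errorf("describe topic %s: %w", topic, err)
		}
		if current["cleanup.policy"] == "compact" {
			continue
		}
		if err := admin.AlterTopicConfig(ctx, topic, "cleanup.policy", "compact"); err != nil {
			return fmt.Errorf("alter topic %s: %w", topic, err)
		}
	}
	return nil
}

// fakeAdmin is an in-memory topicAdmin used only for the demo.
type fakeAdmin struct{ configs map[string]map[string]string }

func (f *fakeAdmin) CreateTopic(_ context.Context, name string, configs map[string]string) error {
	if _, ok := f.configs[name]; ok {
		return errTopicExists
	}
	copied := map[string]string{}
	for k, v := range configs {
		copied[k] = v
	}
	f.configs[name] = copied
	return nil
}

func (f *fakeAdmin) DescribeTopicConfig(_ context.Context, name string) (map[string]string, error) {
	cfg, ok := f.configs[name]
	if !ok {
		return nil, fmt.Errorf("unknown topic %s", name)
	}
	return cfg, nil
}

func (f *fakeAdmin) AlterTopicConfig(_ context.Context, name, key, value string) error {
	cfg, ok := f.configs[name]
	if !ok {
		return fmt.Errorf("unknown topic %s", name)
	}
	cfg[key] = value
	return nil
}

// demoEnsure runs the flow against one pre-existing non-compacted topic
// and one missing topic, then reports the resulting policies.
func demoEnsure() (map[string]string, error) {
	admin := &fakeAdmin{configs: map[string]map[string]string{
		"legacy-state": {"cleanup.policy": "delete"},
	}}
	err := EnsureStateTopics(context.Background(), admin, []string{"legacy-state", "new-state"})
	return map[string]string{
		"legacy-state": admin.configs["legacy-state"]["cleanup.policy"],
		"new-state":    admin.configs["new-state"]["cleanup.policy"],
	}, err
}

func main() {
	fmt.Println(demoEnsure())
}
```

The sketch treats any value other than exactly `compact` as needing correction; whether a combined policy such as `compact,delete` should be accepted is a decision for the real implementation.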
gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go (1)

195-212: ⚠️ Potential issue | 🟠 Major

Do not collapse service/storage failures into the generic 400 response.

After moving the update flow behind webSubAPIService.Update, server-side failures from gateway/gateway-controller/pkg/service/websubapi/service.go:122-128 now fall through to this branch and are returned as 400 Bad Request. Please reserve 400 for request/validation errors and map storage/runtime failures to a 5xx response instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go` around
lines 195 - 212, The handler currently returns 400 for all
non-not-found/conflict errors from webSubAPIService.Update; change the fallback
so service/storage/runtime failures (i.e., any error that is not errors.Is(err,
websubapi.ErrNotFound) and not storage.IsConflictError(err)) return a 5xx (use
http.StatusInternalServerError) with the error message and proper logging;
update the error handling block in websub_api_handler.go (the branch handling
err from webSubAPIService.Update) to map validation/consumer errors to 400 only,
keep ErrNotFound -> 404 and storage.IsConflictError -> 409, and return 500 for
all other service/storage/runtime errors.
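The requested mapping could be sketched as below. The error type names (`ValidationError`, `ParseError`, `HandleMismatchError`, `ErrNotFound`) come from the walkthrough, but their definitions here are simplified guesses, `errConflict` is a stand-in for whatever `storage.IsConflictError` detects, and `errDBUnavailable` and `statusFor` are hypothetical names for this example.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Simplified stand-ins for the websubapi error types named in the walkthrough.
var ErrNotFound = errors.New("websub api not found")

type ValidationError struct{ Msg string }

func (e *ValidationError) Error() string { return e.Msg }

type ParseError struct{ Msg string }

func (e *ParseError) Error() string { return e.Msg }

type HandleMismatchError struct{ Msg string }

func (e *HandleMismatchError) Error() string { return e.Msg }

// errConflict stands in for errors matched by storage.IsConflictError.
var errConflict = errors.New("configuration conflict")

// errDBUnavailable is a hypothetical storage/runtime failure.
var errDBUnavailable = errors.New("database unavailable")

// statusFor reserves 400 for request-side errors and maps anything
// unrecognized to 500 instead of falling back to 400.
func statusFor(err error) int {
	var (
		validation *ValidationError
		parse      *ParseError
		mismatch   *HandleMismatchError
	)
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrNotFound):
		return http.StatusNotFound
	case errors.Is(err, errConflict):
		return http.StatusConflict
	case errors.As(err, &validation), errors.As(err, &parse), errors.As(err, &mismatch):
		return http.StatusBadRequest
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	errs := []error{
		fmt.Errorf("update: %w", ErrNotFound),
		&ValidationError{Msg: "missing context"},
		fmt.Errorf("persist: %w", errDBUnavailable),
	}
	for _, err := range errs {
		fmt.Println(statusFor(err), err)
	}
}
```

Using `errors.Is` and `errors.As` keeps the mapping correct when the service wraps errors with additional context.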
🧹 Nitpick comments (7)
event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf (1)

3-6: Avoid committing concrete credential values in shared JAAS defaults.

Lines 3-6 hard-code broker/user passwords. Prefer a local override mechanism (or untracked template) so repository defaults don’t carry reusable credentials.

As per coding guidelines: “validate safety without exposing sensitive context.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf` around lines 3
- 6, Replace the hard-coded credential values in the JAAS defaults (the entries
username, password, user_broker, user_egw) with non-sensitive placeholders or
references to a local/untracked override (e.g., template variables or an
env-based lookup) and add/update an untracked template file (e.g.,
kafka_server_jaas.conf.template) plus a note in README explaining how to
populate the real credentials locally; ensure the production/CI loader uses the
real secret source (env/secret manager) rather than the committed file and
remove any concrete passwords from the committed kafka_server_jaas.conf.
event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties (1)

15-18: Externalize keystore/truststore password values from committed config defaults.

Even for dev configs, avoid fixed password literals in tracked files; use environment-driven or local override values.

As per coding guidelines: “validate safety without exposing sensitive context.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties` around
lines 15 - 18, The file contains hardcoded keystore/truststore passwords
(ssl.keystore.password, ssl.key.password, ssl.truststore.password) which must be
externalized; replace the literal values with environment-driven placeholders
(e.g. use ${ENV_SSL_KEYSTORE_PASSWORD:-} or a config templating reference) and
ensure your application or startup scripts read those env vars or a local
override file, validate presence at startup, and remove/rotate any committed
secrets; update any README or example env file to show the required variables
without embedding real passwords.
event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties (1)

1-18: Add an explicit DEV-ONLY header comment in this plaintext profile.

A short banner at the top helps prevent accidental reuse of this profile outside local development.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties`
around lines 1 - 18, Add a prominent DEV-ONLY banner as a multi-line properties
comment (using #) at the very top of this properties file (before the existing
process.roles entry) that clearly states this is for local development only,
must not be used in production, and warns that this profile uses
plaintext/SASL_PLAINTEXT (see keys like listeners and sasl.enabled.mechanisms)
so it’s insecure; keep it only as a comment and do not modify any existing
property keys or values.
event-gateway/gateway-runtime/internal/subscription/sync_test.go (1)

142-152: Align the replay fake with production behavior.

This test double returns on the first handler error, while the Kafka replay path logs handler errors and continues. Keeping those semantics aligned will make future reconciler tests exercise the same control flow as runtime code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/gateway-runtime/internal/subscription/sync_test.go` around
lines 142 - 152, The test fake recordingBrokerDriver.Replay currently returns
immediately on the first handler error; change it to mirror production Kafka
replay semantics by invoking handler for every message in r.replayMessages and,
on error, log the error and continue to the next message instead of returning;
ensure the method still sets r.replayTopic from topics[0] and ultimately returns
nil (or an aggregated/non-fatal result consistent with production) so tests
exercise the same control flow as the runtime code.
event-gateway/gateway-runtime/internal/connectors/types.go (1)

56-56: Narrow or document the Replay contract.

BrokerDriver.Replay accepts multiple topics, but the Kafka implementation currently errors unless exactly one topic is passed. That mismatch makes the abstraction easy to misuse. Consider changing this to topic string or documenting the single-topic constraint on the interface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/gateway-runtime/internal/connectors/types.go` at line 56, The
interface BrokerDriver.Replay currently accepts topics []string but
KafkaDriver.Replay only works for a single topic; either change the contract to
accept a single topic (replace BrokerDriver.Replay(ctx context.Context, topics
[]string, handler MessageHandler) error with Replay(ctx context.Context, topic
string, handler MessageHandler) error) and update all implementations (e.g.,
KafkaDriver.Replay) and callers accordingly, or if you must keep multiple
topics, add a clear doc comment on BrokerDriver.Replay stating the single-topic
constraint and update KafkaDriver.Replay to return a descriptive error when
len(topics) != 1; reference BrokerDriver.Replay and KafkaDriver.Replay when
making the change.
event-gateway/docker-compose.dev.yaml (1)

72-80: Avoid duplicating the SASL client settings across services.

These lines repeat the same mechanism and credentials in both event-gateway and kafka-ui. Moving them to shared Compose variables or an env file will reduce drift when the dev auth settings change.

Also applies to: 149-155

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/docker-compose.dev.yaml` around lines 72 - 80, Remove
duplicated SASL client env vars (APIP_EGW_KAFKA_TLS,
APIP_EGW_KAFKA_SASL_MECHANISM, APIP_EGW_KAFKA_SASL_USERNAME,
APIP_EGW_KAFKA_SASL_PASSWORD) from the individual service blocks (event-gateway
and kafka-ui) and centralize them into a single shared source—either an env_file
(e.g., .env.dev) referenced by both services or a top-level reusable anchor
(x-common-environment) that both event-gateway and kafka-ui include; update the
service definitions to reference the shared variables instead of repeating them
so credential/mechanism changes are made in one place.
gateway/gateway-controller/pkg/service/websubapi/service.go (1)

107-108: Use the service logger as a fallback.

Update dereferences params.Logger later, but UpdateParams does not enforce it and the constructor already stores s.logger. Falling back avoids a nil panic and makes the injected logger useful.

Suggested change
 func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) {
-	log := params.Logger
+	log := params.Logger
+	if log == nil {
+		log = s.logger
+	}
+	if log == nil {
+		log = slog.Default()
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@gateway/gateway-controller/pkg/service/websubapi/service.go` around lines 107
- 108, The Update method currently assigns log := params.Logger and later
dereferences it, risking a nil panic because UpdateParams.Logger is optional;
change Update to use params.Logger if non-nil otherwise fall back to the
service-level logger s.logger (i.e., set log to params.Logger when present else
s.logger) so all log calls use a safe logger instance; update references in
Update to use this resolved log variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@event-gateway/gateway-runtime/configs/config.toml`:
- Around line 31-33: Replace the hard-coded SASL defaults by removing the
concrete values for sasl_mechanism, sasl_username and sasl_password in the
runtime config and leave them blank (e.g., empty strings or omitted) so
credentials must be injected at deployment time; also ensure any code that reads
these keys (the config loader/initializer that references sasl_mechanism,
sasl_username, sasl_password) treats empty values as “not configured” and falls
back to secure injection (environment vars or secret manager) and validates
presence at startup.

In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go`:
- Around line 52-86: The loop over overrides currently only handles known keys
and silently ignores unknown ones; update the code in the overrides handling
(the loop that references overrides, parseBrokerList, and sets fields on cfg /
ConnectionConfig like Brokers, TLS, SASLMechanism, SASLUsername, SASLPassword)
to return an error for any unrecognized key instead of ignoring it—after the
switch, add a default case (or explicit check) that formats and returns an error
indicating an unknown kafka broker-driver override key so typos or unsupported
overrides fail validation.
- Around line 156-170: The normalization function normalizeConnectionConfig
currently trims SASLUsername and SASLPassword which can mutate valid opaque
credentials; stop trimming those fields and only trim and normalize
non-credential fields (e.g., keep cfg.SASLUsername and cfg.SASLPassword exactly
as provided), while still trimming/sanitizing cfg.Brokers and cfg.SASLMechanism
as shown; update the function to remove the strings.TrimSpace calls for
cfg.SASLUsername and cfg.SASLPassword so credentials are preserved.

In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go`:
- Around line 50-92: The loop currently sums ListedOffset.Offset into totalEnd
and compares replayed < totalEnd, which is wrong; instead build a per-partition
end map from endOffsets.Each (mapping topic+partition -> endOffset) and track
per-partition progress (e.g., current offset or a completed flag) as you consume
records in the PollFetches loop; for each record use
record.Topic/record.Partition to look up that partition's endOffset and
skip/mark-complete when record.Offset >= endOffset for that partition, and stop
the replay when all partitions are marked complete. Update the code that uses
totalEnd/replayed to use this per-partition end map and completion check (refer
to endOffsets, ListedOffset, recordToMessage, handler, and the PollFetches loop)
so on compacted topics and concurrent appends we stop correctly per partition.

In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go`:
- Around line 75-79: The JSON unmarshal error for Subscription during replay
should abort reconciliation instead of returning nil; in the block where
json.Unmarshal(msg.Value, &sub) is called (and slog.Error logs the failure),
propagate and return the error (or a wrapped error) so the caller/replay loop
can stop rather than silently continue with a malformed record—update the
handler around Subscription/json.Unmarshal to return the error instead of nil.

In `@gateway/gateway-controller/pkg/controlplane/client.go`:
- Around line 2689-2691: The orphan-cleanup path currently calls
cleanupOrphanedResources(apiID, deletedEvent.CorrelationID) but that helper
deletes subscriptions directly and does not rebuild the xDS snapshot; update the
flow so it triggers refreshSubscriptionSnapshot for the API after cleanup
succeeds: either modify cleanupOrphanedResources to return success/error and
call refreshSubscriptionSnapshot(apiID) on success, or keep the call signature
and invoke c.refreshSubscriptionSnapshot(apiID) immediately after
c.cleanupOrphanedResources(...) completes successfully (ensure errors from
cleanupOrphanedResources are handled and logged); reference functions:
cleanupOrphanedResources(), refreshSubscriptionSnapshot(),
performFullAPIDeletion(), and event type websub.deleted / identifiers apiID and
deletedEvent.CorrelationID.

In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 152-189: The undeploy path updates existing.Configuration and
existing.SourceConfiguration but then calls s.db.UpdateConfig(existing), and
UpdateConfig (storage/sql_store.go) only updates metadata/state so the
configuration payload is not persisted; change the undeploy flow in service.go
to persist the full config payload by either invoking the same
upsert/full-config persistence used by DeployAPIConfiguration (reuse that upsert
function) or extend s.db.UpdateConfig to also write the configuration/spec
fields (ensure it accepts and persists api.WebSubAPI payloads); update the call
site in the method where existing.Configuration/SourceConfiguration are set so
it calls the full-config upsert (or the enhanced UpdateConfig) instead of the
current UpdateConfig to ensure spec changes survive reloads.
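
For the replayer.go comment above (per-partition end offsets instead of a summed total), the stop condition could be tracked roughly like this. The sketch is independent of franz-go: `partition`, `record`, and `replayTracker` are hypothetical types, and start and end offsets are assumed to have been captured before polling begins. It also assumes the record at `end-1` is delivered for each non-empty partition; a production version might instead compare the consumer's position against the end offset.

```go
package main

import "fmt"

// partition and record are simplified stand-ins for Kafka client types.
type partition struct {
	Topic string
	ID    int32
}

type record struct {
	Part   partition
	Offset int64
	Value  string
}

// replayTracker remembers the end offset captured per partition at replay
// start and marks each partition complete independently.
type replayTracker struct {
	end       map[partition]int64 // exclusive end offsets
	done      map[partition]bool
	remaining int
}

func newReplayTracker(start, end map[partition]int64) *replayTracker {
	t := &replayTracker{end: end, done: map[partition]bool{}}
	for p, e := range end {
		if start[p] >= e {
			t.done[p] = true // nothing to replay on this partition
			continue
		}
		t.remaining++
	}
	return t
}

// observe reports whether the record belongs to the replay window and
// marks its partition complete once the window is exhausted.
func (t *replayTracker) observe(r record) bool {
	end, known := t.end[r.Part]
	if !known || t.done[r.Part] {
		return false
	}
	if r.Offset >= end {
		t.finish(r.Part) // appended after the snapshot; not part of replay
		return false
	}
	if r.Offset == end-1 {
		t.finish(r.Part) // last record inside the window
	}
	return true
}

func (t *replayTracker) finish(p partition) {
	t.done[p] = true
	t.remaining--
}

func (t *replayTracker) complete() bool { return t.remaining == 0 }

// replay feeds records through the tracker until every partition is complete.
func replay(records []record, start, end map[partition]int64) []string {
	t := newReplayTracker(start, end)
	var delivered []string
	for _, r := range records {
		if t.complete() {
			break
		}
		if t.observe(r) {
			delivered = append(delivered, r.Value)
		}
	}
	return delivered
}

func main() {
	p0, p1 := partition{"state", 0}, partition{"state", 1}
	start := map[partition]int64{p0: 0, p1: 5}
	end := map[partition]int64{p0: 3, p1: 5} // p1 is empty after compaction
	// Offset 1 on p0 was compacted away; offset 3 arrived during the replay.
	records := []record{{p0, 0, "a"}, {p0, 2, "b"}, {p0, 3, "late"}}
	fmt.Println(replay(records, start, end))
}
```

Counting delivered records against a summed offset total would never terminate in this example, because compaction removed a record that the sum still counts.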

---

Outside diff comments:
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go`:
- Around line 101-127: EnsureStateTopics only sets cleanup.policy=compact on
creation and treats existing topics as success, so topics already existing
without compaction are not corrected; update createTopics/EnsureStateTopics to,
for each topic that already exists (detected via isTopicAlreadyExistsErr or by
querying the admin), fetch its current config and if cleanup.policy != "compact"
call the admin config-alter API to set cleanup.policy="compact" (use the Kafka
admin client's config/AlterConfigs or equivalent in the admin field) before
returning success; keep existing logging (slog.Debug/slog.Info) and ensure
errors from the config-update path are returned similarly to creation errors.

In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go`:
- Around line 195-212: The handler currently returns 400 for all
non-not-found/conflict errors from webSubAPIService.Update; change the fallback
so service/storage/runtime failures (i.e., any error that is not errors.Is(err,
websubapi.ErrNotFound) and not storage.IsConflictError(err)) return a 5xx (use
http.StatusInternalServerError) with the error message and proper logging;
update the error handling block in websub_api_handler.go (the branch handling
err from webSubAPIService.Update) to map validation/consumer errors to 400 only,
keep ErrNotFound -> 404 and storage.IsConflictError -> 409, and return 500 for
all other service/storage/runtime errors.

---

Nitpick comments:
In `@event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf`:
- Around line 3-6: Replace the hard-coded credential values in the JAAS defaults
(the entries username, password, user_broker, user_egw) with non-sensitive
placeholders or references to a local/untracked override (e.g., template
variables or an env-based lookup) and add/update an untracked template file
(e.g., kafka_server_jaas.conf.template) plus a note in README explaining how to
populate the real credentials locally; ensure the production/CI loader uses the
real secret source (env/secret manager) rather than the committed file and
remove any concrete passwords from the committed kafka_server_jaas.conf.

In `@event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties`:
- Around line 1-18: Add a prominent DEV-ONLY banner as a multi-line properties
comment (using #) at the very top of this properties file (before the existing
process.roles entry) that clearly states this is for local development only,
must not be used in production, and warns that this profile uses
plaintext/SASL_PLAINTEXT (see keys like listeners and sasl.enabled.mechanisms)
so it’s insecure; keep it only as a comment and do not modify any existing
property keys or values.

In `@event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties`:
- Around line 15-18: The file contains hardcoded keystore/truststore passwords
(ssl.keystore.password, ssl.key.password, ssl.truststore.password) which must be
externalized; replace the literal values with environment-driven placeholders
(e.g. use ${ENV_SSL_KEYSTORE_PASSWORD:-} or a config templating reference) and
ensure your application or startup scripts read those env vars or a local
override file, validate presence at startup, and remove/rotate any committed
secrets; update any README or example env file to show the required variables
without embedding real passwords.

In `@event-gateway/docker-compose.dev.yaml`:
- Around line 72-80: Remove duplicated SASL client env vars (APIP_EGW_KAFKA_TLS,
APIP_EGW_KAFKA_SASL_MECHANISM, APIP_EGW_KAFKA_SASL_USERNAME,
APIP_EGW_KAFKA_SASL_PASSWORD) from the individual service blocks (event-gateway
and kafka-ui) and centralize them into a single shared source—either an env_file
(e.g., .env.dev) referenced by both services or a top-level reusable anchor
(x-common-environment) that both event-gateway and kafka-ui include; update the
service definitions to reference the shared variables instead of repeating them
so credential/mechanism changes are made in one place.

In `@event-gateway/gateway-runtime/internal/connectors/types.go`:
- Line 56: The interface BrokerDriver.Replay currently accepts topics []string
but KafkaDriver.Replay only works for a single topic; either change the contract
to accept a single topic (replace BrokerDriver.Replay(ctx context.Context,
topics []string, handler MessageHandler) error with Replay(ctx context.Context,
topic string, handler MessageHandler) error) and update all implementations
(e.g., KafkaDriver.Replay) and callers accordingly, or if you must keep multiple
topics, add a clear doc comment on BrokerDriver.Replay stating the single-topic
constraint and update KafkaDriver.Replay to return a descriptive error when
len(topics) != 1; reference BrokerDriver.Replay and KafkaDriver.Replay when
making the change.

In `@event-gateway/gateway-runtime/internal/subscription/sync_test.go`:
- Around line 142-152: The test fake recordingBrokerDriver.Replay currently
returns immediately on the first handler error; change it to mirror production
Kafka replay semantics by invoking handler for every message in r.replayMessages
and, on error, log the error and continue to the next message instead of
returning; ensure the method still sets r.replayTopic from topics[0] and
ultimately returns nil (or an aggregated/non-fatal result consistent with
production) so tests exercise the same control flow as the runtime code.
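
One way the fake could look, assuming the production replayer logs handler errors and keeps going as described above. The `Message` shape and the `replayWithFailure` helper are assumptions made for this sketch; only the `replayTopic` and `replayMessages` field names come from the comment:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
)

// Message is a simplified stand-in for the connector message type.
type Message struct{ Key string }

type MessageHandler func(ctx context.Context, msg Message) error

type recordingBrokerDriver struct {
	replayTopic    string
	replayMessages []Message
}

// Replay records the first topic and invokes handler for every message,
// logging handler errors instead of returning on the first one.
func (r *recordingBrokerDriver) Replay(ctx context.Context, topics []string, handler MessageHandler) error {
	if len(topics) > 0 {
		r.replayTopic = topics[0]
	}
	for _, msg := range r.replayMessages {
		if err := handler(ctx, msg); err != nil {
			slog.Warn("replay handler failed, continuing", "key", msg.Key, "error", err)
		}
	}
	return nil
}

// replayWithFailure replays the given keys with a handler that rejects the
// key "bad", and reports how many times the handler was invoked.
func replayWithFailure(keys ...string) (int, string, error) {
	r := &recordingBrokerDriver{}
	for _, k := range keys {
		r.replayMessages = append(r.replayMessages, Message{Key: k})
	}
	calls := 0
	err := r.Replay(context.Background(), []string{"sync-topic"}, func(_ context.Context, msg Message) error {
		calls++
		if msg.Key == "bad" {
			return errors.New("handler rejected message")
		}
		return nil
	})
	return calls, r.replayTopic, err
}

func main() {
	calls, topic, err := replayWithFailure("a", "bad", "c")
	fmt.Println(calls, topic, err)
}
```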

In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 107-108: The Update method currently assigns log := params.Logger
and later dereferences it, risking a nil panic because UpdateParams.Logger is
optional; change Update to use params.Logger if non-nil otherwise fall back to
the service-level logger s.logger (i.e., set log to params.Logger when present
else s.logger) so all log calls use a safe logger instance; update references in
Update to use this resolved log variable.
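
A small sketch of the fallback, assuming both loggers are `*slog.Logger` (the comment does not state the logger type). `resolveLogger` and `newDemoLogger` are hypothetical helper names:

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
)

// UpdateParams mirrors the idea of an optional per-call logger.
type UpdateParams struct {
	Logger *slog.Logger // optional; may be nil
}

type Service struct {
	logger *slog.Logger // service-level logger, assumed non-nil
}

// resolveLogger returns the per-call logger when present, otherwise the
// service-level logger, so later log calls never dereference nil.
func (s *Service) resolveLogger(params UpdateParams) *slog.Logger {
	if params.Logger != nil {
		return params.Logger
	}
	return s.logger
}

// newDemoLogger builds a distinct logger instance for the demo.
func newDemoLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(os.Stderr, nil))
}

func main() {
	s := &Service{logger: newDemoLogger()}
	log := s.resolveLogger(UpdateParams{})
	log.Info("update started")
	fmt.Println(log == s.logger)
}
```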

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 23591ab2-39f9-4f83-a2b6-6142572b27a6

📥 Commits

Reviewing files that changed from the base of the PR and between 49bd727 and 4ef8303.

📒 Files selected for processing (32)
  • event-gateway/configs/kafka-dev-config/kafka_server_jaas.conf
  • event-gateway/configs/kafka-dev-config/server-sasl-plaintext.properties
  • event-gateway/configs/kafka-dev-config/server-sasl-ssl.properties
  • event-gateway/docker-compose.dev.yaml
  • event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/internal/binding/loader_test.go
  • event-gateway/gateway-runtime/internal/binding/types.go
  • event-gateway/gateway-runtime/internal/config/config.go
  • event-gateway/gateway-runtime/internal/config/config_test.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager_test.go
  • event-gateway/gateway-runtime/internal/connectors/types.go
  • event-gateway/gateway-runtime/internal/runtime/runtime.go
  • event-gateway/gateway-runtime/internal/runtime/runtime_test.go
  • event-gateway/gateway-runtime/internal/subscription/reconciler.go
  • event-gateway/gateway-runtime/internal/subscription/sync.go
  • event-gateway/gateway-runtime/internal/subscription/sync_test.go
  • gateway/gateway-controller/pkg/api/handlers/handlers.go
  • gateway/gateway-controller/pkg/api/handlers/handlers_test.go
  • gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
  • gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
  • gateway/gateway-controller/pkg/controlplane/client.go
  • gateway/gateway-controller/pkg/service/websubapi/errors.go
  • gateway/gateway-controller/pkg/service/websubapi/service.go


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
gateway/gateway-controller/pkg/controlplane/client.go (1)

1810-1816: ⚠️ Potential issue | 🟠 Major

Refresh the subscription snapshot on the missing-config api.deleted path.

cleanupOrphanedResources() deletes stale subscriptions, but this branch returns without rebuilding the subscription snapshot. That leaves local subscription xDS stale until some later refresh. Please mirror performFullAPIDeletion() here, or move the refresh into the shared helper so both delete paths stay consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@gateway/gateway-controller/pkg/controlplane/client.go` around lines 1810 -
1816, The branch handling the missing-config api.deleted path calls
cleanupOrphanedResources(apiID, deletedEvent.CorrelationID) but returns without
rebuilding the subscription snapshot, leaving xDS stale; update this branch to
mirror performFullAPIDeletion() by invoking the same subscription snapshot
refresh logic after cleanup (or move the refresh into the shared helper used by
both paths) so both delete flows rebuild the subscription snapshot consistently;
ensure you reference and call the same snapshot-refreshing function used by
performFullAPIDeletion (or refactor refresh logic into cleanupOrphanedResources
or a new helper) and preserve the existing logging of api_id and correlation_id.
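
A rough illustration of keeping both delete paths consistent. The method bodies, signatures, and the `handleAPIDeleted` and `refreshesFor` names are assumptions for this sketch; in particular the real `refreshSubscriptionSnapshot` may return an error:

```go
package main

import "fmt"

// Client is a stand-in that only counts snapshot refreshes.
type Client struct {
	snapshotRefreshes int
}

// refreshSubscriptionSnapshot stands in for the real snapshot rebuild.
func (c *Client) refreshSubscriptionSnapshot() { c.snapshotRefreshes++ }

// cleanupOrphanedResources stands in for the stale-subscription cleanup.
func (c *Client) cleanupOrphanedResources(apiID, correlationID string) error { return nil }

// performFullAPIDeletion stands in for the normal delete path, which
// already refreshes the snapshot.
func (c *Client) performFullAPIDeletion(apiID, correlationID string) error {
	c.refreshSubscriptionSnapshot()
	return nil
}

// handleAPIDeleted refreshes the snapshot on the missing-config branch too,
// so subscription xDS does not stay stale after orphan cleanup.
func (c *Client) handleAPIDeleted(apiID, correlationID string, configFound bool) error {
	if !configFound {
		if err := c.cleanupOrphanedResources(apiID, correlationID); err != nil {
			return fmt.Errorf("cleanup orphaned resources for api %s: %w", apiID, err)
		}
		c.refreshSubscriptionSnapshot()
		return nil
	}
	return c.performFullAPIDeletion(apiID, correlationID)
}

// refreshesFor reports how many refreshes one delete event triggers.
func refreshesFor(configFound bool) int {
	c := &Client{}
	if err := c.handleAPIDeleted("api-1", "corr-1", configFound); err != nil {
		return -1
	}
	return c.snapshotRefreshes
}

func main() {
	fmt.Println(refreshesFor(false), refreshesFor(true))
}
```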
🧹 Nitpick comments (2)
event-gateway/gateway-runtime/internal/subscription/reconciler.go (1)

61-61: Use broker-agnostic reconciliation log wording.

Line 61 logs “from Kafka”, but this reconciler now depends on connectors.BrokerDriver. Using generic wording will keep logs accurate across broker implementations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go` at line
61, The log message in the reconciler uses broker-specific wording ("Starting
subscription reconciliation from Kafka") which is inaccurate now that the
reconciler depends on connectors.BrokerDriver; update the slog.Info call in
reconciler.go (the call to slog.Info that emits "Starting subscription
reconciliation from Kafka") to a broker-agnostic string such as "Starting
subscription reconciliation" or include the broker driver name dynamically via
connectors.BrokerDriver where available so logs remain correct across broker
implementations.
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go (1)

130-131: Explicitly set the TLS minimum version on the config.

A zero-value tls.Config defaults to a TLS 1.2 minimum for client connections in current Go versions, but explicitly setting MinVersion (e.g., tls.VersionTLS12 or tls.VersionTLS13) is recommended for outbound connections. Doing so makes the intent explicit and avoids relying on defaults that may change in future Go releases.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go`
around lines 130 - 131, When cfg.TLS is true and you append
kgo.DialTLSConfig(new(tls.Config)), replace the zero-value tls.Config with an
explicit config that sets MinVersion (e.g., tls.VersionTLS12 or
tls.VersionTLS13); update the kgo.DialTLSConfig call to use
&tls.Config{MinVersion: tls.VersionTLS12} (or VersionTLS13) so the TLS minimum
version is explicitly defined rather than relying on the tls.Config zero-value.
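
For illustration, the explicit config could be built as below. `newKafkaTLSConfig` and `minVersionIsTLS12` are hypothetical helper names, and the `kgo.DialTLSConfig` call is left as a comment so the sketch depends only on the standard library:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// newKafkaTLSConfig returns a client TLS config with an explicit floor,
// rather than relying on the zero-value default.
func newKafkaTLSConfig() *tls.Config {
	return &tls.Config{MinVersion: tls.VersionTLS12}
}

// minVersionIsTLS12 reports whether the helper pins TLS 1.2 as the minimum.
func minVersionIsTLS12() bool {
	return newKafkaTLSConfig().MinVersion == tls.VersionTLS12
}

func main() {
	cfg := newKafkaTLSConfig()
	// In the driver this would replace new(tls.Config), for example:
	//   opts = append(opts, kgo.DialTLSConfig(cfg))
	fmt.Println(cfg.MinVersion == tls.VersionTLS12, minVersionIsTLS12())
}
```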
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@event-gateway/gateway-runtime/internal/subscription/reconciler.go`:
- Around line 64-83: Reconcile currently ignores errors from r.store.Remove and
r.store.Add and calls r.callback regardless; change the logic in reconciler.go
so that after parseSyncKey(...) when msg.Value==nil you capture and check the
error returned by r.store.Remove(parts[0], parts[1]) and return a wrapped error
if it fails (do not call r.callback on failure), and similarly when
unmarshalling succeeds call r.store.Add(&sub), check its error and return a
wrapped error if it fails before invoking r.callback(&sub, false); ensure all
callback invocations only happen after the corresponding store operation
succeeded and include contextual error messages that reference the failing
operation (Remove or Add).
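
A sketch of the ordering described above: the callback fires only after the store operation succeeds. The `Subscription` fields, the `"channel:id"` key layout, the removal callback arguments, and the `handle`, `failingStore`, `okStore`, and `callbacksFor` names are all assumptions made for this example:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

type Subscription struct {
	Channel string `json:"channel"`
	ID      string `json:"id"`
}

type Message struct {
	Key   string
	Value []byte // nil marks a removal
}

type Store interface {
	Add(sub *Subscription) error
	Remove(channel, id string) error
}

type Reconciler struct {
	store    Store
	callback func(sub *Subscription, removed bool)
}

// parseSyncKey assumes a "channel:id" key layout for illustration only.
func parseSyncKey(key string) ([]string, error) {
	parts := strings.SplitN(key, ":", 2)
	if len(parts) != 2 {
		return nil, fmt.Errorf("malformed sync key %q", key)
	}
	return parts, nil
}

// handle applies one replayed message and invokes the callback only after
// the corresponding store operation has succeeded.
func (r *Reconciler) handle(msg Message) error {
	parts, err := parseSyncKey(msg.Key)
	if err != nil {
		return err
	}
	if msg.Value == nil {
		if err := r.store.Remove(parts[0], parts[1]); err != nil {
			return fmt.Errorf("remove subscription %q: %w", msg.Key, err)
		}
		r.callback(&Subscription{Channel: parts[0], ID: parts[1]}, true)
		return nil
	}
	var sub Subscription
	if err := json.Unmarshal(msg.Value, &sub); err != nil {
		return fmt.Errorf("unmarshal subscription %q: %w", msg.Key, err)
	}
	if err := r.store.Add(&sub); err != nil {
		return fmt.Errorf("add subscription %q: %w", msg.Key, err)
	}
	r.callback(&sub, false)
	return nil
}

type failingStore struct{}

func (failingStore) Add(*Subscription) error     { return errors.New("store unavailable") }
func (failingStore) Remove(string, string) error { return errors.New("store unavailable") }

type okStore struct{}

func (okStore) Add(*Subscription) error     { return nil }
func (okStore) Remove(string, string) error { return nil }

// callbacksFor replays one add and one removal and counts callback calls.
func callbacksFor(store Store) (int, error) {
	calls := 0
	r := &Reconciler{store: store, callback: func(*Subscription, bool) { calls++ }}
	msgs := []Message{
		{Key: "news:1", Value: []byte(`{"channel":"news","id":"1"}`)},
		{Key: "news:1"},
	}
	var firstErr error
	for _, m := range msgs {
		if err := r.handle(m); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return calls, firstErr
}

func main() {
	fmt.Println(callbacksFor(okStore{}))
	fmt.Println(callbacksFor(failingStore{}))
}
```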

In `@gateway/gateway-controller/pkg/controlplane/client.go`:
- Around line 1694-1697: The publish call to
c.eventHub.PublishEvent(c.gatewayID, evt) should not abort the orphan cleanup on
failure; instead log the error and continue so the DB cleanup is considered
successful. Replace the current pattern that logs with c.logger.Error(... ) and
returns err by logging the failure (include gatewayID and err) via
c.logger.Error and then proceed without returning; ensure callers like
websub.deleted still run refreshSubscriptionSnapshot() after local removal. Keep
the event publish attempted but make its failure non-fatal to the cleanup flow.
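
A sketch of the non-fatal publish, assuming a `*slog.Logger` and a simplified `EventHub` interface. The `Event` type, its `"websub.deleted"` type string, and the `failingHub` and `cleanupWithFailingHub` names are illustrative only:

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// Event and EventHub are simplified stand-ins for the controller's types.
type Event struct{ Type string }

type EventHub interface {
	PublishEvent(gatewayID string, evt Event) error
}

type Client struct {
	gatewayID string
	eventHub  EventHub
	logger    *slog.Logger
}

// cleanupOrphanedResources treats the local cleanup as the source of truth:
// a failed publish is logged but does not fail the cleanup.
func (c *Client) cleanupOrphanedResources(apiID string) error {
	// Local DB cleanup is assumed to have succeeded before this point.
	evt := Event{Type: "websub.deleted"}
	if err := c.eventHub.PublishEvent(c.gatewayID, evt); err != nil {
		c.logger.Error("failed to publish orphan cleanup event",
			"gateway_id", c.gatewayID, "api_id", apiID, "error", err)
	}
	return nil
}

// failingHub always rejects the publish.
type failingHub struct{}

func (failingHub) PublishEvent(string, Event) error {
	return errors.New("event hub unavailable")
}

// cleanupWithFailingHub runs the cleanup against a hub that always fails.
func cleanupWithFailingHub() error {
	c := &Client{gatewayID: "gw-1", eventHub: failingHub{}, logger: slog.Default()}
	return c.cleanupOrphanedResources("api-1")
}

func main() {
	fmt.Println(cleanupWithFailingHub() == nil)
}
```

Because the function returns nil after a failed publish, a caller can still proceed to `refreshSubscriptionSnapshot()` once the local removal is done.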


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b2733fde-7b2e-402c-b9ee-98530b907674

📥 Commits

Reviewing files that changed from the base of the PR and between 4ef8303 and 84954a6.

📒 Files selected for processing (5)
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
  • event-gateway/gateway-runtime/internal/subscription/reconciler.go
  • gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
  • gateway/gateway-controller/pkg/controlplane/client.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • event-gateway/gateway-runtime/configs/config.toml
  • gateway/gateway-controller/pkg/controlplane/api_deleted_test.go
