Skip to content

Conversation

@bitcoin-coder-bob
Copy link

@bitcoin-coder-bob bitcoin-coder-bob commented Nov 23, 2025

Issue: #728

Adds new rpc UpdateStreamTopics allowing client to update topics on their rpc stream from GetEventStream. Offers ability to add or remove topics at the same time, or to overwrite all topics at once by specifying a list of topics.

A new event type StreamStartedEvent was added that gets sent immediately on the creation of a stream from GetEventStream and returns the id of the listener. This id is then used by the client when calling UpdateStreamTopics.

Changes to the go-sdk repo in the directory /go-sdk/blob/master/client will need to be made to support calling this new rpc. I do not see an appropriate place in this arkd repo to call this new rpc without it also being supported in the go-sdk repo. It appears the e2e_test.go would be the appropriate place to add the test for this new call once supported in the go-sdk.

Summary by CodeRabbit

  • New Features

    • Add API to update event stream topics dynamically (add/remove/overwrite).
    • Event streams now emit an immediate startup notification with the stream id when a listener connects.
  • Bug Fixes

    • Corrected README typo ("liqudiity provider" → "liquidity provider").
  • Tests

    • Added tests for topic overwrite, empty-overwrite handling, and errors for non-existent subscriptions.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 23, 2025

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (2)
  • api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go is excluded by !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory, by removing the pattern from both the lists.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds StreamStartedEvent emission immediately when an event stream is created, a new UpdateStreamTopics RPC (modify/overwrite semantics) to change stream topics without restarting streams, per-listener locking and accessor methods in the broker, OpenAPI/Protobuf updates, gRPC handler changes and tests, plus a README typo fix.

Changes

Cohort / File(s) Summary
Documentation
README.md
Fixed typographical error: "liqudiity provider" → "liquidity provider" (single occurrence)
OpenAPI specs
api-spec/openapi/swagger/ark/v1/service.openapi.json, api-spec/openapi/swagger/ark/v1/types.openapi.json
Added POST /v1/batch/updateTopics (ArkService_UpdateStreamTopics) and request/response schemas; added StreamStartedEvent schema; extended GetEventStreamResponse with streamStarted
Protobuf specs
api-spec/protobuf/ark/v1/service.proto, api-spec/protobuf/ark/v1/types.proto
Added rpc UpdateStreamTopics(UpdateStreamTopicsRequest) returns (UpdateStreamTopicsResponse) and messages ModifyTopics, OverwriteTopics, UpdateStreamTopicsRequest, UpdateStreamTopicsResponse; added StreamStartedEvent message; added stream_started variant to GetEventStreamResponse
gRPC handlers
internal/interface/grpc/handlers/arkservice.go
Emit StreamStartedEvent immediately on listener creation in GetEventStream; added UpdateStreamTopics handler with validation, overwrite vs modify branching, and composed response
Broker topic management
internal/interface/grpc/handlers/broker.go
Introduced per-listener RWMutex and thread-safe methods (addTopics, removeTopics, overwriteTopics, getTopics, channel); adapted broker functions to use these accessors and added broker.overwriteTopics helper
Broker tests
internal/interface/grpc/handlers/broker_test.go
Added overwriteTopics tests covering replacement, empty overwrite, and non-existent subscription error case
Permissions
internal/interface/grpc/permissions/permissions.go
Added whitelist entry for UpdateStreamTopics RPC with Ark entity write permission

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Handler as UpdateStreamTopics Handler
    participant Broker
    participant Listener

    Client->>Handler: UpdateStreamTopics(stream_id, {modify|overwrite})
    activate Handler
    Handler->>Handler: Validate stream_id & payload

    alt overwrite provided
        Handler->>Broker: overwriteTopics(stream_id, topics)
        activate Broker
        Broker->>Listener: OverwriteTopics (listener lock)
        Broker-->>Handler: Success + all_topics
        deactivate Broker
    else modify (add/remove)
        Handler->>Broker: addTopics(stream_id, add)
        Handler->>Broker: removeTopics(stream_id, remove)
        Broker-->>Handler: topics_added, topics_removed
    end

    Handler->>Broker: getTopics(stream_id)
    Handler-->>Client: UpdateStreamTopicsResponse(topics_added, topics_removed, all_topics)
    deactivate Handler
Loading
sequenceDiagram
    participant Client
    participant Handler as GetEventStream Handler
    participant Broker
    participant Listener

    Client->>Handler: GetEventStream(request)
    activate Handler
    Handler->>Broker: create/listen -> Listener
    Broker-->>Handler: Listener created

    rect rgb(220,240,220)
      Note over Handler,Client: Immediately emit StreamStartedEvent (new)
      Handler-->>Client: StreamStartedEvent(id)
    end

    Handler->>Handler: Start heartbeat & event loop
    loop streaming
      Listener-->>Handler: Event
      Handler-->>Client: Event
    end
    deactivate Handler
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus review on:
    • internal/interface/grpc/handlers/arkservice.go — oneof handling, overwrite vs modify precedence, validation and error paths.
    • internal/interface/grpc/handlers/broker.go — per-listener locking correctness, potential deadlocks/races, channel lifecycle.
    • Protobuf ↔ OpenAPI consistency for new messages/fields.
    • internal/interface/grpc/handlers/broker_test.go — concurrency assumptions in tests.

Possibly related issues

Possibly related PRs

Suggested reviewers

  • tiero
  • altafan

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'rpc for updating event stream topics' clearly and concisely summarizes the main change: adding a new RPC method to update event stream topics, which is the primary focus of the changeset.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

}},
fmt.Sprintf("/%s/UpdateStreamTopics", arkv1.ArkService_ServiceDesc.ServiceName): {{
Entity: EntityArk,
Action: "read",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be write ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

@bitcoin-coder-bob bitcoin-coder-bob marked this pull request as ready for review November 23, 2025 10:08
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
api-spec/openapi/swagger/ark/v1/service.openapi.json (2)

299-339: UpdateStreamTopics OpenAPI wiring matches proto; consider tightening validation docs

The new /v1/batch/updateTopics endpoint and the UpdateStreamTopicsRequest/UpdateStreamTopicsResponse schemas line up with the protobuf definitions (streamId + add/remove/overwrite topics, and the three response arrays).

To better reflect the handler’s behavior, you may want to:

  • Mark streamId as required in UpdateStreamTopicsRequest.
  • Document that at least one of addTopics, removeTopics, or overwriteTopics must be non-empty.
  • Clarify in the description that when overwriteTopics is non-empty, addTopics and removeTopics (if present) are ignored.

This keeps clients from sending structurally valid but semantically rejected payloads.

Also applies to: 1231-1281


695-697: streamStarted field and StreamStartedEvent schema are consistent; consider updating GetEventStream docs

The new GetEventStreamResponse.streamStarted property referencing StreamStartedEvent correctly exposes the initial stream-start notification.

To make the contract clearer for clients, consider augmenting the /v1/batch/events description to mention:

  • That a streamStarted event is emitted immediately upon subscription.
  • That the id from this event must be used as streamId when calling UpdateStreamTopics.

This would tie the two APIs together explicitly in the OpenAPI docs.

Also applies to: 966-974

api-spec/protobuf/ark/v1/service.proto (1)

239-250: Add field-level documentation to clarify topic operation semantics.

The message design supports flexible topic management but lacks documentation on edge cases: what if a topic appears in both add_topics and remove_topics? What if overwrite_topics is provided alongside add/remove_topics? What counts as a failed operation?

Consider adding documentation like:

 message UpdateStreamTopicsRequest {
   string stream_id = 1;
-  repeated string add_topics = 2;
-  repeated string remove_topics = 3;
-  repeated string overwrite_topics = 4;
+  // Topics to add. If already present, no-op for that topic.
+  repeated string add_topics = 2;
+  // Topics to remove. If not present, no-op for that topic.
+  repeated string remove_topics = 3;
+  // If provided, replaces all topics (add/remove are ignored). If empty, clears all topics.
+  repeated string overwrite_topics = 4;
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7afe52a and 09a4ea0.

⛔ Files ignored due to path filters (5)
  • api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go is excluded by !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service.pb.rgw.go is excluded by !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/ark/v1/types.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (8)
  • README.md (1 hunks)
  • api-spec/openapi/swagger/ark/v1/service.openapi.json (4 hunks)
  • api-spec/openapi/swagger/ark/v1/types.openapi.json (1 hunks)
  • api-spec/protobuf/ark/v1/service.proto (2 hunks)
  • api-spec/protobuf/ark/v1/types.proto (1 hunks)
  • internal/interface/grpc/handlers/arkservice.go (2 hunks)
  • internal/interface/grpc/handlers/broker.go (1 hunks)
  • internal/interface/grpc/permissions/permissions.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-08T08:12:00.266Z
Learnt from: altafan
Repo: arkade-os/arkd PR: 659
File: .github/workflows/release.yaml:183-201
Timestamp: 2025-07-08T08:12:00.266Z
Learning: The arkd wallet Dockerfile in the arkade-os/arkd repository is named `arkdwallet.Dockerfile` (without hyphens), not `arkd-wallet.Dockerfile`.

Applied to files:

  • README.md
🧬 Code graph analysis (2)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (11)
  • GetEventStreamResponse (827-845)
  • GetEventStreamResponse (858-858)
  • GetEventStreamResponse (873-875)
  • GetEventStreamResponse_StreamStarted (1027-1029)
  • GetEventStreamResponse_StreamStarted (1051-1051)
  • UpdateStreamTopicsRequest (1053-1061)
  • UpdateStreamTopicsRequest (1074-1074)
  • UpdateStreamTopicsRequest (1089-1091)
  • UpdateStreamTopicsResponse (1121-1128)
  • UpdateStreamTopicsResponse (1141-1141)
  • UpdateStreamTopicsResponse (1156-1158)
api-spec/protobuf/gen/ark/v1/types.pb.go (3)
  • StreamStartedEvent (1393-1398)
  • StreamStartedEvent (1411-1411)
  • StreamStartedEvent (1426-1428)
internal/interface/grpc/permissions/permissions.go (1)
api-spec/protobuf/gen/ark/v1/service_grpc.pb.go (1)
  • ArkService_ServiceDesc (616-678)
🔇 Additional comments (9)
README.md (1)

125-125: Doc wording fix looks good

The clarification that arkd-wallet is used as a liquidity provider reads correctly; no further changes needed here.

internal/interface/grpc/permissions/permissions.go (1)

159-162: UpdateStreamTopics whitelisting as ark:read is consistent

Mapping /ArkService/UpdateStreamTopics to EntityArk with Action: "read" aligns with other streaming/read-style RPCs like GetEventStream and GetTransactionsStream. This keeps topic updates available to clients that already have read access without granting stronger wallet/manager permissions.

api-spec/protobuf/ark/v1/types.proto (1)

148-152: StreamStartedEvent proto definition is straightforward and consistent

Single string id = 1; fits the use as a stream identifier and matches the generated StreamStartedEvent type referenced by GetEventStreamResponse_StreamStarted.

api-spec/openapi/swagger/ark/v1/types.openapi.json (1)

213-221: OpenAPI StreamStartedEvent mirrors proto as expected

The StreamStartedEvent schema (id: string) matches the protobuf definition and the usage in GetEventStreamResponse.streamStarted, so the type surface is consistent across specs.

internal/interface/grpc/handlers/broker.go (1)

147-162: overwriteTopics implementation looks correct and matches existing helpers

The method:

  • Guards access with the broker lock.
  • Validates the listener id with a clear error.
  • Rebuilds a fresh topics map using formatTopic, ensuring old topics are fully replaced.

This is consistent with addTopics/removeTopics behavior and is a good primitive for the overwrite branch in UpdateStreamTopics.

internal/interface/grpc/handlers/arkservice.go (1)

222-232: Immediate StreamStartedEvent on subscription is well-integrated

Emitting a StreamStartedEvent right after registering the listener ensures clients always receive the stream id they need for UpdateStreamTopics before any other events or heartbeats. The use of a UUID for listener.id and direct stream.Send before entering the main loop is clean and matches the new API surface.

api-spec/protobuf/ark/v1/service.proto (3)

97-105: RPC definition is well-structured and properly configured for HTTP gateway.

The new UpdateStreamTopics RPC follows the existing service patterns with clear HTTP mapping and appropriate request/response types.


235-235: StreamStartedEvent field addition is backward compatible.

Adding the new oneof member with field number 11 (after heartbeat at 10) maintains proper sequencing and doesn't break existing clients that ignore unknown fields.


239-250: Verify that proto3 validation is enforced in the implementation for required fields.

Proto3 has no explicit required keyword; stream_id will have default zero value if omitted. Ensure the gRPC handler in arkservice.go validates that stream_id is non-empty before processing.

@bitcoin-coder-bob bitcoin-coder-bob marked this pull request as draft November 23, 2025 10:37
@bitcoin-coder-bob bitcoin-coder-bob marked this pull request as ready for review November 23, 2025 11:10
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/interface/grpc/handlers/arkservice.go (2)

212-233: Closing listener channels while other goroutines may still send can panic

In GetEventStream, the listener channel is closed via a defer close(listener.ch) while other goroutines (listenToEvents) can still be doing l.ch <- ev.event. That can lead to send on closed channel panics, regardless of defer ordering, because senders hold references to listener outside the broker map.

Given the fan-out model, it’s safer to avoid closing listener.ch at all from here, and instead rely on removing the listener from the broker so that no new senders are created. Otherwise you’d need a more explicit shutdown protocol that guarantees no goroutine can still send before closing the channel.

Same concern applies conceptually to the transactions stream.


556-569: Unsafe concurrent access to listeners maps (potential concurrent map read and map write)

Both listenToEvents and listenToTxEvents iterate directly over h.eventsListenerHandler.listeners / h.transactionsListenerHandler.listeners without taking the broker’s RWMutex, while other goroutines mutate these maps via pushListener, removeListener, and timeout handlers. This is a classic source of concurrent map read and map write panics under load.

Given you already have broker.getListenersCopy() guarded by locks, a safer pattern would be:

  • Take a snapshot with getListenersCopy() under RLock.
  • Iterate over the snapshot map in these loops, spawning goroutines from the snapshot values.

That avoids unsynchronized map reads while preserving current behavior.

Also applies to: 595-603

♻️ Duplicate comments (1)
internal/interface/grpc/handlers/arkservice.go (1)

273-318: UpdateStreamTopics semantics look correct; consider mapping missing-stream errors to NotFound

The handler’s behavior matches the API description: it validates stream_id, gives overwrite precedence, supports combined add/remove via fallthrough, and returns AllTopics plus the requested additions/removals. That part looks good.

However, when the broker returns fmt.Errorf("subscription %s not found", id), this bubbles up as a generic error and will surface as gRPC Unknown. It would be more client-friendly to translate this into status.Error(codes.NotFound, ...) (or equivalent) so callers can distinguish “bad stream id” from internal failures.

🧹 Nitpick comments (4)
internal/interface/grpc/handlers/broker_test.go (1)

176-206: Good coverage for overwriteTopics semantics

The test exercises the happy path, overwrite-to-empty, and non-existent subscription error, matching the broker’s behavior. This is sufficient functional coverage; adding a case-insensitivity/duplicate-topics case would be a nice-to-have but isn’t required.

internal/interface/grpc/handlers/broker.go (3)

16-29: Per-listener lock is a good fix; consider using a value RWMutex

Adding lock *sync.RWMutex to listener and initializing it in newListener correctly sets up per-listener synchronization and fixes the previous map[string]struct{} race on topics.

For future cleanliness, you might consider embedding a value sync.RWMutex instead of a pointer to avoid heap allocation and nil checks:

-type listener[T any] struct {
-    id           string
-    topics       map[string]struct{}
-    ch           chan T
-    timeoutTimer *time.Timer
-    lock         *sync.RWMutex
-}
+type listener[T any] struct {
+    id           string
+    topics       map[string]struct{}
+    ch           chan T
+    timeoutTimer *time.Timer
+    mu           sync.RWMutex
+}

and update call sites accordingly. Not critical, just a style/perf nicety.


32-35: Use RLock/RUnlock in includesAny for read-only access

includesAny only reads listener.topics, so taking an exclusive Lock is unnecessary and slightly increases contention. A read lock is sufficient:

-func (l *listener[T]) includesAny(topics []string) bool {
-    l.lock.Lock()
-    defer l.lock.Unlock()
+func (l *listener[T]) includesAny(topics []string) bool {
+    l.lock.RLock()
+    defer l.lock.RUnlock()

Same reasoning applies if you keep a value sync.RWMutex instead of a pointer.


95-105: Topic access now correctly synchronized; minor cleanups possible

The new locking strategy around topic helpers looks sound:

  • getTopics holds the broker’s RLock and then the listener’s lock while building the topics slice, avoiding races on the topics map.
  • addTopics, removeTopics, removeAllTopics, and overwriteTopics hold the broker’s write lock, then the per-listener lock, and mutate listener.topics only under both, which is consistent and avoids deadlocks (no code takes the listener lock before the broker lock).
  • overwriteTopics atomically replaces the topics map with a freshly formatted one, which matches the intended semantics.

Two small optional polish points:

  • In each helper you can avoid double lookups by doing listener, ok := h.listeners[id] once and reusing listener.
  • In getTopics you can use RLock/RUnlock on the listener lock since it’s a read-only operation, similar to includesAny.

These are minor and not correctness issues.

Also applies to: 113-184

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 09a4ea0 and 8958205.

📒 Files selected for processing (4)
  • api-spec/protobuf/ark/v1/service.proto (3 hunks)
  • internal/interface/grpc/handlers/arkservice.go (2 hunks)
  • internal/interface/grpc/handlers/broker.go (6 hunks)
  • internal/interface/grpc/handlers/broker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api-spec/protobuf/ark/v1/service.proto
🧰 Additional context used
🧬 Code graph analysis (2)
internal/interface/grpc/handlers/broker_test.go (1)
pkg/errors/errors.go (1)
  • Error (39-46)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (11)
  • GetEventStreamResponse (827-845)
  • GetEventStreamResponse (858-858)
  • GetEventStreamResponse (873-875)
  • GetEventStreamResponse_StreamStarted (1027-1029)
  • GetEventStreamResponse_StreamStarted (1051-1051)
  • UpdateStreamTopicsRequest (1053-1061)
  • UpdateStreamTopicsRequest (1074-1074)
  • UpdateStreamTopicsRequest (1089-1091)
  • UpdateStreamTopicsResponse (1121-1128)
  • UpdateStreamTopicsResponse (1141-1141)
  • UpdateStreamTopicsResponse (1156-1158)
api-spec/protobuf/gen/ark/v1/types.pb.go (3)
  • StreamStartedEvent (1393-1398)
  • StreamStartedEvent (1411-1411)
  • StreamStartedEvent (1426-1428)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/interface/grpc/handlers/broker.go (1)

149-163: Consider refactoring to reduce duplication.

The implementation is correct and maintains consistent lock ordering. However, removeAllTopics is functionally equivalent to overwriteTopics(id, []string{}). Consider refactoring to reduce code duplication.

Optional refactor to eliminate duplication:

 func (h *broker[T]) removeAllTopics(id string) error {
-	h.lock.Lock()
-	defer h.lock.Unlock()
-
-	listener, ok := h.listeners[id]
-	if !ok {
-		return fmt.Errorf("subscription %s not found", id)
-	}
-
-	listener.lock.Lock()
-	defer listener.lock.Unlock()
-
-	listener.topics = make(map[string]struct{})
-	return nil
+	return h.overwriteTopics(id, []string{})
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8958205 and d5a9df4.

📒 Files selected for processing (1)
  • internal/interface/grpc/handlers/broker.go (6 hunks)
🔇 Additional comments (6)
internal/interface/grpc/handlers/broker.go (6)

16-16: LGTM! Per-listener synchronization added.

The addition of a per-listener RWMutex enables safe concurrent topic updates, which is essential for the new UpdateStreamTopics RPC functionality.


19-30: LGTM! Lock properly initialized.

The lock is correctly initialized in the constructor, ensuring it's ready before the listener is used.


32-46: LGTM! Appropriate use of read lock.

The use of RLock for the read-only topic check is correct and allows multiple concurrent readers while preventing race conditions.


131-147: LGTM! Consistent lock ordering maintained.

The implementation follows the same lock ordering pattern as addTopics (broker → listener), which is essential for deadlock prevention.


165-184: LGTM! Clean implementation of topic overwrite.

The new overwriteTopics method correctly:

  • Maintains consistent lock ordering (broker → listener)
  • Creates a new map instead of modifying the existing one
  • Formats topics consistently with other methods
  • Handles non-existent listeners appropriately

113-129: Lock ordering is consistent and correct throughout the codebase.

Verification confirms that all methods in the broker handler—including the addTopics method under review—consistently follow the correct lock ordering pattern: the broker lock (h.lock) is always acquired before the listener lock (listener.lock). This holds across getTopics, addTopics, removeTopics, removeAllTopics, and overwriteTopics, with appropriate use of read-locks where applicable. The implementation prevents deadlocks as intended.

string stream_id = 1;
repeated string add_topics = 2;
repeated string remove_topics = 3;
repeated string overwrite_topics = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use oneOf here to make it clearer by definition?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I like that, will add

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal/interface/grpc/handlers/arkservice.go (2)

286-296: Consider using codes.NotFound for subscription errors.

When the broker returns "subscription %s not found", the error currently becomes codes.Internal. Consider wrapping this as status.Error(codes.NotFound, ...) to help clients distinguish "invalid stream_id" from internal failures.

Apply this pattern to wrap broker errors:

 		if err := h.eventsListenerHandler.overwriteTopics(
 			req.GetStreamId(), req.GetTopicsChange().(*arkv1.UpdateStreamTopicsRequest_Overwrite).Overwrite.Topics,
 		); err != nil {
-			return nil, status.Errorf(codes.Internal, "overwrite topics error: %s", err.Error())
+			if strings.Contains(err.Error(), "not found") {
+				return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error())
+			}
+			return nil, status.Errorf(codes.Internal, "overwrite topics error: %s", err.Error())
 		}

301-312: Consider using codes.NotFound for subscription errors.

Similar to the overwrite case, when add/remove operations fail with "subscription not found", wrap the error as codes.NotFound instead of codes.Internal to enable proper client error handling.

Apply this pattern:

 		if len(modify.AddTopics) > 0 {
 			if err := h.eventsListenerHandler.addTopics(
 				req.GetStreamId(), modify.AddTopics,
 			); err != nil {
-				return nil, status.Errorf(codes.Internal, "add topics error: %s", err.Error())
+				if strings.Contains(err.Error(), "not found") {
+					return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error())
+				}
+				return nil, status.Errorf(codes.Internal, "add topics error: %s", err.Error())
 			}
 		}
 		if len(modify.RemoveTopics) > 0 {
 			if err := h.eventsListenerHandler.removeTopics(
 				req.GetStreamId(), modify.RemoveTopics,
 			); err != nil {
-				return nil, status.Errorf(codes.Internal, "remove topics error: %s", err.Error())
+				if strings.Contains(err.Error(), "not found") {
+					return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error())
+				}
+				return nil, status.Errorf(codes.Internal, "remove topics error: %s", err.Error())
 			}
 		}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d5a9df4 and 8ca01b0.

⛔ Files ignored due to path filters (3)
  • api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go is excluded by !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (5)
  • api-spec/openapi/swagger/ark/v1/service.openapi.json (7 hunks)
  • api-spec/protobuf/ark/v1/service.proto (3 hunks)
  • internal/interface/grpc/handlers/arkservice.go (2 hunks)
  • internal/interface/grpc/handlers/broker.go (4 hunks)
  • internal/interface/grpc/permissions/permissions.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/interface/grpc/permissions/permissions.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/broker.go (1)
api-spec/protobuf/gen/ark/v1/service.pb.go (3)
  • OverwriteTopics (1105-1110)
  • OverwriteTopics (1123-1123)
  • OverwriteTopics (1138-1140)
🔇 Additional comments (12)
internal/interface/grpc/handlers/arkservice.go (2)

222-232: LGTM! StreamStartedEvent emission is correct.

The immediate emission of StreamStartedEvent with the listener ID enables clients to subsequently call UpdateStreamTopics, which is the intended design.


273-279: LGTM! Input validation is appropriate.

The stream_id validation ensures the required field is present before processing.

internal/interface/grpc/handlers/broker.go (4)

11-30: LGTM! Per-listener locking correctly initialized.

The addition of sync.RWMutex to the listener struct and its initialization in newListener establishes the foundation for thread-safe topic management.


32-46: LGTM! Read lock correctly protects topic reads.

The use of RLock in includesAny properly guards concurrent reads of the topics map while allowing multiple concurrent readers.


48-95: LGTM! Topic management methods correctly synchronized.

The new methods follow proper locking discipline:

  • AddTopics, RemoveTopics, OverwriteTopics: Use write locks for mutations
  • GetTopics, Channel: Use read locks for non-mutating access

The nil-checks in AddTopics and RemoveTopics provide defensive safety.


133-195: LGTM! Broker methods correctly delegate to listener methods.

The refactored broker methods follow a consistent pattern:

  1. Acquire broker-level RLock to safely read from the listeners map
  2. Retrieve the listener
  3. Release broker-level lock
  4. Delegate to listener methods (which handle their own synchronization)

This minimizes lock contention while ensuring thread safety.

api-spec/openapi/swagger/ark/v1/service.openapi.json (3)

95-95: LGTM! Documentation accurately reflects StreamStartedEvent behavior.

The updated description correctly documents that StreamStartedEvent is emitted immediately and provides the stream ID for UpdateStreamTopics calls.


299-339: LGTM! New UpdateStreamTopics endpoint properly defined.

The endpoint definition correctly specifies:

  • POST method at /v1/batch/updateTopics
  • Request/response body schemas
  • Operation ID for code generation

695-697: LGTM! Schema definitions align with protobuf spec.

The new schemas correctly represent:

  • StreamStartedEvent with id field
  • ModifyTopics with addTopics and removeTopics arrays
  • OverwriteTopics with topics array
  • UpdateStreamTopicsRequest with streamId, modify, and overwrite fields
  • UpdateStreamTopicsResponse with topicsAdded, topicsRemoved, and allTopics fields

The OpenAPI representation of the protobuf oneof as optional fields is appropriate.

Also applies to: 872-889, 903-914, 996-1004, 1261-1300

api-spec/protobuf/ark/v1/service.proto (3)

86-88: LGTM! Documentation accurately describes StreamStartedEvent behavior.

The updated comment correctly explains that StreamStartedEvent is immediately emitted with the stream ID for subsequent UpdateStreamTopics calls.


99-107: LGTM! UpdateStreamTopics RPC properly defined.

The RPC definition correctly:

  • Accepts UpdateStreamTopicsRequest and returns UpdateStreamTopicsResponse
  • Maps to POST /v1/batch/updateTopics via HTTP annotation
  • Includes body: "*" for JSON request body handling

237-267: LGTM! Message definitions correctly use oneof for mutually exclusive options.

The protobuf definitions are well-structured:

  • StreamStartedEvent added to GetEventStreamResponse oneof with field number 11
  • ModifyTopics and OverwriteTopics provide distinct topic management strategies
  • UpdateStreamTopicsRequest uses oneof topics_change to model the mutually exclusive modify/overwrite choice
  • Comment clearly documents precedence rules
  • UpdateStreamTopicsResponse includes all necessary fields to reflect changes

The use of oneof addresses the past review feedback and provides type-safe modeling of the API.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e327750 and 1fe8449.

📒 Files selected for processing (1)
  • internal/interface/grpc/handlers/arkservice.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (15)
  • GetEventStreamResponse (827-845)
  • GetEventStreamResponse (858-858)
  • GetEventStreamResponse (873-875)
  • GetEventStreamResponse_StreamStarted (1027-1029)
  • GetEventStreamResponse_StreamStarted (1051-1051)
  • UpdateStreamTopicsRequest (1153-1163)
  • UpdateStreamTopicsRequest (1176-1176)
  • UpdateStreamTopicsRequest (1191-1193)
  • UpdateStreamTopicsResponse (1243-1250)
  • UpdateStreamTopicsResponse (1263-1263)
  • UpdateStreamTopicsResponse (1278-1280)
  • UpdateStreamTopicsRequest_Overwrite (1235-1237)
  • UpdateStreamTopicsRequest_Overwrite (1241-1241)
  • UpdateStreamTopicsRequest_Modify (1231-1233)
  • UpdateStreamTopicsRequest_Modify (1239-1239)
api-spec/protobuf/gen/ark/v1/types.pb.go (3)
  • StreamStartedEvent (1393-1398)
  • StreamStartedEvent (1411-1411)
  • StreamStartedEvent (1426-1428)
🔇 Additional comments (1)
internal/interface/grpc/handlers/arkservice.go (1)

222-232: LGTM - StreamStartedEvent emission implemented correctly.

The immediate emission of StreamStartedEvent with the listener ID provides the necessary stream identifier for clients to call UpdateStreamTopics. Error handling is appropriate.

l.topics = newTopics
}

func (l *listener[T]) GetTopics() []string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems to not be needed, we can keep using the existing getTopics

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per this comment I made the change: #829 (comment)

This idea of breaking these listener functions out was so that we use the specific listener's lock.

Should I still revert back?

}
}

func (l *listener[T]) OverwriteTopics(topics []string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to be exported, it is only used within the package

Comment on lines 59 to 68
func (l *listener[T]) RemoveTopics(topics []string) {
l.lock.Lock()
defer l.lock.Unlock()
if l.topics == nil {
return
}
for _, topic := range topics {
delete(l.topics, formatTopic(topic))
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to GetTopics change, it seems not needed. can't we use the existing removeTopics ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per this comment I made the change: #829 (comment)

This idea of breaking these listener functions out was so that we use the specific listener's lock.

Should I still revert back?

Comment on lines 48 to 58
func (l *listener[T]) AddTopics(topics []string) {
l.lock.Lock()
defer l.lock.Unlock()
if l.topics == nil {
l.topics = make(map[string]struct{}, len(topics))
}
for _, topic := range topics {
l.topics[formatTopic(topic)] = struct{}{}
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per this comment I made the change: #829 (comment)

This idea of breaking these listener functions out was so that we use the specific listener's lock.

Should I still revert back?

Comment on lines 90 to 95
func (l *listener[T]) Channel() chan T {
// no strong reason to lock here, but keep RLock to be safe if ch could be replaced
l.lock.RLock()
defer l.lock.RUnlock()
return l.ch
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be replaced

in theory, it is never replaced, so I'd remove this method and keep accessing via l.ch, except if I'm missing something ? In any case, we shouldn't export the method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per this comment I made the change: #829 (comment)

This idea of breaking these listener functions out was so that we use the specific listener's lock.

Should I still revert back?

in the meantime I fix, so that we do not export the method

Comment on lines 175 to 184
func (h *broker[T]) removeAllTopics(id string) error {
h.lock.Lock()
defer h.lock.Unlock()

if _, ok := h.listeners[id]; !ok {
h.lock.RLock()
listener, ok := h.listeners[id]
h.lock.RUnlock()
if !ok {
return fmt.Errorf("subscription %s not found", id)
}
listener.OverwriteTopics([]string{})
return nil
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason to change removeAllTopics ? I see the lock change, but then the lock is acquired again by OverwriteTopics. it seems to me we can revert that change

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do the behavior of removing all the topics by calling OverwriteTopics with an empty array of topics. Calling OverwriteTopics uses the listener's locks. The old way of setting the topics directly in removeAllTopics did not use the listener's locks.


oneof topics_change {
ModifyTopics modify = 2;
OverwriteTopics overwrite = 3;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the overwrite feature, but do we have real needs for it @Kukks ? If not a common use case, maybe worth to let client handles it (by removing current topics and adding new ones) so we simplify the proto spec

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
internal/interface/grpc/handlers/broker.go (2)

11-29: Prefer sync.RWMutex by value to make zero-value listeners safe

Using lock *sync.RWMutex (initialized in newListener) means any listener created via a struct literal without setting lock will panic on first Lock/RLock. A value field (lock sync.RWMutex) is idiomatic and makes the zero value usable, while keeping call sites the same (l.lock.Lock() still works via implicit address-taking).

Consider:

-	lock         *sync.RWMutex
+	lock         sync.RWMutex

and in newListener you can omit explicit initialization of lock (or leave it, both are fine).

This reduces the chance of subtle panics if tests or future code construct listener directly.


90-95: channel() accessor is defensively locked but acceptable

Locking around access to l.ch is conservative given the channel is never reassigned today, but the cost is negligible and it future‑proofs against potential channel replacement. The method is unexported, so the surface area is contained.

I’d leave as is unless profiling ever shows this on a hot path.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d16ee77 and ce3c56e.

📒 Files selected for processing (1)
  • internal/interface/grpc/handlers/broker.go (4 hunks)
🔇 Additional comments (2)
internal/interface/grpc/handlers/broker.go (2)

32-88: Per-listener topic locking and helpers look correct

The switch to RLock/RUnlock in includesAny and the new helpers (addTopics, removeTopics, overwriteTopics, getTopics) correctly centralize synchronization on listener.topics and keep all map access under the same mutex. The nil-guard in addTopics/removeTopics and the copy in getTopics avoid races on the internal map.

No issues from a correctness or concurrency standpoint.


133-195: Broker refactor to delegate to listener locks is sound

The pattern of:

  • Taking h.lock.RLock() only to look up the listener,
  • Releasing the broker lock before calling listener.channel(), getTopics, addTopics, removeTopics, or overwriteTopics,
  • Guarding all per-listener mutations with listener.lock,

is a good separation of concerns and avoids holding the global broker lock while doing per-listener work.

The small TOCTOU window (listener removed after lookup but before the listener method call) is benign since operations would then only affect an unreachable listener instance.

No changes needed here.

@bitcoin-coder-bob
Copy link
Author

this says I force pushed, I used git push --force-with-lease -u fork update-steam-topics-rpc

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/interface/grpc/handlers/broker.go (1)

90-95: Remove unnecessary RLock in channel() accessor.

The comment acknowledges there's "no strong reason to lock here" since the channel is never replaced after initialization. The RLock adds unnecessary overhead to every event send operation. Since ch is immutable after newListener returns, no synchronization is needed.

Apply this diff:

 func (l *listener[T]) channel() chan T {
-	// no strong reason to lock here, but keep RLock to be safe if ch could be replaced
-	l.lock.RLock()
-	defer l.lock.RUnlock()
 	return l.ch
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce3c56e and cf60d6a.

⛔ Files ignored due to path filters (4)
  • api-spec/protobuf/gen/ark/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service.pb.rgw.go is excluded by !**/gen/**
  • api-spec/protobuf/gen/ark/v1/service_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/ark/v1/types.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (9)
  • README.md (1 hunks)
  • api-spec/openapi/swagger/ark/v1/service.openapi.json (7 hunks)
  • api-spec/openapi/swagger/ark/v1/types.openapi.json (1 hunks)
  • api-spec/protobuf/ark/v1/service.proto (3 hunks)
  • api-spec/protobuf/ark/v1/types.proto (1 hunks)
  • internal/interface/grpc/handlers/arkservice.go (2 hunks)
  • internal/interface/grpc/handlers/broker.go (4 hunks)
  • internal/interface/grpc/handlers/broker_test.go (1 hunks)
  • internal/interface/grpc/permissions/permissions.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • api-spec/protobuf/ark/v1/types.proto
  • README.md
  • api-spec/openapi/swagger/ark/v1/types.openapi.json
  • internal/interface/grpc/permissions/permissions.go
  • internal/interface/grpc/handlers/broker_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (15)
  • GetEventStreamResponse (827-845)
  • GetEventStreamResponse (858-858)
  • GetEventStreamResponse (873-875)
  • GetEventStreamResponse_StreamStarted (1027-1029)
  • GetEventStreamResponse_StreamStarted (1051-1051)
  • UpdateStreamTopicsRequest (1153-1163)
  • UpdateStreamTopicsRequest (1176-1176)
  • UpdateStreamTopicsRequest (1191-1193)
  • UpdateStreamTopicsResponse (1243-1250)
  • UpdateStreamTopicsResponse (1263-1263)
  • UpdateStreamTopicsResponse (1278-1280)
  • UpdateStreamTopicsRequest_Overwrite (1235-1237)
  • UpdateStreamTopicsRequest_Overwrite (1241-1241)
  • UpdateStreamTopicsRequest_Modify (1231-1233)
  • UpdateStreamTopicsRequest_Modify (1239-1239)
api-spec/protobuf/gen/ark/v1/types.pb.go (3)
  • StreamStartedEvent (1393-1398)
  • StreamStartedEvent (1411-1411)
  • StreamStartedEvent (1426-1428)
🔇 Additional comments (10)
api-spec/openapi/swagger/ark/v1/service.openapi.json (1)

95-95: LGTM! OpenAPI spec correctly reflects the protobuf definitions.

The OpenAPI specification properly defines the new UpdateStreamTopics endpoint and related schemas. The additions are consistent with the protobuf definitions and follow OpenAPI 3.1.0 conventions.

Also applies to: 299-339, 695-697, 872-914, 996-1004, 1261-1300

internal/interface/grpc/handlers/arkservice.go (2)

222-232: LGTM! StreamStartedEvent emission is correctly implemented.

The immediate emission of StreamStartedEvent upon stream creation is well-placed. It sends the listener ID to the client before any other events, enabling subsequent UpdateStreamTopics calls. Error handling is appropriate—if the send fails, the error is returned and the stream is properly cleaned up by the deferred calls.


273-327: LGTM! UpdateStreamTopics implementation is solid.

The handler correctly implements the update semantics:

  • Stream ID validation ensures the required field is present
  • Type switch safely handles the oneof variants without panic risk
  • Overwrite takes precedence over modify, as documented
  • Modify case allows simultaneous add/remove operations
  • Error codes are appropriate: InvalidArgument for bad input, NotFound for missing stream
  • Response structure matches the proto definition

The past review concerns about concurrency and type assertions have been properly addressed.

internal/interface/grpc/handlers/broker.go (4)

16-16: LGTM! Per-listener locking correctly addresses the concurrency issue.

The addition of a per-listener RWMutex properly resolves the data race flagged in previous reviews. This allows concurrent topic updates (via UpdateStreamTopics) and topic reads (via includesAny during event forwarding) to be safely synchronized without blocking unrelated listeners.

Also applies to: 28-28


32-46: LGTM! Read lock properly protects topic access.

Using RLock for the read-only includesAny operation is correct and allows multiple concurrent readers, which is important for event forwarding performance when many listeners are checking topics simultaneously.


48-88: LGTM! Thread-safe topic management methods are well-implemented.

The listener methods properly encapsulate topic map access with appropriate locking:

  • addTopics/removeTopics/overwriteTopics use Lock for write operations
  • getTopics uses RLock for read-only access
  • Nil map checks prevent panics
  • Topic formatting is consistently applied

This design cleanly separates listener-level synchronization from broker-level synchronization, reducing lock contention.


133-195: LGTM! Broker methods correctly delegate to listener accessors.

The broker methods follow a good pattern:

  1. Acquire broker RLock to safely read the listeners map
  2. Get the listener reference
  3. Release broker lock immediately (avoiding nested lock holding)
  4. Call listener methods that acquire their own locks

This approach minimizes lock contention and prevents potential deadlocks. The "subscription not found" errors provide clear feedback for invalid stream IDs.

api-spec/protobuf/ark/v1/service.proto (3)

86-88: LGTM! Documentation correctly describes StreamStartedEvent behavior.

The updated comment accurately explains that StreamStartedEvent is immediately sent upon stream creation and provides the stream ID for subsequent UpdateStreamTopics calls. This helps API consumers understand the flow.


99-107: LGTM! UpdateStreamTopics RPC definition is clean and well-documented.

The RPC definition follows protobuf conventions with appropriate HTTP mapping to POST /v1/batch/updateTopics. The comment clearly explains the functionality.


237-267: LGTM! Message definitions are well-structured.

The protobuf message definitions are excellent:

  • StreamStartedEvent properly added to GetEventStreamResponse oneof
  • ModifyTopics and OverwriteTopics provide clear separation of update semantics
  • oneof topics_change correctly models the mutually exclusive update modes
  • Comment on lines 250-253 clearly documents precedence and requirements
  • UpdateStreamTopicsResponse provides comprehensive feedback with added, removed, and all topics

The use of oneof for topics_change is a good design choice that enforces the either-modify-or-overwrite constraint at the protocol level.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants