-
Notifications
You must be signed in to change notification settings - Fork 49
rpc for updating event stream topics #829
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
rpc for updating event stream topics #829
Conversation
|
Important Review skippedReview was skipped due to path filters ⛔ Files ignored due to path filters (2)
CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including You can disable this status message by setting the WalkthroughAdds StreamStartedEvent emission immediately when an event stream is created, a new UpdateStreamTopics RPC (modify/overwrite semantics) to change stream topics without restarting streams, per-listener locking and accessor methods in the broker, OpenAPI/Protobuf updates, gRPC handler changes and tests, plus a README typo fix. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Handler as UpdateStreamTopics Handler
participant Broker
participant Listener
Client->>Handler: UpdateStreamTopics(stream_id, {modify|overwrite})
activate Handler
Handler->>Handler: Validate stream_id & payload
alt overwrite provided
Handler->>Broker: overwriteTopics(stream_id, topics)
activate Broker
Broker->>Listener: OverwriteTopics (listener lock)
Broker-->>Handler: Success + all_topics
deactivate Broker
else modify (add/remove)
Handler->>Broker: addTopics(stream_id, add)
Handler->>Broker: removeTopics(stream_id, remove)
Broker-->>Handler: topics_added, topics_removed
end
Handler->>Broker: getTopics(stream_id)
Handler-->>Client: UpdateStreamTopicsResponse(topics_added, topics_removed, all_topics)
deactivate Handler
sequenceDiagram
participant Client
participant Handler as GetEventStream Handler
participant Broker
participant Listener
Client->>Handler: GetEventStream(request)
activate Handler
Handler->>Broker: create/listen -> Listener
Broker-->>Handler: Listener created
rect rgb(220,240,220)
Note over Handler,Client: Immediately emit StreamStartedEvent (new)
Handler-->>Client: StreamStartedEvent(id)
end
Handler->>Handler: Start heartbeat & event loop
loop streaming
Listener-->>Handler: Event
Handler-->>Client: Event
end
deactivate Handler
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related issues
Possibly related PRs
Suggested reviewers
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| }}, | ||
| fmt.Sprintf("/%s/UpdateStreamTopics", arkv1.ArkService_ServiceDesc.ServiceName): {{ | ||
| Entity: EntityArk, | ||
| Action: "read", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be write ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
api-spec/openapi/swagger/ark/v1/service.openapi.json (2)
299-339:UpdateStreamTopicsOpenAPI wiring matches proto; consider tightening validation docsThe new
/v1/batch/updateTopicsendpoint and theUpdateStreamTopicsRequest/UpdateStreamTopicsResponseschemas line up with the protobuf definitions (streamId + add/remove/overwrite topics, and the three response arrays).To better reflect the handler’s behavior, you may want to:
- Mark
streamIdas required inUpdateStreamTopicsRequest.- Document that at least one of
addTopics,removeTopics, oroverwriteTopicsmust be non-empty.- Clarify in the description that when
overwriteTopicsis non-empty,addTopicsandremoveTopics(if present) are ignored.This keeps clients from sending structurally valid but semantically rejected payloads.
Also applies to: 1231-1281
695-697:streamStartedfield andStreamStartedEventschema are consistent; consider updating GetEventStream docsThe new
GetEventStreamResponse.streamStartedproperty referencingStreamStartedEventcorrectly exposes the initial stream-start notification.To make the contract clearer for clients, consider augmenting the
/v1/batch/eventsdescription to mention:
- That a
streamStartedevent is emitted immediately upon subscription.- That the
idfrom this event must be used asstreamIdwhen callingUpdateStreamTopics.This would tie the two APIs together explicitly in the OpenAPI docs.
Also applies to: 966-974
api-spec/protobuf/ark/v1/service.proto (1)
239-250: Add field-level documentation to clarify topic operation semantics.The message design supports flexible topic management but lacks documentation on edge cases: what if a topic appears in both
add_topicsandremove_topics? What ifoverwrite_topicsis provided alongsideadd/remove_topics? What counts as a failed operation?Consider adding documentation like:
message UpdateStreamTopicsRequest { string stream_id = 1; - repeated string add_topics = 2; - repeated string remove_topics = 3; - repeated string overwrite_topics = 4; + // Topics to add. If already present, no-op for that topic. + repeated string add_topics = 2; + // Topics to remove. If not present, no-op for that topic. + repeated string remove_topics = 3; + // If provided, replaces all topics (add/remove are ignored). If empty, clears all topics. + repeated string overwrite_topics = 4; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.gois excluded by!**/gen/**api-spec/protobuf/gen/ark/v1/service.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/service.pb.rgw.gois excluded by!**/gen/**api-spec/protobuf/gen/ark/v1/service_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/types.pb.gois excluded by!**/*.pb.go,!**/gen/**
📒 Files selected for processing (8)
README.md(1 hunks)api-spec/openapi/swagger/ark/v1/service.openapi.json(4 hunks)api-spec/openapi/swagger/ark/v1/types.openapi.json(1 hunks)api-spec/protobuf/ark/v1/service.proto(2 hunks)api-spec/protobuf/ark/v1/types.proto(1 hunks)internal/interface/grpc/handlers/arkservice.go(2 hunks)internal/interface/grpc/handlers/broker.go(1 hunks)internal/interface/grpc/permissions/permissions.go(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-08T08:12:00.266Z
Learnt from: altafan
Repo: arkade-os/arkd PR: 659
File: .github/workflows/release.yaml:183-201
Timestamp: 2025-07-08T08:12:00.266Z
Learning: The arkd wallet Dockerfile in the arkade-os/arkd repository is named `arkdwallet.Dockerfile` (without hyphens), not `arkd-wallet.Dockerfile`.
Applied to files:
README.md
🧬 Code graph analysis (2)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (11)
GetEventStreamResponse(827-845)GetEventStreamResponse(858-858)GetEventStreamResponse(873-875)GetEventStreamResponse_StreamStarted(1027-1029)GetEventStreamResponse_StreamStarted(1051-1051)UpdateStreamTopicsRequest(1053-1061)UpdateStreamTopicsRequest(1074-1074)UpdateStreamTopicsRequest(1089-1091)UpdateStreamTopicsResponse(1121-1128)UpdateStreamTopicsResponse(1141-1141)UpdateStreamTopicsResponse(1156-1158)api-spec/protobuf/gen/ark/v1/types.pb.go (3)
StreamStartedEvent(1393-1398)StreamStartedEvent(1411-1411)StreamStartedEvent(1426-1428)
internal/interface/grpc/permissions/permissions.go (1)
api-spec/protobuf/gen/ark/v1/service_grpc.pb.go (1)
ArkService_ServiceDesc(616-678)
🔇 Additional comments (9)
README.md (1)
125-125: Doc wording fix looks goodThe clarification that
arkd-walletis used as a liquidity provider reads correctly; no further changes needed here.internal/interface/grpc/permissions/permissions.go (1)
159-162:UpdateStreamTopicswhitelisting asark:readis consistentMapping
/ArkService/UpdateStreamTopicstoEntityArkwithAction: "read"aligns with other streaming/read-style RPCs likeGetEventStreamandGetTransactionsStream. This keeps topic updates available to clients that already have read access without granting stronger wallet/manager permissions.api-spec/protobuf/ark/v1/types.proto (1)
148-152:StreamStartedEventproto definition is straightforward and consistentSingle
string id = 1;fits the use as a stream identifier and matches the generatedStreamStartedEventtype referenced byGetEventStreamResponse_StreamStarted.api-spec/openapi/swagger/ark/v1/types.openapi.json (1)
213-221: OpenAPIStreamStartedEventmirrors proto as expectedThe
StreamStartedEventschema (id: string) matches the protobuf definition and the usage inGetEventStreamResponse.streamStarted, so the type surface is consistent across specs.internal/interface/grpc/handlers/broker.go (1)
147-162:overwriteTopicsimplementation looks correct and matches existing helpersThe method:
- Guards access with the broker lock.
- Validates the listener id with a clear error.
- Rebuilds a fresh topics map using
formatTopic, ensuring old topics are fully replaced.This is consistent with
addTopics/removeTopicsbehavior and is a good primitive for the overwrite branch inUpdateStreamTopics.internal/interface/grpc/handlers/arkservice.go (1)
222-232: ImmediateStreamStartedEventon subscription is well-integratedEmitting a
StreamStartedEventright after registering the listener ensures clients always receive the stream id they need forUpdateStreamTopicsbefore any other events or heartbeats. The use of a UUID forlistener.idand directstream.Sendbefore entering the main loop is clean and matches the new API surface.api-spec/protobuf/ark/v1/service.proto (3)
97-105: RPC definition is well-structured and properly configured for HTTP gateway.The new UpdateStreamTopics RPC follows the existing service patterns with clear HTTP mapping and appropriate request/response types.
235-235: StreamStartedEvent field addition is backward compatible.Adding the new oneof member with field number 11 (after heartbeat at 10) maintains proper sequencing and doesn't break existing clients that ignore unknown fields.
239-250: Verify that proto3 validation is enforced in the implementation for required fields.Proto3 has no explicit
requiredkeyword;stream_idwill have default zero value if omitted. Ensure the gRPC handler in arkservice.go validates thatstream_idis non-empty before processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/interface/grpc/handlers/arkservice.go (2)
212-233: Closing listener channels while other goroutines may still send can panicIn
GetEventStream, the listener channel is closed via adefer close(listener.ch)while other goroutines (listenToEvents) can still be doingl.ch <- ev.event. That can lead tosend on closed channelpanics, regardless of defer ordering, because senders hold references tolisteneroutside the broker map.Given the fan-out model, it’s safer to avoid closing
listener.chat all from here, and instead rely on removing the listener from the broker so that no new senders are created. Otherwise you’d need a more explicit shutdown protocol that guarantees no goroutine can still send before closing the channel.Same concern applies conceptually to the transactions stream.
556-569: Unsafe concurrent access to listeners maps (potentialconcurrent map read and map write)Both
listenToEventsandlistenToTxEventsiterate directly overh.eventsListenerHandler.listeners/h.transactionsListenerHandler.listenerswithout taking the broker’s RWMutex, while other goroutines mutate these maps viapushListener,removeListener, and timeout handlers. This is a classic source ofconcurrent map read and map writepanics under load.Given you already have
broker.getListenersCopy()guarded by locks, a safer pattern would be:
- Take a snapshot with
getListenersCopy()underRLock.- Iterate over the snapshot map in these loops, spawning goroutines from the snapshot values.
That avoids unsynchronized map reads while preserving current behavior.
Also applies to: 595-603
♻️ Duplicate comments (1)
internal/interface/grpc/handlers/arkservice.go (1)
273-318: UpdateStreamTopics semantics look correct; consider mapping missing-stream errors to NotFoundThe handler’s behavior matches the API description: it validates
stream_id, gives overwrite precedence, supports combined add/remove viafallthrough, and returnsAllTopicsplus the requested additions/removals. That part looks good.However, when the broker returns
fmt.Errorf("subscription %s not found", id), this bubbles up as a generic error and will surface as gRPCUnknown. It would be more client-friendly to translate this intostatus.Error(codes.NotFound, ...)(or equivalent) so callers can distinguish “bad stream id” from internal failures.
🧹 Nitpick comments (4)
internal/interface/grpc/handlers/broker_test.go (1)
176-206: Good coverage for overwriteTopics semanticsThe test exercises the happy path, overwrite-to-empty, and non-existent subscription error, matching the broker’s behavior. This is sufficient functional coverage; adding a case-insensitivity/duplicate-topics case would be a nice-to-have but isn’t required.
internal/interface/grpc/handlers/broker.go (3)
16-29: Per-listener lock is a good fix; consider using a value RWMutexAdding
lock *sync.RWMutextolistenerand initializing it innewListenercorrectly sets up per-listener synchronization and fixes the previousmap[string]struct{}race ontopics.For future cleanliness, you might consider embedding a value
sync.RWMutexinstead of a pointer to avoid heap allocation and nil checks:-type listener[T any] struct { - id string - topics map[string]struct{} - ch chan T - timeoutTimer *time.Timer - lock *sync.RWMutex -} +type listener[T any] struct { + id string + topics map[string]struct{} + ch chan T + timeoutTimer *time.Timer + mu sync.RWMutex +}and update call sites accordingly. Not critical, just a style/perf nicety.
32-35: Use RLock/RUnlock in includesAny for read-only access
includesAnyonly readslistener.topics, so taking an exclusiveLockis unnecessary and slightly increases contention. A read lock is sufficient:-func (l *listener[T]) includesAny(topics []string) bool { - l.lock.Lock() - defer l.lock.Unlock() +func (l *listener[T]) includesAny(topics []string) bool { + l.lock.RLock() + defer l.lock.RUnlock()Same reasoning applies if you keep a value
sync.RWMutexinstead of a pointer.
95-105: Topic access now correctly synchronized; minor cleanups possibleThe new locking strategy around topic helpers looks sound:
getTopicsholds the broker’sRLockand then the listener’s lock while building the topics slice, avoiding races on the topics map.addTopics,removeTopics,removeAllTopics, andoverwriteTopicshold the broker’s write lock, then the per-listener lock, and mutatelistener.topicsonly under both, which is consistent and avoids deadlocks (no code takes the listener lock before the broker lock).overwriteTopicsatomically replaces the topics map with a freshly formatted one, which matches the intended semantics.Two small optional polish points:
- In each helper you can avoid double lookups by doing
listener, ok := h.listeners[id]once and reusinglistener.- In
getTopicsyou can useRLock/RUnlockon the listener lock since it’s a read-only operation, similar toincludesAny.These are minor and not correctness issues.
Also applies to: 113-184
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
api-spec/protobuf/ark/v1/service.proto(3 hunks)internal/interface/grpc/handlers/arkservice.go(2 hunks)internal/interface/grpc/handlers/broker.go(6 hunks)internal/interface/grpc/handlers/broker_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api-spec/protobuf/ark/v1/service.proto
🧰 Additional context used
🧬 Code graph analysis (2)
internal/interface/grpc/handlers/broker_test.go (1)
pkg/errors/errors.go (1)
Error(39-46)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (11)
GetEventStreamResponse(827-845)GetEventStreamResponse(858-858)GetEventStreamResponse(873-875)GetEventStreamResponse_StreamStarted(1027-1029)GetEventStreamResponse_StreamStarted(1051-1051)UpdateStreamTopicsRequest(1053-1061)UpdateStreamTopicsRequest(1074-1074)UpdateStreamTopicsRequest(1089-1091)UpdateStreamTopicsResponse(1121-1128)UpdateStreamTopicsResponse(1141-1141)UpdateStreamTopicsResponse(1156-1158)api-spec/protobuf/gen/ark/v1/types.pb.go (3)
StreamStartedEvent(1393-1398)StreamStartedEvent(1411-1411)StreamStartedEvent(1426-1428)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/interface/grpc/handlers/broker.go (1)
149-163: Consider refactoring to reduce duplication.The implementation is correct and maintains consistent lock ordering. However,
removeAllTopicsis functionally equivalent tooverwriteTopics(id, []string{}). Consider refactoring to reduce code duplication.Optional refactor to eliminate duplication:
func (h *broker[T]) removeAllTopics(id string) error { - h.lock.Lock() - defer h.lock.Unlock() - - listener, ok := h.listeners[id] - if !ok { - return fmt.Errorf("subscription %s not found", id) - } - - listener.lock.Lock() - defer listener.lock.Unlock() - - listener.topics = make(map[string]struct{}) - return nil + return h.overwriteTopics(id, []string{}) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/interface/grpc/handlers/broker.go(6 hunks)
🔇 Additional comments (6)
internal/interface/grpc/handlers/broker.go (6)
16-16: LGTM! Per-listener synchronization added.The addition of a per-listener RWMutex enables safe concurrent topic updates, which is essential for the new UpdateStreamTopics RPC functionality.
19-30: LGTM! Lock properly initialized.The lock is correctly initialized in the constructor, ensuring it's ready before the listener is used.
32-46: LGTM! Appropriate use of read lock.The use of RLock for the read-only topic check is correct and allows multiple concurrent readers while preventing race conditions.
131-147: LGTM! Consistent lock ordering maintained.The implementation follows the same lock ordering pattern as
addTopics(broker → listener), which is essential for deadlock prevention.
165-184: LGTM! Clean implementation of topic overwrite.The new
overwriteTopicsmethod correctly:
- Maintains consistent lock ordering (broker → listener)
- Creates a new map instead of modifying the existing one
- Formats topics consistently with other methods
- Handles non-existent listeners appropriately
113-129: Lock ordering is consistent and correct throughout the codebase.Verification confirms that all methods in the broker handler—including the
addTopicsmethod under review—consistently follow the correct lock ordering pattern: the broker lock (h.lock) is always acquired before the listener lock (listener.lock). This holds acrossgetTopics,addTopics,removeTopics,removeAllTopics, andoverwriteTopics, with appropriate use of read-locks where applicable. The implementation prevents deadlocks as intended.
| string stream_id = 1; | ||
| repeated string add_topics = 2; | ||
| repeated string remove_topics = 3; | ||
| repeated string overwrite_topics = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use oneOf here to make it clearer by definition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I like that, will add
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal/interface/grpc/handlers/arkservice.go (2)
286-296: Consider using codes.NotFound for subscription errors.When the broker returns "subscription %s not found", the error currently becomes codes.Internal. Consider wrapping this as
status.Error(codes.NotFound, ...)to help clients distinguish "invalid stream_id" from internal failures.Apply this pattern to wrap broker errors:
if err := h.eventsListenerHandler.overwriteTopics( req.GetStreamId(), req.GetTopicsChange().(*arkv1.UpdateStreamTopicsRequest_Overwrite).Overwrite.Topics, ); err != nil { - return nil, status.Errorf(codes.Internal, "overwrite topics error: %s", err.Error()) + if strings.Contains(err.Error(), "not found") { + return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error()) + } + return nil, status.Errorf(codes.Internal, "overwrite topics error: %s", err.Error()) }
301-312: Consider using codes.NotFound for subscription errors.Similar to the overwrite case, when add/remove operations fail with "subscription not found", wrap the error as
codes.NotFoundinstead ofcodes.Internalto enable proper client error handling.Apply this pattern:
if len(modify.AddTopics) > 0 { if err := h.eventsListenerHandler.addTopics( req.GetStreamId(), modify.AddTopics, ); err != nil { - return nil, status.Errorf(codes.Internal, "add topics error: %s", err.Error()) + if strings.Contains(err.Error(), "not found") { + return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error()) + } + return nil, status.Errorf(codes.Internal, "add topics error: %s", err.Error()) } } if len(modify.RemoveTopics) > 0 { if err := h.eventsListenerHandler.removeTopics( req.GetStreamId(), modify.RemoveTopics, ); err != nil { - return nil, status.Errorf(codes.Internal, "remove topics error: %s", err.Error()) + if strings.Contains(err.Error(), "not found") { + return nil, status.Errorf(codes.NotFound, "stream not found: %s", err.Error()) + } + return nil, status.Errorf(codes.Internal, "remove topics error: %s", err.Error()) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (3)
api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.gois excluded by!**/gen/**api-spec/protobuf/gen/ark/v1/service.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/service_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**
📒 Files selected for processing (5)
api-spec/openapi/swagger/ark/v1/service.openapi.json(7 hunks)api-spec/protobuf/ark/v1/service.proto(3 hunks)internal/interface/grpc/handlers/arkservice.go(2 hunks)internal/interface/grpc/handlers/broker.go(4 hunks)internal/interface/grpc/permissions/permissions.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/interface/grpc/permissions/permissions.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/broker.go (1)
api-spec/protobuf/gen/ark/v1/service.pb.go (3)
OverwriteTopics(1105-1110)OverwriteTopics(1123-1123)OverwriteTopics(1138-1140)
🔇 Additional comments (12)
internal/interface/grpc/handlers/arkservice.go (2)
222-232: LGTM! StreamStartedEvent emission is correct.The immediate emission of StreamStartedEvent with the listener ID enables clients to subsequently call UpdateStreamTopics, which is the intended design.
273-279: LGTM! Input validation is appropriate.The stream_id validation ensures the required field is present before processing.
internal/interface/grpc/handlers/broker.go (4)
11-30: LGTM! Per-listener locking correctly initialized.The addition of
sync.RWMutexto the listener struct and its initialization innewListenerestablishes the foundation for thread-safe topic management.
32-46: LGTM! Read lock correctly protects topic reads.The use of
RLockinincludesAnyproperly guards concurrent reads of thetopicsmap while allowing multiple concurrent readers.
48-95: LGTM! Topic management methods correctly synchronized.The new methods follow proper locking discipline:
AddTopics,RemoveTopics,OverwriteTopics: Use write locks for mutationsGetTopics,Channel: Use read locks for non-mutating accessThe nil-checks in
AddTopicsandRemoveTopicsprovide defensive safety.
133-195: LGTM! Broker methods correctly delegate to listener methods.The refactored broker methods follow a consistent pattern:
- Acquire broker-level
RLockto safely read from thelistenersmap- Retrieve the listener
- Release broker-level lock
- Delegate to listener methods (which handle their own synchronization)
This minimizes lock contention while ensuring thread safety.
api-spec/openapi/swagger/ark/v1/service.openapi.json (3)
95-95: LGTM! Documentation accurately reflects StreamStartedEvent behavior.The updated description correctly documents that StreamStartedEvent is emitted immediately and provides the stream ID for UpdateStreamTopics calls.
299-339: LGTM! New UpdateStreamTopics endpoint properly defined.The endpoint definition correctly specifies:
- POST method at /v1/batch/updateTopics
- Request/response body schemas
- Operation ID for code generation
695-697: LGTM! Schema definitions align with protobuf spec.The new schemas correctly represent:
StreamStartedEventwithidfieldModifyTopicswithaddTopicsandremoveTopicsarraysOverwriteTopicswithtopicsarrayUpdateStreamTopicsRequestwithstreamId,modify, andoverwritefieldsUpdateStreamTopicsResponsewithtopicsAdded,topicsRemoved, andallTopicsfieldsThe OpenAPI representation of the protobuf oneof as optional fields is appropriate.
Also applies to: 872-889, 903-914, 996-1004, 1261-1300
api-spec/protobuf/ark/v1/service.proto (3)
86-88: LGTM! Documentation accurately describes StreamStartedEvent behavior.The updated comment correctly explains that StreamStartedEvent is immediately emitted with the stream ID for subsequent UpdateStreamTopics calls.
99-107: LGTM! UpdateStreamTopics RPC properly defined.The RPC definition correctly:
- Accepts UpdateStreamTopicsRequest and returns UpdateStreamTopicsResponse
- Maps to POST /v1/batch/updateTopics via HTTP annotation
- Includes body: "*" for JSON request body handling
237-267: LGTM! Message definitions correctly use oneof for mutually exclusive options.The protobuf definitions are well-structured:
StreamStartedEventadded toGetEventStreamResponseoneof with field number 11ModifyTopicsandOverwriteTopicsprovide distinct topic management strategiesUpdateStreamTopicsRequestuses oneoftopics_changeto model the mutually exclusive modify/overwrite choice- Comment clearly documents precedence rules
UpdateStreamTopicsResponseincludes all necessary fields to reflect changesThe use of oneof addresses the past review feedback and provides type-safe modeling of the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/interface/grpc/handlers/arkservice.go(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (15)
GetEventStreamResponse(827-845)GetEventStreamResponse(858-858)GetEventStreamResponse(873-875)GetEventStreamResponse_StreamStarted(1027-1029)GetEventStreamResponse_StreamStarted(1051-1051)UpdateStreamTopicsRequest(1153-1163)UpdateStreamTopicsRequest(1176-1176)UpdateStreamTopicsRequest(1191-1193)UpdateStreamTopicsResponse(1243-1250)UpdateStreamTopicsResponse(1263-1263)UpdateStreamTopicsResponse(1278-1280)UpdateStreamTopicsRequest_Overwrite(1235-1237)UpdateStreamTopicsRequest_Overwrite(1241-1241)UpdateStreamTopicsRequest_Modify(1231-1233)UpdateStreamTopicsRequest_Modify(1239-1239)api-spec/protobuf/gen/ark/v1/types.pb.go (3)
StreamStartedEvent(1393-1398)StreamStartedEvent(1411-1411)StreamStartedEvent(1426-1428)
🔇 Additional comments (1)
internal/interface/grpc/handlers/arkservice.go (1)
222-232: LGTM - StreamStartedEvent emission implemented correctly.The immediate emission of
StreamStartedEventwith the listener ID provides the necessary stream identifier for clients to callUpdateStreamTopics. Error handling is appropriate.
| l.topics = newTopics | ||
| } | ||
|
|
||
| func (l *listener[T]) GetTopics() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems to not be needed, we can keep using the existing getTopics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per this comment I made the change: #829 (comment)
This idea of breaking these listener functions out was so that we use the specific listener's lock.
Should I still revert back?
| } | ||
| } | ||
|
|
||
| func (l *listener[T]) OverwriteTopics(topics []string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to be exported, it is only used within the package
| func (l *listener[T]) RemoveTopics(topics []string) { | ||
| l.lock.Lock() | ||
| defer l.lock.Unlock() | ||
| if l.topics == nil { | ||
| return | ||
| } | ||
| for _, topic := range topics { | ||
| delete(l.topics, formatTopic(topic)) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to GetTopics change, it seems not needed. can't we use the existing removeTopics ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per this comment I made the change: #829 (comment)
This idea of breaking these listener functions out was so that we use the specific listener's lock.
Should I still revert back?
| func (l *listener[T]) AddTopics(topics []string) { | ||
| l.lock.Lock() | ||
| defer l.lock.Unlock() | ||
| if l.topics == nil { | ||
| l.topics = make(map[string]struct{}, len(topics)) | ||
| } | ||
| for _, topic := range topics { | ||
| l.topics[formatTopic(topic)] = struct{}{} | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per this comment I made the change: #829 (comment)
This idea of breaking these listener functions out was so that we use the specific listener's lock.
Should I still revert back?
| func (l *listener[T]) Channel() chan T { | ||
| // no strong reason to lock here, but keep RLock to be safe if ch could be replaced | ||
| l.lock.RLock() | ||
| defer l.lock.RUnlock() | ||
| return l.ch | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be replaced
in theory, it is never replaced, so I'd remove this method and keep accessing via l.ch, except if I'm missing something ? In any case, we shouldn't export the method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per this comment I made the change: #829 (comment)
This idea of breaking these listener functions out was so that we use the specific listener's lock.
Should I still revert back?
in the meantime I fix, so that we do not export the method
| func (h *broker[T]) removeAllTopics(id string) error { | ||
| h.lock.Lock() | ||
| defer h.lock.Unlock() | ||
|
|
||
| if _, ok := h.listeners[id]; !ok { | ||
| h.lock.RLock() | ||
| listener, ok := h.listeners[id] | ||
| h.lock.RUnlock() | ||
| if !ok { | ||
| return fmt.Errorf("subscription %s not found", id) | ||
| } | ||
| listener.OverwriteTopics([]string{}) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason to change removeAllTopics ? I see the lock change, but then the lock is acquired again by OverwriteTopics. it seems to me we can revert that change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can do the behavior of removing all the topics by calling OverwriteTopics with an empty array of topics. Calling OverwriteTopics uses the listener's locks. The old way of setting the topics directly in removeAllTopics did not use the listener's locks.
|
|
||
| oneof topics_change { | ||
| ModifyTopics modify = 2; | ||
| OverwriteTopics overwrite = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the overwrite feature, but do we have real needs for it @Kukks ? If not a common use case, maybe worth to let client handles it (by removing current topics and adding new ones) so we simplify the proto spec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal/interface/grpc/handlers/broker.go (2)
11-29: Prefersync.RWMutexby value to make zero-value listeners safeUsing
lock *sync.RWMutex(initialized innewListener) means any listener created via a struct literal without settinglockwill panic on firstLock/RLock. A value field (lock sync.RWMutex) is idiomatic and makes the zero value usable, while keeping call sites the same (l.lock.Lock()still works via implicit address-taking).Consider:
- lock *sync.RWMutex + lock sync.RWMutexand in
newListeneryou can omit explicit initialization oflock(or leave it, both are fine).This reduces the chance of subtle panics if tests or future code construct
listenerdirectly.
90-95:channel()accessor is defensively locked but acceptableLocking around access to
l.chis conservative given the channel is never reassigned today, but the cost is negligible and it future‑proofs against potential channel replacement. The method is unexported, so the surface area is contained.I’d leave as is unless profiling ever shows this on a hot path.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/interface/grpc/handlers/broker.go(4 hunks)
🔇 Additional comments (2)
internal/interface/grpc/handlers/broker.go (2)
32-88: Per-listener topic locking and helpers look correctThe switch to
RLock/RUnlockinincludesAnyand the new helpers (addTopics,removeTopics,overwriteTopics,getTopics) correctly centralize synchronization onlistener.topicsand keep all map access under the same mutex. The nil-guard inaddTopics/removeTopicsand the copy ingetTopicsavoid races on the internal map.No issues from a correctness or concurrency standpoint.
133-195: Broker refactor to delegate to listener locks is soundThe pattern of:
- Taking
h.lock.RLock()only to look up the listener,- Releasing the broker lock before calling
listener.channel(),getTopics,addTopics,removeTopics, oroverwriteTopics,- Guarding all per-listener mutations with
listener.lock,is a good separation of concerns and avoids holding the global broker lock while doing per-listener work.
The small TOCTOU window (listener removed after lookup but before the listener method call) is benign since operations would then only affect an unreachable listener instance.
No changes needed here.
ce3c56e to
cf60d6a
Compare
|
this says I force pushed, I used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/interface/grpc/handlers/broker.go (1)
90-95: Remove unnecessary RLock in channel() accessor.The comment acknowledges there's "no strong reason to lock here" since the channel is never replaced after initialization. The RLock adds unnecessary overhead to every event send operation. Since
chis immutable afternewListenerreturns, no synchronization is needed.Apply this diff:
func (l *listener[T]) channel() chan T { - // no strong reason to lock here, but keep RLock to be safe if ch could be replaced - l.lock.RLock() - defer l.lock.RUnlock() return l.ch }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
api-spec/protobuf/gen/ark/v1/service.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/service.pb.rgw.gois excluded by!**/gen/**api-spec/protobuf/gen/ark/v1/service_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/types.pb.gois excluded by!**/*.pb.go,!**/gen/**
📒 Files selected for processing (9)
README.md(1 hunks)api-spec/openapi/swagger/ark/v1/service.openapi.json(7 hunks)api-spec/openapi/swagger/ark/v1/types.openapi.json(1 hunks)api-spec/protobuf/ark/v1/service.proto(3 hunks)api-spec/protobuf/ark/v1/types.proto(1 hunks)internal/interface/grpc/handlers/arkservice.go(2 hunks)internal/interface/grpc/handlers/broker.go(4 hunks)internal/interface/grpc/handlers/broker_test.go(1 hunks)internal/interface/grpc/permissions/permissions.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api-spec/protobuf/ark/v1/types.proto
- README.md
- api-spec/openapi/swagger/ark/v1/types.openapi.json
- internal/interface/grpc/permissions/permissions.go
- internal/interface/grpc/handlers/broker_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/interface/grpc/handlers/arkservice.go (2)
api-spec/protobuf/gen/ark/v1/service.pb.go (15)
GetEventStreamResponse(827-845)GetEventStreamResponse(858-858)GetEventStreamResponse(873-875)GetEventStreamResponse_StreamStarted(1027-1029)GetEventStreamResponse_StreamStarted(1051-1051)UpdateStreamTopicsRequest(1153-1163)UpdateStreamTopicsRequest(1176-1176)UpdateStreamTopicsRequest(1191-1193)UpdateStreamTopicsResponse(1243-1250)UpdateStreamTopicsResponse(1263-1263)UpdateStreamTopicsResponse(1278-1280)UpdateStreamTopicsRequest_Overwrite(1235-1237)UpdateStreamTopicsRequest_Overwrite(1241-1241)UpdateStreamTopicsRequest_Modify(1231-1233)UpdateStreamTopicsRequest_Modify(1239-1239)api-spec/protobuf/gen/ark/v1/types.pb.go (3)
StreamStartedEvent(1393-1398)StreamStartedEvent(1411-1411)StreamStartedEvent(1426-1428)
🔇 Additional comments (10)
api-spec/openapi/swagger/ark/v1/service.openapi.json (1)
95-95: LGTM! OpenAPI spec correctly reflects the protobuf definitions.The OpenAPI specification properly defines the new UpdateStreamTopics endpoint and related schemas. The additions are consistent with the protobuf definitions and follow OpenAPI 3.1.0 conventions.
Also applies to: 299-339, 695-697, 872-914, 996-1004, 1261-1300
internal/interface/grpc/handlers/arkservice.go (2)
222-232: LGTM! StreamStartedEvent emission is correctly implemented.The immediate emission of StreamStartedEvent upon stream creation is well-placed. It sends the listener ID to the client before any other events, enabling subsequent UpdateStreamTopics calls. Error handling is appropriate—if the send fails, the error is returned and the stream is properly cleaned up by the deferred calls.
273-327: LGTM! UpdateStreamTopics implementation is solid.The handler correctly implements the update semantics:
- Stream ID validation ensures the required field is present
- Type switch safely handles the oneof variants without panic risk
- Overwrite takes precedence over modify, as documented
- Modify case allows simultaneous add/remove operations
- Error codes are appropriate: InvalidArgument for bad input, NotFound for missing stream
- Response structure matches the proto definition
The past review concerns about concurrency and type assertions have been properly addressed.
internal/interface/grpc/handlers/broker.go (4)
16-16: LGTM! Per-listener locking correctly addresses the concurrency issue.The addition of a per-listener RWMutex properly resolves the data race flagged in previous reviews. This allows concurrent topic updates (via UpdateStreamTopics) and topic reads (via includesAny during event forwarding) to be safely synchronized without blocking unrelated listeners.
Also applies to: 28-28
32-46: LGTM! Read lock properly protects topic access.Using RLock for the read-only includesAny operation is correct and allows multiple concurrent readers, which is important for event forwarding performance when many listeners are checking topics simultaneously.
48-88: LGTM! Thread-safe topic management methods are well-implemented.The listener methods properly encapsulate topic map access with appropriate locking:
- addTopics/removeTopics/overwriteTopics use Lock for write operations
- getTopics uses RLock for read-only access
- Nil map checks prevent panics
- Topic formatting is consistently applied
This design cleanly separates listener-level synchronization from broker-level synchronization, reducing lock contention.
133-195: LGTM! Broker methods correctly delegate to listener accessors.The broker methods follow a good pattern:
- Acquire broker RLock to safely read the listeners map
- Get the listener reference
- Release broker lock immediately (avoiding nested lock holding)
- Call listener methods that acquire their own locks
This approach minimizes lock contention and prevents potential deadlocks. The "subscription not found" errors provide clear feedback for invalid stream IDs.
api-spec/protobuf/ark/v1/service.proto (3)
86-88: LGTM! Documentation correctly describes StreamStartedEvent behavior.The updated comment accurately explains that StreamStartedEvent is immediately sent upon stream creation and provides the stream ID for subsequent UpdateStreamTopics calls. This helps API consumers understand the flow.
99-107: LGTM! UpdateStreamTopics RPC definition is clean and well-documented.The RPC definition follows protobuf conventions with appropriate HTTP mapping to POST /v1/batch/updateTopics. The comment clearly explains the functionality.
237-267: LGTM! Message definitions are well-structured.The protobuf message definitions are excellent:
- StreamStartedEvent properly added to GetEventStreamResponse oneof
- ModifyTopics and OverwriteTopics provide clear separation of update semantics
- oneof topics_change correctly models the mutually exclusive update modes
- Comment on lines 250-253 clearly documents precedence and requirements
- UpdateStreamTopicsResponse provides comprehensive feedback with added, removed, and all topics
The use of oneof for topics_change is a good design choice that enforces the either-modify-or-overwrite constraint at the protocol level.
Issue: #728
Adds new rpc
UpdateStreamTopicsallowing client to update topics on their rpc stream fromGetEventStream. Offers ability to add or remove topics at the same time, or to overwrite all topics at once by specifying a list of topics.A new event type
StreamStartedEventwas added that gets sent immediately on the creation of a stream fromGetEventStreamand returns the id of the listener. This id is then used by the client when callingUpdateStreamTopics.Changes to the
go-sdkrepo in the directory/go-sdk/blob/master/clientwill need to be made to support calling this new rpc. I do not see an appropriate place in this arkd repo to call this new rpc without it also being supported in thego-sdkrepo. It appears thee2e_test.gowould be the appropriate place to add the test for this new call once supported in thego-sdk.Summary by CodeRabbit
New Features
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.