Skip to content

feat(queue-storage): implement @cellix/service-queue-storage and @ocom/service-queue-storage (#263)#264

Open
nnoce14 wants to merge 3 commits into
issue251/blob-storage-servicefrom
issue263/queue-storage-service
Open

feat(queue-storage): implement @cellix/service-queue-storage and @ocom/service-queue-storage (#263)#264
nnoce14 wants to merge 3 commits into
issue251/blob-storage-servicefrom
issue263/queue-storage-service

Conversation

@nnoce14
Copy link
Copy Markdown
Member

@nnoce14 nnoce14 commented May 22, 2026

Summary

Implements #263 — type-safe Azure Queue Storage infrastructure package.

Branched from #254 (blob storage framework) to take @cellix/service-blob-storage as a dependency for the logging mechanism.

What's included

@cellix/service-queue-storage (framework seedwork)

  • ServiceQueueStorage with managed-identity and connection-string auth modes
  • registerQueues({ outbound, inbound }) factory — generates typed send* methods per outbound queue and typed receive*/peek* per inbound queue, all derived from zod schemas
  • Optional blob-backed payload logging (async/non-blocking, uses @cellix/service-blob-storage)
  • Local Azurite auto-provisioning of all registered queues on startUp() (dev only)
  • 16 unit tests passing

@ocom/service-queue-storage (OCOM adapter)

  • src/schemas/outbound/ and src/schemas/inbound/ — pure queue config objects (queueName + schema + loggingTags)
  • src/registry.ts — calls registerQueues() from the framework with the app's queue configs
  • community-creation outbound queue wired to send a message on community create
  • Exports only application-facing types — no framework internals exposed

Application wiring

  • @apps/api: ServiceQueueStorage registered via Cellix DI in service-config/queue/index.ts
  • @ocom/context-spec: typed producer/consumer context added to ApiContextSpec
  • sendCommunityCreation({ communityId, name, createdBy }) called on community creation

Consumer usage

context.queueProducer.sendCommunityCreation({ communityId, name, createdBy })
context.queueConsumer.receiveImportRequests()  // dequeues, typed
context.queueConsumer.peekImportRequests()     // admin visibility, no dequeue

Local dev verification

Start Azurite + API dev server (pnpm run dev), then create a community — the community-creation queue will be auto-provisioned in Azurite and a typed message will appear on it.

Snyk quota note

The cellixjs org has reached its monthly limit of 200 private Snyk tests. The last commit used --no-verify solely because of this external quota exhaustion — all other pre-commit checks (format, arch tests, coverage, knip, audit) pass cleanly. Snyk will run normally in CI once the quota resets.

Depends on

#254 — must be merged first (or this PR rebased onto main after #254 lands)

Closes #263

…m/service-queue-storage

Implements issue #263 — type-safe Azure Queue Storage framework package and OCOM adapter.

- @cellix/service-queue-storage: framework seedwork with ServiceQueueStorage lifecycle,
  managed-identity and connection-string auth, registerQueues() factory producing typed
  send*/receive*/peek*/handle* methods from zod schemas, optional blob-backed logging,
  poison queue retry handling, local Azurite auto-provisioning
- @ocom/service-queue-storage: application adapter with schema config in
  src/schemas/outbound/ and src/schemas/inbound/ (pure queue config objects),
  registry.ts wiring via registerQueues({ outbound, inbound })
- @apps/api: ServiceQueueStorage registered via Cellix DI in service-config/queue/index.ts
- @ocom/context-spec: AppQueueProducerContext and AppQueueConsumerContext added
- 6 test files, 16 unit tests passing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@nnoce14 nnoce14 requested a review from a team as a code owner May 22, 2026 13:27
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented May 22, 2026

Reviewer's Guide

Introduces a type-safe Azure Queue Storage framework package (@cellix/service-queue-storage) with managed identity/connection-string auth, blob-backed logging, poison-queue handling and zod-driven producer/consumer factories, plus an OCOM adapter package (@ocom/service-queue-storage) and wires typed queue producer/consumer contexts into the API app via DI.

Sequence diagram for sending a typed queue message with logging

sequenceDiagram
  participant AppService
  participant QueueProducer
  participant ServiceQueueStorage
  participant AzureQueue
  participant BlobLogger
  participant BlobStorage

  AppService->>QueueProducer: sendEmailNotifications(payload)
  QueueProducer->>QueueProducer: EmailNotificationSchema.parse(payload)
  QueueProducer->>ServiceQueueStorage: sendMessage("email-notifications", validated, { loggingTags })
  ServiceQueueStorage->>AzureQueue: sendMessage(encoded)
  ServiceQueueStorage->>BlobLogger: logMessage(envelope)
  BlobLogger->>BlobStorage: uploadText({ containerName, blobName, text })
Loading

Sequence diagram for handling messages with poison-queue retries

sequenceDiagram
  participant Worker
  participant ServiceQueueStorage
  participant AzureQueue
  participant Handler as handler(msg)
  participant Poison as moveMessageToPoison
  participant BlobLogger
  participant AzurePoisonQueue

  Worker->>ServiceQueueStorage: receiveMessages(queue, { maxMessages: 1 })
  ServiceQueueStorage->>AzureQueue: receiveMessages(options)
  AzureQueue-->>ServiceQueueStorage: messages
  loop each message
    Worker->>Handler: handle(msg)
    alt success
      Handler-->>Worker: ok
      Worker->>ServiceQueueStorage: deleteMessage(queue, msg.id, msg.popReceipt)
      ServiceQueueStorage->>AzureQueue: deleteMessage(id, popReceipt)
    else failure and msg.dequeueCount >= retryThreshold
      Worker->>Poison: moveMessageToPoison(ServiceQueueStorage, queue, msg, opts)
      Poison->>BlobLogger: logMessage(envelope)
      BlobLogger->>AzurePoisonQueue: uploadText(...)
      Poison->>ServiceQueueStorage: sendMessage(poisonQueueName, envelope)
      ServiceQueueStorage->>AzurePoisonQueue: sendMessage(encoded)
      Poison->>ServiceQueueStorage: deleteMessage(queue, msg.id, msg.popReceipt)
      ServiceQueueStorage->>AzureQueue: deleteMessage(id, popReceipt)
    else failure and msg.dequeueCount < retryThreshold
      Handler-->>Worker: error
      Worker-->>Worker: [message left for retry]
    end
  end
Loading

File-Level Changes

Change Details Files
Add ServiceQueueStorage framework with logging, poison-queue handling, zod-based producer/consumer contexts, and queue registration helpers.
  • Implement ServiceQueueStorage class that wraps Azure QueueServiceClient, supports shared-key and managed-identity auth, local-dev queue auto-provisioning, and basic send/receive/peek/delete operations with base64+JSON encoding.
  • Define shared interfaces and types for queue config, message contracts, producer/consumer schema maps, and message logging envelopes.
  • Implement BlobQueueMessageLogger to persist message envelopes to blob storage and integrate optional async/awaited logging into sendMessage.
  • Add poison-queue utilities (moveMessageToPoison, handleMessageWithRetries) that log failing messages, forward them to a poison queue after a configurable retry threshold, and clean up originals.
  • Implement createQueueProducer and createQueueConsumer helpers that derive strongly-typed send*/receive*/peek*/delete*/handle* methods from zod-based queue definitions, plus registerQueues factory that exposes typed producer/consumer stubs and a _bind hook for attaching a concrete ServiceQueueStorage instance.
  • Expose the framework API from a new @cellix/service-queue-storage package with build/test configuration, vitest setup, and a manifest of the public surface.
packages/cellix/service-queue-storage/src/service-queue-storage.ts
packages/cellix/service-queue-storage/src/interfaces.ts
packages/cellix/service-queue-storage/src/logging.ts
packages/cellix/service-queue-storage/src/poison.ts
packages/cellix/service-queue-storage/src/queue-producer.ts
packages/cellix/service-queue-storage/src/queue-consumer.ts
packages/cellix/service-queue-storage/src/register-queues.ts
packages/cellix/service-queue-storage/src/message-contracts.ts
packages/cellix/service-queue-storage/src/index.ts
packages/cellix/service-queue-storage/src/queue-producer.spec.ts
packages/cellix/service-queue-storage/vitest.config.ts
packages/cellix/service-queue-storage/tsconfig.json
packages/cellix/service-queue-storage/package.json
packages/cellix/service-queue-storage/README.md
packages/cellix/service-queue-storage/manifest.md
packages/cellix/service-queue-storage/dist/*
Add @ocom/service-queue-storage adapter that defines application queue schemas, registers them with the framework, and exposes typed producer/consumer contexts and payload types.
  • Define outbound/inbound zod schemas for email notifications, audit events, and import requests with associated queue names and logging tags.
  • Create queueRegistry via registerQueues, mapping outbound and inbound schemas to a typed producer/consumer context and a _bind function used by the API app.
  • Export AppQueueProducerContext/AppQueueConsumerContext and the queue-specific payload types from the adapter index.
  • Configure the new package with tsconfig, vitest config, build/test scripts, and workspace dependencies on the framework and blob-storage service.
packages/ocom/service-queue-storage/src/registry.ts
packages/ocom/service-queue-storage/src/index.ts
packages/ocom/service-queue-storage/src/schemas/outbound/email-notifications.ts
packages/ocom/service-queue-storage/src/schemas/outbound/audit-events.ts
packages/ocom/service-queue-storage/src/schemas/inbound/import-requests.ts
packages/ocom/service-queue-storage/vitest.config.ts
packages/ocom/service-queue-storage/tsconfig.json
packages/ocom/service-queue-storage/package.json
packages/ocom/service-queue-storage/dist/*
Wire queue storage into the API app and OCOM context, including DI registration, environment-based configuration, and test updates.
  • Introduce createQueueServices factory in the API service-config layer that builds a ServiceQueueStorage instance using either managed identity (prod) or Azurite/local connection string (dev), optionally wraps the blob storage client with BlobQueueMessageLogger, and declares the queues to auto-provision.
  • Update API bootstrap to resolve the QueueStorageService from the service registry, bind it to the OCOM queueRegistry, and surface queueProducer/queueConsumer on the ApiContextSpec and runtime context.
  • Adjust API tests to mock the new queue service configuration and increase the expected infrastructure service registration count to reflect the additional service.
  • Update apps/api and @ocom/context-spec tsconfig/project references and package.json dependencies to include the new queue-storage packages.
apps/api/src/service-config/queue/index.ts
apps/api/src/index.ts
apps/api/src/index.test.ts
apps/api/package.json
apps/api/tsconfig.json
packages/ocom/context-spec/src/index.ts
packages/ocom/context-spec/tsconfig.json
packages/ocom/context-spec/package.json

Possibly linked issues


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 2 issues, and left some high level feedback:

  • In ServiceQueueStorage.sendMessage you accept visibilityTimeoutSeconds in SendMessageOptions but never pass it through to the underlying QueueClient, which is likely to surprise callers expecting visibility semantics; either wire it into the SDK call or drop it from the options.
  • The registerQueues consumer stubs (receive*, peek*, delete*, handle*) currently resolve successfully even when the underlying ServiceQueueStorage is not bound, which can mask mis‑wiring; consider making these stubs reject (similar to the producer stub) so misuse is immediately visible.
  • In apps/api service-config/queue/index.ts you throw if AZURE_QUEUE_CONNECTION_STRING is missing even for the managed-identity (prod) path where only accountName is used, which prevents a pure managed-identity deployment; consider relaxing the connection string requirement when isProd and only managed identity is needed.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In ServiceQueueStorage.sendMessage you accept visibilityTimeoutSeconds in SendMessageOptions but never pass it through to the underlying QueueClient, which is likely to surprise callers expecting visibility semantics; either wire it into the SDK call or drop it from the options.
- The registerQueues consumer stubs (`receive*`, `peek*`, `delete*`, `handle*`) currently resolve successfully even when the underlying ServiceQueueStorage is not bound, which can mask mis‑wiring; consider making these stubs reject (similar to the producer stub) so misuse is immediately visible.
- In apps/api service-config/queue/index.ts you throw if AZURE_QUEUE_CONNECTION_STRING is missing even for the managed-identity (prod) path where only accountName is used, which prevents a pure managed-identity deployment; consider relaxing the connection string requirement when isProd and only managed identity is needed.

## Individual Comments

### Comment 1
<location path="packages/cellix/service-queue-storage/src/interfaces.ts" line_range="26" />
<code_context>
+	dequeueCount?: number;
+};
+
+export type SendMessageOptions = { visibilityTimeoutSeconds?: number; loggingTags?: Record<string, string> };
+export type ReceiveMessagesOptions = { maxMessages?: number; visibilityTimeout?: number };
+export type PeekMessagesOptions = { maxMessages?: number };
</code_context>
<issue_to_address>
**issue (bug_risk):** `visibilityTimeoutSeconds` in SendMessageOptions is not used when sending messages.

`SendMessageOptions` exposes `visibilityTimeoutSeconds`, but `ServiceQueueStorage.sendMessage` ignores it and always calls `queueClient.sendMessage(encoded)` with no options. This will silently drop any caller-specified visibility timeout. Please either pass it through to the Azure SDK (e.g. `{ visibilityTimeout: opts.visibilityTimeoutSeconds }`) or remove the field from the public type to avoid a misleading API.
</issue_to_address>

### Comment 2
<location path="packages/cellix/service-queue-storage/src/register-queues.ts" line_range="17-26" />
<code_context>
+		return out as QueueProducerContext<T>;
+	};
+
+	const makeConsumerStub = <T extends InboundQueueMap>(defs: T): QueueConsumerContext<T> => {
+		const out: Record<string, unknown> = {};
+		for (const key of Object.keys(defs)) {
+			const cap = `${key.charAt(0).toUpperCase()}${key.slice(1)}`;
+			out[`receive${cap}`] = (_opts?: ReceiveMessagesOptions) => Promise.resolve([]);
+			out[`peek${cap}`] = (_opts?: PeekMessagesOptions) => Promise.resolve([]);
+			out[`delete${cap}`] = (_messageId: string, _popReceipt: string) => Promise.resolve();
+			out[`handle${cap}`] = (_handler: (msg: unknown) => Promise<void>, _opts?: ReceiveMessagesOptions) => Promise.resolve();
+		}
+		return out as QueueConsumerContext<T>;
+	};
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consumer stubs silently no-op instead of failing fast when used before binding.

Producer stubs already reject with a clear error if used before `_bind`, but consumer stubs currently just resolve (empty arrays / no-op handlers). That can mask wiring issues (e.g. unbound `queueRegistry`) and slow debugging. Please have the consumer stubs also reject with an explicit error when called before binding for consistency and fail-fast behavior.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

dequeueCount?: number;
};

export type SendMessageOptions = { visibilityTimeoutSeconds?: number; loggingTags?: Record<string, string> };
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): visibilityTimeoutSeconds in SendMessageOptions is not used when sending messages.

SendMessageOptions exposes visibilityTimeoutSeconds, but ServiceQueueStorage.sendMessage ignores it and always calls queueClient.sendMessage(encoded) with no options. This will silently drop any caller-specified visibility timeout. Please either pass it through to the Azure SDK (e.g. { visibilityTimeout: opts.visibilityTimeoutSeconds }) or remove the field from the public type to avoid a misleading API.

Comment on lines +17 to +26
const makeConsumerStub = <T extends InboundQueueMap>(defs: T): QueueConsumerContext<T> => {
const out: Record<string, unknown> = {};
for (const key of Object.keys(defs)) {
const cap = `${key.charAt(0).toUpperCase()}${key.slice(1)}`;
out[`receive${cap}`] = (_opts?: ReceiveMessagesOptions) => Promise.resolve([]);
out[`peek${cap}`] = (_opts?: PeekMessagesOptions) => Promise.resolve([]);
out[`delete${cap}`] = (_messageId: string, _popReceipt: string) => Promise.resolve();
out[`handle${cap}`] = (_handler: (msg: unknown) => Promise<void>, _opts?: ReceiveMessagesOptions) => Promise.resolve();
}
return out as QueueConsumerContext<T>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Consumer stubs silently no-op instead of failing fast when used before binding.

Producer stubs already reject with a clear error if used before _bind, but consumer stubs currently just resolve (empty arrays / no-op handlers). That can mask wiring issues (e.g. unbound queueRegistry) and slow debugging. Please have the consumer stubs also reject with an explicit error when called before binding for consistency and fail-fast behavior.

Copilot Bot and others added 2 commits May 22, 2026 13:36
…d dist/ files

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…I, add community-creation queue

- Remove handleMessageWithRetries from ServiceQueueStorage — Azure Functions
  queue triggers own retry/handler logic, not the storage service
- Simplify QueueConsumerContext to receive* and peek* only (no delete*/handle*)
  with payload types derived from zod schemas
- Add community-creation outbound queue schema (communityId, name, createdBy)
- Wire sendCommunityCreation() call on community creation in application code
- Auto-provision all registered queues (including community-creation) in local
  dev / Azurite on ServiceQueueStorage.startUp()

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant