Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
"@ocom/service-apollo-server": "workspace:*",
"@ocom/service-blob-storage": "workspace:*",
"@ocom/service-mongoose": "workspace:*",
"@cellix/service-queue-storage": "workspace:*",
"@ocom/service-queue-storage": "workspace:*",
"@ocom/service-otel": "workspace:*",
"@ocom/service-token-validation": "workspace:*",
"@opentelemetry/api": "1.9.0"
Expand Down
19 changes: 14 additions & 5 deletions apps/api/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,19 @@ vi.mock('./service-config/mongoose/index.ts', () => ({
mongooseContextBuilder: vi.fn(() => dataSourcesFactory),
}));
vi.mock('./service-config/blob-storage/index.ts', () => ({
blobStorageConfig: {
accountName: 'devstoreaccount1',
connectionString: 'UseDevelopmentStorage=true;AccountName=devstoreaccount1;AccountKey=abc123=',
},
accountName: 'devstoreaccount1',
connectionString: 'UseDevelopmentStorage=true;AccountName=devstoreaccount1;AccountKey=abc123=',
}));
vi.mock('./service-config/queue/index.ts', () => ({
createQueueServices: vi.fn(() => ({
queueService: { startUp: vi.fn() },
queueLogger: undefined,
provisionQueues: ['email-notifications', 'audit-events', 'import-requests'],
})),
accountName: 'devstoreaccount1',
connectionString: 'UseDevelopmentStorage=true;AccountName=devstoreaccount1;AccountKey=abc123=',
logContainer: undefined,
POISON_RETRY_THRESHOLD: 3,
}));
vi.mock('./service-config/token-validation/index.ts', () => ({
portalTokens: new Map([['AccountPortal', 'ACCOUNT_PORTAL']]),
Expand Down Expand Up @@ -146,7 +155,7 @@ describe('apps/api bootstrap', () => {

registerServices?.(serviceRegistry);

expect(registerInfrastructureService).toHaveBeenCalledTimes(5);
expect(registerInfrastructureService).toHaveBeenCalledTimes(6);
// Find the registered blob services by the semantic registration name instead of relying on call order.
const registeredBlobService = registerInfrastructureService.mock.calls.find((c) => c?.[1] === 'BlobStorageService')?.[0];
const registeredClientOpsService = registerInfrastructureService.mock.calls.find((c) => c?.[1] === 'ClientOperationsService')?.[0];
Expand Down
29 changes: 23 additions & 6 deletions apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,36 @@ import { restHandlerCreator } from '@ocom/rest';
import { ServiceApolloServer } from '@ocom/service-apollo-server';
import { ServiceBlobStorage } from '@ocom/service-blob-storage';
import { ServiceMongoose } from '@ocom/service-mongoose';
// queue service imports — framework types only imported here
import { queueRegistry } from '@ocom/service-queue-storage';
import { ServiceTokenValidation } from '@ocom/service-token-validation';
import { Cellix } from './cellix.ts';
import * as ApolloServerConfig from './service-config/apollo-server/index.ts';
import * as BlobStorageConfig from './service-config/blob-storage/index.ts';
import * as MongooseConfig from './service-config/mongoose/index.ts';
import * as QueueConfig from './service-config/queue/index.ts';
import * as TokenValidationConfig from './service-config/token-validation/index.ts';

Cellix.initializeInfrastructureServices<ApiContextSpec, ApplicationServices>((serviceRegistry) => {
const { NODE_ENV } = process.env;
const isProd = NODE_ENV === 'production';

const mongooseService = new ServiceMongoose(MongooseConfig.mongooseConnectionString, MongooseConfig.mongooseConnectOptions);
const blobStorageService = isProd ? new ServiceBlobStorage({ accountName: BlobStorageConfig.accountName }) : new ServiceBlobStorage({ connectionString: BlobStorageConfig.connectionString });
const clientOperationsService = new ServiceBlobStorage({ connectionString: BlobStorageConfig.connectionString });
const tokenValidationService = new ServiceTokenValidation(TokenValidationConfig.portalTokens);
const apolloService = new ServiceApolloServer<GraphContext>(ApolloServerConfig.apolloServerOptions);

const { queueService } = QueueConfig.createQueueServices(clientOperationsService, isProd);

serviceRegistry
.registerInfrastructureService(new ServiceMongoose(MongooseConfig.mongooseConnectionString, MongooseConfig.mongooseConnectOptions))
.registerInfrastructureService(isProd ? new ServiceBlobStorage({ accountName: BlobStorageConfig.accountName }) : new ServiceBlobStorage({ connectionString: BlobStorageConfig.connectionString }), 'BlobStorageService')
.registerInfrastructureService(new ServiceBlobStorage({ connectionString: BlobStorageConfig.connectionString }), 'ClientOperationsService')
.registerInfrastructureService(new ServiceTokenValidation(TokenValidationConfig.portalTokens))
.registerInfrastructureService(new ServiceApolloServer<GraphContext>(ApolloServerConfig.apolloServerOptions));
})
.registerInfrastructureService(mongooseService)
.registerInfrastructureService(blobStorageService, 'BlobStorageService')
.registerInfrastructureService(clientOperationsService, 'ClientOperationsService')
.registerInfrastructureService(queueService, 'QueueStorageService')
.registerInfrastructureService(tokenValidationService)
.registerInfrastructureService(apolloService);
})
.setContext((serviceRegistry) => {
const dataSourcesFactory = MongooseConfig.mongooseContextBuilder(serviceRegistry.getInfrastructureService<ServiceMongoose>(ServiceMongoose));

Expand All @@ -40,6 +52,11 @@ Cellix.initializeInfrastructureServices<ApiContextSpec, ApplicationServices>((se
apolloServerService: serviceRegistry.getInfrastructureService<ServiceApolloServer>(ServiceApolloServer),
blobStorageService: serviceRegistry.getInfrastructureService<ServiceBlobStorage>('BlobStorageService'),
clientOperationsService: serviceRegistry.getInfrastructureService<ServiceBlobStorage>('ClientOperationsService'),
// create typed producer/consumer context for queues (OCOM adapter provides registry)
...(() => {
const bound = queueRegistry._bind(serviceRegistry.getInfrastructureService('QueueStorageService'));
return { queueProducer: bound.producer, queueConsumer: bound.consumer };
})(),
};
})
.initializeApplicationServices((context) => buildApplicationServicesFactory(context))
Expand Down
36 changes: 36 additions & 0 deletions apps/api/src/service-config/queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { BlobQueueMessageLogger, ServiceQueueStorage } from '@cellix/service-queue-storage';
import type { ServiceBlobStorage } from '@ocom/service-blob-storage';
import { allQueueNames } from '@ocom/service-queue-storage';

const { AZURE_QUEUE_ACCOUNT_NAME: accountName, AZURE_QUEUE_CONNECTION_STRING: connectionString, QUEUE_LOG_CONTAINER: logContainer } = process.env;

if (!accountName) {
throw new Error('Missing AZURE_QUEUE_ACCOUNT_NAME environment variable. Required for queue operations with managed identity authentication.');
}

if (!connectionString) {
// Some applications may not require connection string; however for client operations we expect it
throw new Error('Missing AZURE_QUEUE_CONNECTION_STRING environment variable. Required for connection-string-based queue operations.');
}

export function createQueueServices(clientOperationsService: ServiceBlobStorage, isProd: boolean) {
const queueLoggingEnabled = !!logContainer;
let queueLogger: BlobQueueMessageLogger | undefined;
if (queueLoggingEnabled) {
// BlobQueueMessageLogger expects an object with uploadText({ containerName, blobName, text })
const blobLike = clientOperationsService as unknown as { uploadText(request: { containerName: string; blobName: string; text: string }): Promise<unknown> };
queueLogger = new BlobQueueMessageLogger(blobLike, logContainer as string);
}

// Build the list of queues to auto-provision from the application's queue registry when available
// This keeps configuration centralized in the OCOM queue registry
const provisionQueues = Array.isArray(allQueueNames) && allQueueNames.length > 0 ? allQueueNames : ['email-notifications', 'audit-events', 'import-requests'];
const qAccount = accountName as string | undefined;
const qConnection = connectionString as string | undefined;

const queueService = isProd
? new ServiceQueueStorage({ accountName: qAccount as string, logging: { enabled: queueLoggingEnabled, container: logContainer as string }, logger: queueLogger, provisionQueues })
: new ServiceQueueStorage({ connectionString: qConnection as string, localDev: !isProd, logging: { enabled: queueLoggingEnabled, container: logContainer as string }, logger: queueLogger, provisionQueues });

return { queueService, queueLogger, provisionQueues };
}
2 changes: 2 additions & 0 deletions apps/api/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
{ "path": "../../packages/ocom/persistence" },
{ "path": "../../packages/ocom/rest" },
{ "path": "../../packages/ocom/service-blob-storage" },
{ "path": "../../packages/ocom/service-queue-storage" },
{ "path": "../../packages/cellix/service-queue-storage" },
{ "path": "../../packages/ocom/service-mongoose" },
{ "path": "../../packages/ocom/service-otel" },
{ "path": "../../packages/ocom/service-token-validation" }
Expand Down
8 changes: 8 additions & 0 deletions knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
"project": ["src/**/*.ts"],
"ignore": ["**/mongo-connection.ts"]
},
"packages/cellix/service-queue-storage": {
"entry": ["src/index.ts"],
"project": ["src/**/*.ts"]
},
"packages/ocom/service-queue-storage": {
"entry": ["src/index.ts"],
"project": ["src/**/*.ts"]
},
"packages/cellix/ui-core/*": {
"entry": ["src/index.ts"],
"project": ["src/**/*.{ts,tsx}"]
Expand Down
4 changes: 4 additions & 0 deletions packages/cellix/service-queue-storage/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/dist
/node_modules

tsconfig.tsbuidinfo
7 changes: 7 additions & 0 deletions packages/cellix/service-queue-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# @cellix/service-queue-storage

Type-safe Azure Queue Storage framework service for Cellix.

Provides: ServiceQueueStorage, message contracts, blob-backed logging, and poison-queue helpers.

See manifest.md for public surface.
10 changes: 10 additions & 0 deletions packages/cellix/service-queue-storage/manifest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Public surface

- ServiceQueueStorage
- registerQueues
- createQueueProducer
- createQueueConsumer
- defineQueueMessage
- BlobQueueMessageLogger
- moveMessageToPoison / handleMessageWithRetries / PoisonQueueOptions
- types: QueueMessage, QueueStorageConfig, QueueMessageContract, OutboundQueueSchema, InboundQueueSchema, QueueProducerContext, QueueConsumerContext
41 changes: 41 additions & 0 deletions packages/cellix/service-queue-storage/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "@cellix/service-queue-storage",
"version": "1.0.0",
"private": true,
"type": "module",
"files": [
"dist"
],
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"scripts": {
"format": "biome format --write",
"format:check": "biome format .",
"prebuild": "pnpm run lint",
"build": "tsgo --build",
"watch": "tsgo --watch",
"test": "vitest -c vitest.config.ts run --exclude src/**/*.integration.test.ts --silent --reporter=dot",
"test:coverage": "vitest run --coverage --exclude src/**/*.integration.test.ts --reporter=dot",
"test:integration": "vitest run src/service-queue-storage.integration.test.ts --reporter=dot",
"test:watch": "vitest",
"lint": "biome lint",
"clean": "rimraf dist"
},
"dependencies": {
"@azure/storage-queue": "^12.10.0",
"@azure/identity": "^4.13.1",
"zod": "^3.22.2"
},
"devDependencies": {
"@cellix/config-typescript": "workspace:*",
"@cellix/config-vitest": "workspace:*",
"@vitest/coverage-istanbul": "catalog:",
"rimraf": "catalog:",
"typescript": "catalog:",
"vitest": "catalog:"
}
}
26 changes: 26 additions & 0 deletions packages/cellix/service-queue-storage/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
export type {
InboundQueueMap,
InboundQueueSchema,
IQueueConsumerOperations,
IQueueStorageOperations,
OutboundQueueMap,
OutboundQueueSchema,
PeekMessagesOptions,
QueueMessage,
QueueMessageContract,
QueueStorageConfig,
ReceiveMessagesOptions,
SendMessageOptions,
} from './interfaces.js';
export type { LogAddress } from './logging.js';
export { BlobQueueMessageLogger } from './logging.js';

export { defineQueueMessage } from './message-contracts.js';
export { moveMessageToPoison } from './poison.js';
export type { QueueConsumerContext } from './queue-consumer.js';
export { createQueueConsumer } from './queue-consumer.js';
export type { QueueDefinition, QueueDefinitions, QueueProducerContext } from './queue-producer.js';
export { createQueueProducer } from './queue-producer.js';

export { registerQueues } from './register-queues.js';
export { ServiceQueueStorage } from './service-queue-storage.js';
62 changes: 62 additions & 0 deletions packages/cellix/service-queue-storage/src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { ZodTypeAny } from 'zod';
import type { IQueueMessageLogger } from './logging.js';

export type QueueStorageConfig = {
accountName?: string;
connectionString?: string;
localDev?: boolean;
/** Optional list of queues that should be auto-provisioned in local/dev environments */
provisionQueues?: string[];
logging?: {
enabled: boolean;
container: string;
await?: boolean;
};
/** Optional logger implementation for persisting message envelopes */
logger?: IQueueMessageLogger;
};

export type QueueMessage<T = unknown> = {
id: string;
popReceipt?: string;
payload: T;
dequeueCount?: number;
};

export type SendMessageOptions = { visibilityTimeoutSeconds?: number; loggingTags?: Record<string, string> };
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): visibilityTimeoutSeconds in SendMessageOptions is not used when sending messages.

SendMessageOptions exposes visibilityTimeoutSeconds, but ServiceQueueStorage.sendMessage ignores it and always calls queueClient.sendMessage(encoded) with no options. This will silently drop any caller-specified visibility timeout. Please either pass it through to the Azure SDK (e.g. { visibilityTimeout: opts.visibilityTimeoutSeconds }) or remove the field from the public type to avoid a misleading API.

export type ReceiveMessagesOptions = { maxMessages?: number; visibilityTimeout?: number };
export type PeekMessagesOptions = { maxMessages?: number };

export interface IQueueStorageOperations {
sendMessage<_T = unknown>(queue: string, message: string | object, opts?: SendMessageOptions): Promise<void>;
sendValidatedMessage<T>(queue: string, contract: QueueMessageContract<T>, payload: T, opts?: SendMessageOptions): Promise<void>;
receiveMessages<_T = unknown>(queue: string, opts?: ReceiveMessagesOptions): Promise<QueueMessage<_T>[]>;
deleteMessage(queue: string, messageId: string, popReceipt: string): Promise<void>;
peekMessages<_T = unknown>(queue: string, opts?: PeekMessagesOptions): Promise<QueueMessage<_T>[]>;
}

export interface IQueueConsumerOperations {
receiveMessages<T = unknown>(queue: string, opts?: ReceiveMessagesOptions): Promise<QueueMessage<T>[]>;
deleteMessage(queue: string, messageId: string, popReceipt: string): Promise<void>;
}

export type QueueMessageContract<T> = {
encode(payload: T): string;
decode(raw: string): T;
};

// New: explicit schema shapes for application-level queue definitions
export type OutboundQueueSchema<S extends ZodTypeAny = ZodTypeAny> = {
queueName: string;
schema: S;
loggingTags?: Record<string, string>;
};

export type InboundQueueSchema<S extends ZodTypeAny = ZodTypeAny> = {
queueName: string;
schema: S;
loggingTags?: Record<string, string>;
};

export type OutboundQueueMap = Record<string, OutboundQueueSchema>;
export type InboundQueueMap = Record<string, InboundQueueSchema>;
33 changes: 33 additions & 0 deletions packages/cellix/service-queue-storage/src/logging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
export type MessageLogEnvelope = {
queue: string;
messageId?: string;
payload: unknown;
metadata?: Record<string, unknown>;
createdAt?: string;
};

export type LogAddress = { container: string; blobName: string; url?: string };

export interface IQueueMessageLogger {
logMessage(envelope: MessageLogEnvelope): Promise<LogAddress>;
}

type BlobStorageLike = {
uploadText(request: { containerName: string; blobName: string; text: string }): Promise<unknown>;
};

export class BlobQueueMessageLogger implements IQueueMessageLogger {
private readonly blobStorage: BlobStorageLike;
private readonly containerName: string;
constructor(blobStorage: BlobStorageLike, containerName: string) {
this.blobStorage = blobStorage;
this.containerName = containerName;
}

public async logMessage(envelope: MessageLogEnvelope): Promise<LogAddress> {
const name = `${envelope.queue}/${envelope.messageId ?? Date.now().toString()}.json`;
const text = JSON.stringify({ envelope }, null, 2);
await this.blobStorage.uploadText({ containerName: this.containerName, blobName: name, text });
return { container: this.containerName, blobName: name, url: `${this.containerName}/${name}` };
}
}
14 changes: 14 additions & 0 deletions packages/cellix/service-queue-storage/src/message-contracts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { ZodType } from 'zod';

export function defineQueueMessage<T>(schema: ZodType<T>) {
return {
encode(payload: T): string {
schema.parse(payload);
return JSON.stringify(payload);
},
decode(raw: string): T {
const parsed = JSON.parse(raw);
return schema.parse(parsed);
},
};
}
Loading