Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 8 additions & 61 deletions packages/redis-worker/src/fair-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
});

this.telemetry.recordEnqueue(
this.telemetry.messageAttributes({
queueId: options.queueId,
tenantId: options.tenantId,
messageId,
})
);
this.telemetry.recordEnqueue();

this.logger.debug("Message enqueued", {
queueId: options.queueId,
Expand Down Expand Up @@ -431,13 +425,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
});

this.telemetry.recordEnqueueBatch(
messageIds.length,
this.telemetry.messageAttributes({
queueId: options.queueId,
tenantId: options.tenantId,
})
);
this.telemetry.recordEnqueueBatch(messageIds.length);

this.logger.debug("Batch enqueued", {
queueId: options.queueId,
Expand Down Expand Up @@ -1387,14 +1375,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {

// Record queue time
const queueTime = startTime - storedMessage.timestamp;
this.telemetry.recordQueueTime(
queueTime,
this.telemetry.messageAttributes({
queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
})
);
this.telemetry.recordQueueTime(queueTime);

// Build handler context
const handlerContext: MessageHandlerContext<z.infer<TPayloadSchema>> = {
Expand All @@ -1410,21 +1391,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
},
complete: async () => {
await this.#completeMessage(storedMessage, queueId, queueKey, masterQueueKey, descriptor);
this.telemetry.recordComplete(
this.telemetry.messageAttributes({
queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
})
);
this.telemetry.recordProcessingTime(
Date.now() - startTime,
this.telemetry.messageAttributes({
queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
})
);
this.telemetry.recordComplete();
this.telemetry.recordProcessingTime(Date.now() - startTime);
},
release: async () => {
await this.#releaseMessage(
Expand Down Expand Up @@ -1550,14 +1518,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
descriptor: QueueDescriptor,
error?: Error
): Promise<void> {
this.telemetry.recordFailure(
this.telemetry.messageAttributes({
queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
attempt: storedMessage.attempt,
})
);
this.telemetry.recordFailure();

// Check retry strategy
if (this.retryStrategy) {
Expand Down Expand Up @@ -1588,14 +1549,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
await this.concurrencyManager.release(descriptor, storedMessage.id);
}

this.telemetry.recordRetry(
this.telemetry.messageAttributes({
queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
attempt: storedMessage.attempt + 1,
})
);
this.telemetry.recordRetry();

this.logger.debug("Message scheduled for retry", {
messageId: storedMessage.id,
Expand Down Expand Up @@ -1651,14 +1605,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
pipeline.hset(dlqDataKey, storedMessage.id, JSON.stringify(dlqMessage));
await pipeline.exec();

this.telemetry.recordDLQ(
this.telemetry.messageAttributes({
queueId: storedMessage.queueId,
tenantId: storedMessage.tenantId,
messageId: storedMessage.id,
attempt: storedMessage.attempt,
})
);
this.telemetry.recordDLQ();

this.logger.info("Message moved to DLQ", {
messageId: storedMessage.id,
Expand Down
3 changes: 2 additions & 1 deletion packages/redis-worker/src/fair-queue/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ export class FairQueueTelemetry {
// ============================================================================

/**
* Create standard attributes for a message operation.
* Create standard attributes for a message operation (for spans/traces).
* Use this for span attributes where high cardinality is acceptable.
*/
messageAttributes(params: {
queueId?: string;
Expand Down