Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@athenna/queue",
"version": "5.29.0",
"version": "5.30.0",
"description": "The Athenna queue handler.",
"license": "MIT",
"author": "João Lenon <lenon@athenna.io>",
Expand Down
73 changes: 47 additions & 26 deletions src/drivers/AwsSqsDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Driver } from '#src/drivers/Driver'
import { Is, Options, Uuid } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'
import { AwsSqsDriverExceptionHandler } from '#src/handlers/AwsSqsDriverExceptionHandler'
import { NotFifoSqsQueueTypeException } from '#src/exceptions/NotFifoSqsQueueTypeException'

Expand Down Expand Up @@ -390,17 +391,30 @@ export class AwsSqsDriver extends Driver<SQSClient> {
const heartbeatDelay = this.calculateHeartbeatDelay()

let heartbeatTimeout: NodeJS.Timeout
let scope: QueueExecutionScope | null = null

const startHeartbeat = () => {
if (heartbeatDelay <= 0) {
return
}

heartbeatTimeout = setInterval(() => {
this.changeJobVisibility(
const heartbeat = scope?.bind(async () => {
await this.changeJobVisibility(
job.id,
this.msToS(this.visibilityTimeout)
).catch(() => {})
)
})

heartbeatTimeout = setInterval(() => {
const changeVisibility =
heartbeat ||
(() =>
this.changeJobVisibility(
job.id,
this.msToS(this.visibilityTimeout)
))

Promise.resolve(changeVisibility()).catch(() => {})
}, heartbeatDelay)
}

Expand All @@ -419,34 +433,41 @@ export class AwsSqsDriver extends Driver<SQSClient> {
data: job.data
}

await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
startHeartbeat()

await processor(workerJob)

stopHeartbeat()

if (!AwsSqsDriver.ackedIds.has(job.id)) {
await this.changeJobVisibility(
job.id,
this.msToS(this.noAckDelayMs + requeueJitterMs)
)
await this.runScopedQueueProcessor(
processor,
workerJob,
async () => {
try {
startHeartbeat()

await processor(workerJob)

stopHeartbeat()

if (!AwsSqsDriver.ackedIds.has(job.id)) {
await this.changeJobVisibility(
job.id,
this.msToS(this.noAckDelayMs + requeueJitterMs)
)
}
} catch (error) {
await new AwsSqsDriverExceptionHandler().handle({
job,
error,
driver: this,
stopHeartbeat,
requeueJitterMs
})
}
} catch (error) {
await new AwsSqsDriverExceptionHandler().handle({
job,
error,
driver: this,
stopHeartbeat,
requeueJitterMs
})
},
executionScope => {
scope = executionScope
}
})
)
}

/**
* Send a job to the deadletter quue.
* Send a job to the deadletter queue.
*/
public async sendJobToDLQ(job: any) {
if (Is.Object(job.data)) {
Expand Down
71 changes: 65 additions & 6 deletions src/drivers/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
*/

import { Utils } from '#src/utils'
import { Is } from '@athenna/common'
import { Config } from '@athenna/config'
import type { ConnectionOptions } from '#src/types'
import type { Job, ConnectionOptions } from '#src/types'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'

export const RUN_WITH_WORKER_CONTEXT = Symbol.for(
'@athenna/queue.runWithWorkerContext'
)

export type ScopedQueueProcessor<T = unknown> = ((data: T) => any | Promise<any>) & {
export type ScopedQueueProcessor<T = unknown> = ((
data: T
) => any | Promise<any>) & {
[RUN_WITH_WORKER_CONTEXT]?: (
data: T,
callback: () => any | Promise<any>
callback: () => any | Promise<any>,
captureScope?: (scope: QueueExecutionScope<T>) => void
) => any | Promise<any>
}

Expand Down Expand Up @@ -82,6 +87,11 @@ export abstract class Driver<Client = any> {
jitter: number
}

/**
* Set the custom options used when creating this driver.
*/
public options?: ConnectionOptions['options']

/**
* Creates a new instance of the Driver.
*/
Expand All @@ -92,6 +102,8 @@ export abstract class Driver<Client = any> {
) {
const config = Config.get(`queue.connections.${connection}`)

this.options = options

this.workerInterval =
options?.workerInterval || config.workerInterval || 1000
this.noAckDelayMs = Utils.computeNoAckDelayMs(
Expand Down Expand Up @@ -178,15 +190,48 @@ export abstract class Driver<Client = any> {
protected runScopedQueueProcessor<T>(
processor: ScopedQueueProcessor<T>,
data: T,
callback: () => any | Promise<any>
callback: () => any | Promise<any>,
captureScope?: (scope: QueueExecutionScope<T>) => void
) {
const runner = processor[RUN_WITH_WORKER_CONTEXT]

if (runner) {
return runner(data, callback)
return runner(data, callback, captureScope)
}

const scope = new QueueExecutionScope<T>({
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: this.createContextJob(data)
})

captureScope?.(scope)

return scope.run(callback)
}

private createContextJob<T>(data: T) {
if (this.isJob(data)) {
return data
}

return callback()
return {
id: null,
attempts: this.attempts,
data
} as Job
}

private isJob(data: unknown): data is Job {
if (!data || !Is.Object(data)) {
return false
}

const candidate = data as Partial<Job>

return 'data' in candidate && 'attempts' in candidate
}

/**
Expand Down Expand Up @@ -244,6 +289,20 @@ export abstract class Driver<Client = any> {
*/
public abstract isEmpty(): Promise<boolean>

/**
* Change the job visibility values in the queue.
*/
public changeJobVisibility(_jobId: string, _seconds: number): Promise<void> {
return Promise.resolve()
}

/**
* Send a job to the deadletter queue.
*/
public sendJobToDLQ(_jobId: string): Promise<void> {
return Promise.resolve()
}

/**
* Process the next data in the queue.
*/
Expand Down
104 changes: 81 additions & 23 deletions src/drivers/FakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
*/

import { Log } from '@athenna/logger'
import { Json, Options } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { Is, Json, Options } from '@athenna/common'
import type { Job, ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { RUN_WITH_WORKER_CONTEXT, type ScopedQueueProcessor } from '#src/drivers/Driver'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'
import {
RUN_WITH_WORKER_CONTEXT,
type ScopedQueueProcessor
} from '#src/drivers/Driver'

export class FakeDriver {
public constructor(connection?: string, client?: any) {
Expand All @@ -38,6 +42,7 @@ export class FakeDriver {
public static visibilityTimeout: number = 30000
public static workerInterval: number = 1000
public static noAckDelayMs: number = 1700
public static options?: ConnectionOptions['options']
public static backoff: {
type: 'fixed' | 'exponential'
delay: number
Expand Down Expand Up @@ -99,6 +104,7 @@ export class FakeDriver {

this.isConnected = true
this.isSavedOnFactory = options.saveOnFactory
this.options = options.options
}

/**
Expand Down Expand Up @@ -225,18 +231,68 @@ export class FakeDriver {
return 0
}

protected runScopedQueueProcessor<T>(
/**
* Change the job visibility values in the queue.
*/
public static changeJobVisibility(
_jobId: string,
_seconds: number
): Promise<void> {
return Promise.resolve()
}

/**
* Send a job to the deadletter queue.
*/
public static sendJobToDLQ(_jobId: string): Promise<void> {
return Promise.resolve()
}

private static createContextJob<T>(data: T) {
if (this.isJob(data)) {
return data
}

return {
id: null,
attempts: this.attempts,
data
} as Job
}

private static isJob(data: unknown): data is Job {
if (!data || !Is.Object(data)) {
return false
}

const candidate = data as Partial<Job>

return 'data' in candidate && 'attempts' in candidate
}

protected static runScopedQueueProcessor<T>(
processor: ScopedQueueProcessor<T>,
data: T,
callback: () => any | Promise<any>
callback: () => any | Promise<any>,
captureScope?: (scope: QueueExecutionScope<T>) => void
) {
const runner = processor[RUN_WITH_WORKER_CONTEXT]

if (runner) {
return runner(data, callback)
return runner(data, callback, captureScope)
}

return callback()
const scope = new QueueExecutionScope<T>({
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: this.createContextJob(data)
})

captureScope?.(scope)

return scope.run(callback)
}

/**
Expand All @@ -256,21 +312,23 @@ export class FakeDriver {
) {
const data = await this.pop()

try {
await processor(data)
} catch (err) {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
queue: this.queueName,
deadletter: this.deadletter,
name: err.name,
code: err.code,
help: err.help,
details: err.details,
metadata: err.metadata,
stack: err.stack,
job: data
})
}
await this.runScopedQueueProcessor(processor, data, async () => {
try {
await processor(data)
} catch (err) {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
queue: this.queueName,
deadletter: this.deadletter,
name: err.name,
code: err.code,
help: err.help,
details: err.details,
metadata: err.metadata,
stack: err.stack,
job: data
})
}
})
}
}
Loading
Loading