Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions .github/wiki/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ The following environment variables can be set to configure the `pop-queue` libr
- `RATE_LIMIT`: The rate limit for job processing.
- `CONCURRENCY`: The maximum number of concurrent jobs being processed.
- `BACKOFF_STRATEGY`: The backoff strategy for job retries (e.g., `{"type":"exponential","delay":1000}`).
- `BATCH_SIZE`: The batch size for job processing.
- `PARALLEL_EXECUTION`: Whether to enable parallel execution of jobs.
- `REDIS_PIPELINING`: Whether to enable Redis pipelining.
- `NOTIFICATION_CONFIG`: The configuration for notifications.

## Configuration File

Expand All @@ -39,7 +43,11 @@ Here is an example `config.json` file:
"backoffStrategy": {
"type": "exponential",
"delay": 1000
}
},
"batchSize": 1000,
"parallelExecution": true,
"redisPipelining": true,
"notificationConfig": {}
}
```

Expand All @@ -61,6 +69,10 @@ The following configuration values are required:
- `rateLimit`
- `concurrency`
- `backoffStrategy`
- `batchSize`
- `parallelExecution`
- `redisPipelining`
- `notificationConfig`

## Example

Expand All @@ -79,6 +91,10 @@ WORKER_TIMEOUT=30000
RATE_LIMIT=100
CONCURRENCY=5
BACKOFF_STRATEGY={"type":"exponential","delay":1000}
BATCH_SIZE=1000
PARALLEL_EXECUTION=true
REDIS_PIPELINING=true
NOTIFICATION_CONFIG={}
```

2. Create a `config.json` file in the root directory of the project and add the following configuration values:
Expand All @@ -99,7 +115,11 @@ BACKOFF_STRATEGY={"type":"exponential","delay":1000}
"backoffStrategy": {
"type": "exponential",
"delay": 1000
}
},
"batchSize": 1000,
"parallelExecution": true,
"redisPipelining": true,
"notificationConfig": {}
}
```

Expand Down
229 changes: 214 additions & 15 deletions .github/wiki/Error-Handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,66 @@ app.post('/api/requeue-job', async (req, res) => {
});
```

### Example: Registering a Worker

**Endpoint:** `POST /api/register-worker`

**Error Handling:**
- If an error occurs while registering the worker, a `500 Internal Server Error` status code is returned along with an error message.

**Code:**
```javascript
// POST /api/register-worker — registers this process as a queue worker.
// Responds 200 with a confirmation message; on any failure the error is
// logged and a 500 with a generic error body is returned.
app.post('/api/register-worker', async function (req, res) {
  try {
    await queue.registerWorker();
    res.status(200).json({ message: 'Worker registered successfully' });
  } catch (err) {
    logger.error('Error registering worker:', err);
    res.status(500).json({ error: 'Failed to register worker' });
  }
});
```

### Example: Deregistering a Worker

**Endpoint:** `POST /api/deregister-worker`

**Error Handling:**
- If an error occurs while deregistering the worker, a `500 Internal Server Error` status code is returned along with an error message.

**Code:**
```javascript
// POST /api/deregister-worker — removes this process from the worker pool.
// Responds 200 with a confirmation message; on any failure the error is
// logged and a 500 with a generic error body is returned.
app.post('/api/deregister-worker', async function (req, res) {
  try {
    await queue.deregisterWorker();
    res.status(200).json({ message: 'Worker deregistered successfully' });
  } catch (err) {
    logger.error('Error deregistering worker:', err);
    res.status(500).json({ error: 'Failed to deregister worker' });
  }
});
```

### Example: Redistributing Jobs

**Endpoint:** `POST /api/redistribute-jobs`

**Error Handling:**
- If an error occurs while redistributing jobs, a `500 Internal Server Error` status code is returned along with an error message.

**Code:**
```javascript
// POST /api/redistribute-jobs — rebalances queued jobs across live workers.
// Responds 200 with a confirmation message; on any failure the error is
// logged and a 500 with a generic error body is returned.
app.post('/api/redistribute-jobs', async function (req, res) {
  try {
    await queue.redistributeJobs();
    res.status(200).json({ message: 'Jobs redistributed successfully' });
  } catch (err) {
    logger.error('Error redistributing jobs:', err);
    res.status(500).json({ error: 'Failed to redistribute jobs' });
  }
});
```

## Error Handling in Queue Operations

### General Error Handling
Expand Down Expand Up @@ -106,31 +166,42 @@ async now(job, name, identifier, score, priority = 0, delay = 0) {
**Code:**
```javascript
async run(name) {
let job = await this.pop(name);
if (!job) {
let jobs = await this.popBatch(name, this.batchSize);
if (!jobs.length) {
let error = new Error(`No job for ${name}`);
error.code = 404;
throw error;
}
try {
if (this.runners[name] && this.runners[name].fn) {
try {
let fnTimeout = setTimeout(() => {
throw new Error("Timeout");
}, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000)
const isSuccess = await this.runners[name].fn(job);
if(isSuccess) {
await this.finish(job, name);
const promises = jobs.map(async (job) => {
try {
let fnTimeout = setTimeout(() => {
throw new Error("Timeout");
}, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000)
const isSuccess = await this.runners[name].fn(job);
if(isSuccess) {
await this.finish(job, name);
}
else{
await this.fail(job, "Failed");
}
clearTimeout(fnTimeout);
} catch(err) {
await this.fail(job, err.toString());
}
else{
await this.fail(job, "Failed");
});
if (this.parallelExecution) {
await Promise.all(promises);
} else {
for (const promise of promises) {
await promise;
}
clearTimeout(fnTimeout);
} catch(err) {
await this.fail(job, err.toString());
}
} else {
await this.fail(job, `Runner ${name} not defined`);
for (const job of jobs) {
await this.fail(job, `Runner ${name} not defined`);
}
throw new Error('Runner not defined');
}
} catch(e) {
Expand All @@ -139,3 +210,131 @@ async run(name) {
}
}
```

### Example: Failing a Job

**Operation:** `queue.fail`

**Error Handling:**
- If an error occurs while failing a job, the error is logged and the job is not moved to the dead letter queue.

**Code:**
```javascript
/**
 * Records a job failure.
 *
 * Two outcomes, chosen by the retry budget:
 *  - Exhausted (`document.attempts >= this.retries`) and not `force`: the job
 *    is marked permanently failed, copied to the dead letter queue, and
 *    failure notifications/metrics/events are emitted.
 *  - Otherwise: the run state (pickedAt/finishedAt/status/duration) is
 *    cleared, this attempt is archived into runHistory, and the job is
 *    re-queued after a 2s pause.
 *
 * Both paths append `{ reason, time }` to the job's failedReason list and
 * support two backends: Postgres when `this.dbUrl` starts with
 * 'postgres://', MongoDB otherwise.
 *
 * @param {Object} document - Job record; must carry `identifier`, `name`,
 *     `attempts`, plus the run-state fields snapshotted into runHistory.
 * @param {string} reason - Failure reason to record.
 * @param {boolean} [force] - When truthy, skips the permanent-failure path
 *     and re-queues the job even if the retry budget is exhausted.
 */
async fail(document, reason, force) {
    try {
        if (document.attempts >= this.retries && !force) {
            // Permanent failure: retry budget exhausted.
            let finishTime = new Date();
            if (this.dbUrl.startsWith('postgres://')) {
                // COALESCE handles the first failure, when failedReason is
                // still NULL, before appending the new jsonb entry.
                const updateQuery = `
                    UPDATE ${this.cName}
                    SET finishedAt = $1, status = 'failed', requeuedAt = $2, failedReason = COALESCE(failedReason, '[]'::jsonb) || $3::jsonb
                    WHERE identifier = $4;
                `;
                await this.db.query(updateQuery, [finishTime, new Date(), JSON.stringify({ reason, time: new Date() }), document.identifier]);
            } else {
                await this.db.collection(this.getDbCollectionName(document.name)).findOneAndUpdate({
                    identifier: document.identifier
                }, {
                    $push: {
                        failedReason: {
                            reason,
                            time: new Date()
                        },
                    },
                    $set: {
                        finishedAt: finishTime,
                        status: 'failed',
                        requeuedAt: new Date()
                    }
                });
            }
            // Archive the job and fan out failure signals.
            await this.moveToDeadLetterQueue(document);
            await this.notifySystems('jobFailed', document);
            this.metrics.jobsFailed++;
            this.emit('jobFailed', document);
        } else {
            // Retry path: wipe the previous run's state, record it in
            // runHistory, then push the job back onto the queue.
            if (this.dbUrl.startsWith('postgres://')) {
                // RETURNING * yields the updated row so it can be re-queued.
                const updateQuery = `
                    UPDATE ${this.cName}
                    SET pickedAt = NULL, finishedAt = NULL, status = NULL, duration = NULL, requeuedAt = $1,
                        failedReason = COALESCE(failedReason, '[]'::jsonb) || $2::jsonb,
                        runHistory = COALESCE(runHistory, '[]'::jsonb) || $3::jsonb
                    WHERE identifier = $4
                    RETURNING *;
                `;
                const result = await this.db.query(updateQuery, [new Date(), JSON.stringify({ reason, time: new Date() }), JSON.stringify({
                    pickedAt: document.pickedAt,
                    finishedAt: document.finishedAt,
                    status: document.status,
                    duration: document.duration
                }), document.identifier]);
                const newDocument = result.rows[0];
                if (newDocument) {
                    // Brief back-off before the job becomes runnable again.
                    await sleep(2000);
                    await this.pushToQueue(newDocument, newDocument.name, newDocument.identifier);
                }
            } else {
                let newDocument = await this.db.collection(this.getDbCollectionName(document.name)).findOneAndUpdate({
                    identifier: document.identifier
                }, {
                    $unset: {
                        pickedAt: 1,
                        finishedAt: 1,
                        status: 1,
                        duration: 1
                    },
                    $push: {
                        failedReason: {
                            reason,
                            time: new Date()
                        },
                        runHistory: {
                            pickedAt: document.pickedAt,
                            finishedAt: document.finishedAt,
                            status: document.status,
                            duration: document.duration
                        }
                    },
                    $set: {
                        requeuedAt: new Date()
                    }
                }, {new: true});
                // NOTE(review): `{new: true}` is a Mongoose option; the native
                // MongoDB driver expects `returnDocument: 'after'` — confirm
                // which client `this.db` is before relying on `.value` below.
                if(newDocument.value && newDocument.value.name) {
                    await sleep(2000);
                    await this.pushToQueue(newDocument.value, newDocument.value.name, newDocument.value.identifier);
                }
            }
        }
    } catch (e) {
        // Best-effort: errors are logged, never rethrown to the caller.
        console.log(e);
        this.logger.error('Error failing job:', e);
    }
}
```

### Example: Moving a Job to Dead Letter Queue

**Operation:** `queue.moveToDeadLetterQueue`

**Error Handling:**
- If an error occurs while moving a job to the dead letter queue, the error is logged and the job is not moved.

**Code:**
```javascript
async moveToDeadLetterQueue(document) {
try {
if (this.dbUrl.startsWith('postgres://')) {
const insertQuery = `
INSERT INTO dead_letter_queue (data, createdAt, name, identifier, priority, delay, pickedAt, finishedAt, attempts, status, duration, requeuedAt, failedReason, runHistory)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14);
`;
await this.db.query(insertQuery, [document.data, document.createdAt, document.name, document.identifier, document.priority, document.delay, document.pickedAt, document.finishedAt, document.attempts, document.status, document.duration, document.requeuedAt, document.failedReason, document.runHistory]);
} else {
await this.db.collection('dead_letter_queue').insertOne(document);
}
} catch (e) {
console.log(e);
this.logger.error('Error moving job to dead letter queue:', e);
}
}
```
8 changes: 6 additions & 2 deletions config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ let config = {
backoffStrategy: process.env.BACKOFF_STRATEGY || { type: 'exponential', delay: 1000 },
batchSize: process.env.BATCH_SIZE || 1000,
parallelExecution: process.env.PARALLEL_EXECUTION || true,
redisPipelining: process.env.REDIS_PIPELINING || true
redisPipelining: process.env.REDIS_PIPELINING || true,
notificationConfig: process.env.NOTIFICATION_CONFIG || {}
};

if (fs.existsSync(configPath)) {
Expand All @@ -27,7 +28,7 @@ if (fs.existsSync(configPath)) {
}

// Validate configuration values
const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy', 'batchSize', 'parallelExecution', 'redisPipelining'];
const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy', 'batchSize', 'parallelExecution', 'redisPipelining', 'notificationConfig'];
requiredConfigKeys.forEach(key => {
if (!config[key]) {
throw new Error(`Missing required configuration value: ${key}`);
Expand Down Expand Up @@ -56,6 +57,9 @@ requiredConfigKeys.forEach(key => {
if (key === 'redisPipelining' && typeof config[key] !== 'boolean') {
throw new Error(`Invalid configuration value for ${key}: must be a boolean`);
}
if (key === 'notificationConfig' && typeof config[key] !== 'object') {
throw new Error(`Invalid configuration value for ${key}: must be an object`);
}
});

module.exports = config;
7 changes: 1 addition & 6 deletions pop-queue/jobExecution.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ async function pushToBatchQueue(redisClient, documents, name) {

async function popBatch(redisClient, redlock, name, batchSize) {
try {
// const lock = await redlock.acquire([`locks:pop:queue:${name}`], 1000);
const pipeline = redisClient.pipeline();
for (let i = 0; i < batchSize; i++) {
pipeline.zpopmin(`pop:queue:${name}`, 1);
Expand Down Expand Up @@ -82,7 +81,6 @@ async function popBatch(redisClient, redlock, name, batchSize) {
}
jobs.push(document);
}
// await lock.unlock();
return jobs;
} catch(err) {
console.log("error parsing doc from redis", err);
Expand All @@ -92,12 +90,10 @@ async function popBatch(redisClient, redlock, name, batchSize) {

async function pop(redisClient, redlock, name) {
try {
// const lock = await redlock.acquire([`locks:pop:queue:${name}`], 1000);
let stringDocument = await redisClient.zpopmin(`pop:queue:${name}`, 1);
let valueDocument = await redisClient.get(`pop:queue:${name}:${stringDocument[0]}`);
if (!valueDocument || stringDocument.length == 0) {
console.log("no document in redis");
// await lock.unlock();
return null;
}
let document = parseDocFromRedis(valueDocument);
Expand All @@ -122,7 +118,6 @@ async function pop(redisClient, redlock, name) {
}
});
}
// await lock.unlock();
return document;
} catch(err) {
console.log("error parsing doc from redis", err);
Expand Down Expand Up @@ -280,4 +275,4 @@ module.exports = {
finish,
fail,
moveToDeadLetterQueue
};
};
Loading
Loading