Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/wiki/Scaling-and-Performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,31 @@ To scale your Redis instance, you can enable clustering. Redis clustering allows

10. **Concurrency Control**: Limit the number of concurrent jobs being processed to avoid resource contention. Adjust the concurrency settings based on the available resources and the nature of the jobs.

11. **Batch Size Configuration**: Configure the batch size for job processing to optimize performance. A larger batch size can reduce the number of database operations and improve throughput.

12. **Parallel Execution**: Enable parallel execution of jobs to take advantage of multi-core processors and improve performance. Ensure that your job processing logic is thread-safe.

13. **Redis Pipelining**: Use Redis pipelining to group multiple commands into a single request, reducing latency and improving throughput. This is especially useful for high-volume job processing.

14. **Lua Scripts for Atomic Operations**: Use Lua scripts in Redis to perform atomic operations, ensuring data consistency and reducing the number of round trips to the Redis server.

## Configuration Options for Performance Optimization

To achieve optimal performance and handle more than 100k jobs/sec, you can configure the following options in the `pop-queue/config.js` file:

1. **Batch Size**: Configure the batch size for job processing.
```javascript
batchSize: Number(process.env.BATCH_SIZE) || 1000
```

2. **Parallel Execution**: Enable or disable parallel execution of jobs.
```javascript
parallelExecution: process.env.PARALLEL_EXECUTION !== 'false'
```

3. **Redis Pipelining**: Enable or disable Redis pipelining.
```javascript
redisPipelining: process.env.REDIS_PIPELINING !== 'false'
```

By following these scaling and performance guidelines, you can ensure that your job queue system is robust, scalable, and performs optimally under various load conditions.
16 changes: 14 additions & 2 deletions pop-queue/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ let config = {
workerTimeout: process.env.WORKER_TIMEOUT || 30000,
rateLimit: process.env.RATE_LIMIT || 100,
concurrency: process.env.CONCURRENCY || 5,
backoffStrategy: process.env.BACKOFF_STRATEGY || { type: 'exponential', delay: 1000 }
backoffStrategy: process.env.BACKOFF_STRATEGY || { type: 'exponential', delay: 1000 },
batchSize: process.env.BATCH_SIZE || 1000,
parallelExecution: process.env.PARALLEL_EXECUTION || true,
redisPipelining: process.env.REDIS_PIPELINING || true
};

if (fs.existsSync(configPath)) {
Expand All @@ -24,7 +27,7 @@ if (fs.existsSync(configPath)) {
}

// Validate configuration values
const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy'];
const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy', 'batchSize', 'parallelExecution', 'redisPipelining'];
requiredConfigKeys.forEach(key => {
if (!config[key]) {
throw new Error(`Missing required configuration value: ${key}`);
Expand All @@ -44,6 +47,15 @@ requiredConfigKeys.forEach(key => {
if (key === 'backoffStrategy' && typeof config[key] !== 'object') {
throw new Error(`Invalid configuration value for ${key}: must be an object`);
}
if (key === 'batchSize' && typeof config[key] !== 'number') {
throw new Error(`Invalid configuration value for ${key}: must be a number`);
}
if (key === 'parallelExecution' && typeof config[key] !== 'boolean') {
throw new Error(`Invalid configuration value for ${key}: must be a boolean`);
}
if (key === 'redisPipelining' && typeof config[key] !== 'boolean') {
throw new Error(`Invalid configuration value for ${key}: must be a boolean`);
}
});

module.exports = config;
48 changes: 48 additions & 0 deletions pop-queue/load-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,52 @@ async function startLoadTest() {
}
}

/**
 * Times a single popBatch call of the given size against a freshly
 * enqueued backlog of jobs, logging the elapsed wall-clock time.
 *
 * @param {number} batchSize - Number of jobs to pop in one batch.
 */
async function runBatchProcessingTest(batchSize) {
    const jobName = 'batchProcessingTestJob';
    const jobCount = 100000;

    // Build up a backlog large enough that the batch pop is never starved.
    console.log(`Creating and enqueuing ${jobCount} jobs for batch processing test...`);
    await createAndEnqueueJobs(jobCount);

    console.log(`Running batch processing test with batch size ${batchSize}...`);
    const started = Date.now();
    await queue.popBatch(jobName, batchSize);
    const elapsedMs = Date.now() - started;

    console.log(`Batch processing test completed in ${elapsedMs} ms.`);
}

/**
 * Times processing of a large job backlog with the requested number of
 * concurrent consumers (delegated to runConcurrentTests), logging the
 * elapsed wall-clock time.
 *
 * @param {number} parallelJobCount - Number of jobs to run concurrently.
 */
async function runParallelExecutionTest(parallelJobCount) {
    // Removed the unused `jobName` local: runConcurrentTests only takes the
    // concurrency count and never referenced it.
    const jobCount = 100000;

    console.log(`Creating and enqueuing ${jobCount} jobs for parallel execution test...`);
    await createAndEnqueueJobs(jobCount);

    console.log(`Running parallel execution test with ${parallelJobCount} parallel jobs...`);
    const startTime = Date.now();
    await runConcurrentTests(parallelJobCount);
    const endTime = Date.now();

    console.log(`Parallel execution test completed in ${endTime - startTime} ms.`);
}

/**
 * Times a pipelined batch push of jobs to Redis after enqueuing a backlog,
 * logging the elapsed wall-clock time.
 *
 * @param {number} pipeliningJobCount - Number of jobs to push in the pipelined batch.
 */
async function runRedisPipeliningTest(pipeliningJobCount) {
    const jobName = 'redisPipeliningTestJob';
    const jobCount = 100000;

    console.log(`Creating and enqueuing ${jobCount} jobs for Redis pipelining test...`);
    await createAndEnqueueJobs(jobCount);

    console.log(`Running Redis pipelining test with ${pipeliningJobCount} jobs...`);
    const started = Date.now();
    await queue.pushToQueueBatch(jobName, pipeliningJobCount);
    const elapsedMs = Date.now() - started;

    console.log(`Redis pipelining test completed in ${elapsedMs} ms.`);
}

// Run the load tests sequentially inside one async entry point:
// - the original fire-and-forget calls ran all four tests concurrently,
//   so their timings contended for the same queue and skewed each other;
// - each call was also a floating promise, so any rejection went unhandled.
(async () => {
    await startLoadTest();
    await runBatchProcessingTest(1000);
    await runParallelExecutionTest(1000);
    await runRedisPipeliningTest(1000);
})().catch((err) => {
    console.error('Load test run failed:', err);
    process.exitCode = 1;
});
96 changes: 81 additions & 15 deletions pop-queue/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class PopQueue extends EventEmitter {
new winston.transports.File({ filename: 'combined.log' })
]
});

// Configuration options for batch size, parallel execution, and Redis pipelining
this.batchSize = config.batchSize || 1000;
this.parallelExecution = config.parallelExecution || true;
this.redisPipelining = config.redisPipelining || true;
}

async define(name, fn, options = {}) {
Expand Down Expand Up @@ -412,31 +417,42 @@ class PopQueue extends EventEmitter {
}

async run(name) {
let job = await this.pop(name);
if (!job) {
let jobs = await this.popBatch(name, this.batchSize);
if (!jobs.length) {
let error = new Error(`No job for ${name}`);
error.code = 404;
throw error;
}
try {
if (this.runners[name] && this.runners[name].fn) {
try {
let fnTimeout = setTimeout(() => {
throw new Error("Timeout");
}, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000)
const isSuccess = await this.runners[name].fn(job);
if(isSuccess) {
await this.finish(job, name);
const promises = jobs.map(async (job) => {
try {
let fnTimeout = setTimeout(() => {
throw new Error("Timeout");
}, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000)
const isSuccess = await this.runners[name].fn(job);
if(isSuccess) {
await this.finish(job, name);
}
else{
await this.fail(job, "Failed");
}
clearTimeout(fnTimeout);
} catch(err) {
await this.fail(job, err.toString());
}
else{
await this.fail(job, "Failed");
});
if (this.parallelExecution) {
await Promise.all(promises);
} else {
for (const promise of promises) {
await promise;
}
clearTimeout(fnTimeout);
} catch(err) {
await this.fail(job, err.toString());
}
} else {
await this.fail(job, `Runner ${name} not defined`);
for (const job of jobs) {
await this.fail(job, `Runner ${name} not defined`);
}
throw new Error('Runner not defined');
}
} catch(e) {
Expand All @@ -445,6 +461,56 @@ class PopQueue extends EventEmitter {
}
}

async popBatch(name, batchSize) {
try {
const lock = await this.redlock.lock(`locks:pop:queue:${name}`, 1000);
const pipeline = this.redisClient.pipeline();
for (let i = 0; i < batchSize; i++) {
pipeline.zpopmin(`pop:queue:${name}`, 1);
}
const results = await pipeline.exec();
const jobs = [];
for (const result of results) {
const stringDocument = result[1];
if (stringDocument.length === 0) {
continue;
}
const valueDocument = await this.redisClient.get(`pop:queue:${name}:${stringDocument[0]}`);
if (!valueDocument) {
continue;
}
const document = parseDocFromRedis(valueDocument);
let pickedTime = new Date();
document.pickedAt = pickedTime;
if (this.dbUrl.startsWith('postgres://')) {
const updateQuery = `
UPDATE ${this.cName}
SET attempts = attempts + 1, pickedAt = $1
WHERE identifier = $2;
`;
await this.db.query(updateQuery, [pickedTime, document.identifier]);
} else {
await this.db.collection(this.getDbCollectionName(name)).findOneAndUpdate({
identifier: document.identifier
}, {
$inc: {
attempts: 1
},
$set: {
pickedAt: new Date(pickedTime)
}
});
}
jobs.push(document);
}
await lock.unlock();
return jobs;
} catch(err) {
console.log("error parsing doc from redis", err);
this.logger.error('Error popping batch from queue:', err);
}
}

async getQueueLength(name) {
try {
return await this.redisClient.zcount(`pop:queue:${name}`, '-inf', '+inf');
Expand Down
Loading