Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
/**
* Move Job to Jobs and it's PID to the processing list.
*/
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage);
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl);
Comment thread
loks0n marked this conversation as resolved.
$this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid());

/**
Expand Down Expand Up @@ -150,6 +150,12 @@ public function retry(Queue $queue, ?int $limit = null): void
}

$this->enqueue($queue, $job->getPayload());

/**
* Remove old job record after re-enqueueing to prevent memory leak.
*/
$this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}");

Comment thread
ChiragAgg5k marked this conversation as resolved.
Outdated
$processed++;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Queue/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public function listSize(string $key): int;
public function listRange(string $key, int $total, int $offset): array;
public function remove(string $key): bool;
public function move(string $queue, string $destination): bool;
public function set(string $key, string $value): bool;
public function set(string $key, string $value, int $ttl = 0): bool;
public function get(string $key): array|string|null;
public function setArray(string $key, array $value): bool;
public function setArray(string $key, array $value, int $ttl = 0): bool;
public function increment(string $key): int;
public function decrement(string $key): int;
public function ping(): bool;
Expand Down
9 changes: 6 additions & 3 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,16 @@ public function move(string $queue, string $destination): bool
return $this->getRedis()->move($queue, $destination);
}

public function setArray(string $key, array $value): bool
public function setArray(string $key, array $value, int $ttl = 0): bool
{
return $this->set($key, json_encode($value));
return $this->set($key, json_encode($value), $ttl);
}

public function set(string $key, string $value): bool
public function set(string $key, string $value, int $ttl = 0): bool
{
if ($ttl > 0) {
return $this->getRedis()->setex($key, $ttl, $value);
}
return $this->getRedis()->set($key, $value);
}
Comment thread
ChiragAgg5k marked this conversation as resolved.

Expand Down
1 change: 1 addition & 0 deletions src/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
public function __construct(
public string $name,
public string $namespace = 'utopia-queue',
public int $jobTtl = 86400,
) {
Comment thread
ChiragAgg5k marked this conversation as resolved.
if (empty($this->name)) {
throw new \InvalidArgumentException('Cannot create queue with empty name.');
Expand Down
Loading