Skip to content

Commit 5f0083e

Browse files
committed
feat: add workerStop handling
1 parent 0eccc55 commit 5f0083e

3 files changed

Lines changed: 69 additions & 82 deletions

File tree

src/Queue/Adapter/Swoole.php

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Utopia\Queue\Adapter;
44

5+
use Swoole\Constant;
56
use Swoole\Process\Pool;
67
use Utopia\Queue\Adapter;
78
use Utopia\Queue\Consumer;
@@ -10,6 +11,9 @@ class Swoole extends Adapter
1011
{
1112
protected Pool $pool;
1213

14+
/** @var callable */
15+
private $onStop;
16+
1317
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
1418
{
1519
parent::__construct($workerNum, $queue, $namespace);
@@ -27,13 +31,16 @@ public function start(): self
2731

2832
public function stop(): self
2933
{
34+
if ($this->onStop) {
35+
call_user_func($this->onStop);
36+
}
3037
$this->pool->shutdown();
3138
return $this;
3239
}
3340

3441
public function workerStart(callable $callback): self
3542
{
36-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
43+
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
3744
call_user_func($callback, $workerId);
3845
});
3946

@@ -42,10 +49,11 @@ public function workerStart(callable $callback): self
4249

4350
public function workerStop(callable $callback): self
4451
{
45-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
46-
call_user_func($callback, $workerId);
47-
});
52+
// using $this->pool->on(Constant::EVENT_WORKER_STOP) does not work properly here.
53+
// The event gets dispatched after the Worker exited, but we need to make it exit in the first place.
54+
// pool->shutdown doesn't do that.
4855

56+
$this->onStop = $callback;
4957
return $this;
5058
}
5159

src/Queue/Broker/AMQP.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,8 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
129129

130130
public function close(): void
131131
{
132-
if ($this->channel) {
133-
$this->channel->getConnection()?->close();
134-
}
132+
$this->channel?->stopConsume();
133+
$this->channel?->getConnection()?->close();
135134
}
136135

137136
public function enqueue(Queue $queue, array $payload): bool

src/Queue/Server.php

Lines changed: 55 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Utopia\Queue;
44

55
use Exception;
6+
use Swoole\Coroutine;
67
use Throwable;
78
use Utopia\CLI\Console;
89
use Utopia\Hook;
@@ -180,6 +181,7 @@ public function shutdown(): Hook
180181
public function stop(): self
181182
{
182183
try {
184+
Console::success("[Worker] Stopping worker!");
183185
$this->adapter->stop();
184186
} catch (Throwable $error) {
185187
self::setResource('error', fn () => $error);
@@ -217,45 +219,19 @@ public function start(): self
217219
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
218220
}
219221

220-
while (true) {
221-
$this->adapter->consumer->consume(
222-
$this->adapter->queue,
223-
function (Message $message) {
224-
$receivedAtTimestamp = microtime(true);
225-
Console::info("[Job] Received Job ({$message->getPid()}).");
226-
try {
227-
$waitDuration = microtime(true) - $message->getTimestamp();
228-
$this->jobWaitTime->record($waitDuration);
229-
230-
$this->resources = [];
231-
self::setResource('message', fn () => $message);
232-
if ($this->job->getHook()) {
233-
foreach ($this->initHooks as $hook) { // Global init hooks
234-
if (in_array('*', $hook->getGroups())) {
235-
$arguments = $this->getArguments($hook, $message->getPayload());
236-
\call_user_func_array($hook->getAction(), $arguments);
237-
}
238-
}
239-
}
240-
241-
foreach ($this->job->getGroups() as $group) {
242-
foreach ($this->initHooks as $hook) { // Group init hooks
243-
if (in_array($group, $hook->getGroups())) {
244-
$arguments = $this->getArguments($hook, $message->getPayload());
245-
\call_user_func_array($hook->getAction(), $arguments);
246-
}
247-
}
248-
}
249-
250-
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
251-
} finally {
252-
$processDuration = microtime(true) - $receivedAtTimestamp;
253-
$this->processDuration->record($processDuration);
254-
}
255-
},
256-
function (Message $message) {
222+
$this->adapter->consumer->consume(
223+
$this->adapter->queue,
224+
function (Message $message) {
225+
$receivedAtTimestamp = microtime(true);
226+
Console::info("[Job] Received Job ({$message->getPid()}).");
227+
try {
228+
$waitDuration = microtime(true) - $message->getTimestamp();
229+
$this->jobWaitTime->record($waitDuration);
230+
231+
$this->resources = [];
232+
self::setResource('message', fn () => $message);
257233
if ($this->job->getHook()) {
258-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
234+
foreach ($this->initHooks as $hook) { // Global init hooks
259235
if (in_array('*', $hook->getGroups())) {
260236
$arguments = $this->getArguments($hook, $message->getPayload());
261237
\call_user_func_array($hook->getAction(), $arguments);
@@ -264,29 +240,58 @@ function (Message $message) {
264240
}
265241

266242
foreach ($this->job->getGroups() as $group) {
267-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
243+
foreach ($this->initHooks as $hook) { // Group init hooks
268244
if (in_array($group, $hook->getGroups())) {
269245
$arguments = $this->getArguments($hook, $message->getPayload());
270246
\call_user_func_array($hook->getAction(), $arguments);
271247
}
272248
}
273249
}
274-
Console::success("[Job] ({$message->getPid()}) successfully run.");
275-
},
276-
function (?Message $message, Throwable $th) {
277-
Console::error("[Job] ({$message?->getPid()}) failed to run.");
278-
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
279250

280-
self::setResource('error', fn () => $th);
251+
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
252+
} finally {
253+
$processDuration = microtime(true) - $receivedAtTimestamp;
254+
$this->processDuration->record($processDuration);
255+
}
256+
},
257+
function (Message $message) {
258+
if ($this->job->getHook()) {
259+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
260+
if (in_array('*', $hook->getGroups())) {
261+
$arguments = $this->getArguments($hook, $message->getPayload());
262+
\call_user_func_array($hook->getAction(), $arguments);
263+
}
264+
}
265+
}
281266

282-
foreach ($this->errorHooks as $hook) {
283-
($hook->getAction())(...$this->getArguments($hook));
267+
foreach ($this->job->getGroups() as $group) {
268+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
269+
if (in_array($group, $hook->getGroups())) {
270+
$arguments = $this->getArguments($hook, $message->getPayload());
271+
\call_user_func_array($hook->getAction(), $arguments);
272+
}
284273
}
285-
},
286-
);
287-
}
274+
}
275+
Console::success("[Job] ({$message->getPid()}) successfully run.");
276+
},
277+
function (?Message $message, Throwable $th) {
278+
Console::error("[Job] ({$message?->getPid()}) failed to run.");
279+
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
280+
281+
self::setResource('error', fn () => $th);
282+
283+
foreach ($this->errorHooks as $hook) {
284+
($hook->getAction())(...$this->getArguments($hook));
285+
}
286+
},
287+
);
288+
289+
// Sleep a bit before shutdown, this will prevent excessive retries on connection errors.
290+
Coroutine::sleep(5);
288291
});
289292

293+
$this->adapter->workerStop(fn () => $this->adapter->consumer->close());
294+
290295
$this->adapter->start();
291296
} catch (Throwable $error) {
292297
self::setResource('error', fn () => $error);
@@ -318,31 +323,6 @@ public function getWorkerStart(): Hook
318323
return $this->workerStartHook;
319324
}
320325

321-
/**
322-
* Is called when a Worker stops.
323-
* @param callable|null $callback
324-
* @return self
325-
* @throws Exception
326-
*/
327-
public function workerStop(?callable $callback = null): self
328-
{
329-
try {
330-
$this->adapter->workerStop(function (string $workerId) use ($callback) {
331-
Console::success("[Worker] Worker {$workerId} is ready!");
332-
if (!is_null($callback)) {
333-
call_user_func($callback);
334-
}
335-
});
336-
} catch (Throwable $error) {
337-
self::setResource('error', fn () => $error);
338-
foreach ($this->errorHooks as $hook) {
339-
call_user_func_array($hook->getAction(), $this->getArguments($hook));
340-
}
341-
}
342-
343-
return $this;
344-
}
345-
346326
/**
347327
* Get Arguments
348328
*

0 commit comments

Comments
 (0)