Skip to content

Commit d62ecb0

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit d62ecb0

File tree

7 files changed

+335
-238
lines changed

7 files changed

+335
-238
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@
33
namespace Utopia\Queue\Adapter;
44

55
use Swoole\Constant;
6+
use Swoole\Process;
67
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
8+
use Utopia\Console;
89
use Utopia\Queue\Adapter;
910
use Utopia\Queue\Consumer;
1011

1112
class Swoole extends Adapter
1213
{
1314
protected Pool $pool;
1415

15-
/** @var callable */
16-
private $onStop;
17-
18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
16+
public function __construct(
17+
Consumer $consumer,
18+
int $workerNum,
19+
string $queue,
20+
string $namespace = "utopia-queue",
21+
) {
2022
parent::__construct($workerNum, $queue, $namespace);
2123

2224
$this->consumer = $consumer;
@@ -25,36 +27,29 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2527

2628
public function start(): self
2729
{
28-
$this->pool->set(['enable_coroutine' => true]);
29-
30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
36-
37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
30+
$this->pool->set(["enable_coroutine" => true]);
31+
32+
// Register signal handlers for master process
33+
Process::signal(SIGTERM, function () {
34+
Console::info(
35+
"[Swoole] Master received SIGTERM, shutting down pool...",
36+
);
37+
$this->stop();
38+
});
4139

42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
40+
Process::signal(SIGINT, function () {
41+
Console::info(
42+
"[Swoole] Master received SIGINT, shutting down pool...",
43+
);
44+
$this->stop();
45+
});
4746

4847
$this->pool->start();
4948
return $this;
5049
}
5150

5251
public function stop(): self
5352
{
54-
if ($this->onStop) {
55-
call_user_func($this->onStop);
56-
}
57-
5853
Console::info("[Swoole] Shutting down process pool...");
5954
$this->pool->shutdown();
6055
Console::success("[Swoole] Process pool stopped.");
@@ -63,33 +58,38 @@ public function stop(): self
6358

6459
public function workerStart(callable $callback): self
6560
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
61+
$this->pool->on(Constant::EVENT_WORKER_START, function (
62+
Pool $pool,
63+
string $workerId,
64+
) use ($callback) {
65+
// Register signal handlers in worker to gracefully stop consume loop
66+
Process::signal(SIGTERM, function () use ($workerId) {
67+
Console::info(
68+
"[Swoole] Worker {$workerId} received SIGTERM, stopping consumer...",
69+
);
70+
$this->consumer->close();
71+
});
72+
73+
Process::signal(SIGINT, function () use ($workerId) {
74+
Console::info(
75+
"[Swoole] Worker {$workerId} received SIGINT, stopping consumer...",
76+
);
77+
$this->consumer->close();
78+
});
79+
80+
\call_user_func($callback, $workerId);
8381
});
8482

8583
return $this;
8684
}
8785

8886
public function workerStop(callable $callback): self
8987
{
90-
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
88+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
89+
Pool $pool,
90+
string $workerId,
91+
) use ($callback) {
92+
\call_user_func($callback, $workerId);
9393
});
9494

9595
return $this;

src/Queue/Broker/Pool.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

@@ -42,14 +46,20 @@ public function close(): void
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

0 commit comments

Comments
 (0)