Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,9 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

[
'channel1' => new SynchronousAdapter(),
'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
'channel3' => [
'channel2' => new SynchronousAdapter(), // a second instance for a different queue processing pipeline
'channel3' => [ // use a yiisoft/factory syntax for adapter creation
'class' => SynchronousAdapter::class,
'__constructor' => ['channel' => 'channel3'],
],
Comment on lines 186 to 191
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The adapter-definition example still uses channel1/channel2/channel3 keys and instantiates SynchronousAdapter without required constructor dependencies. Since this PR switches terminology to queue names and removes adapter channel configuration from the interface, this snippet should be updated to reflect the new API (queue names) and a realistic adapter construction/definition example.

Copilot uses AI. Check for mistakes.
]
```
Expand Down
6 changes: 3 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
'definitions' => $params['yiisoft/queue']['queues'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
Expand Down Expand Up @@ -61,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
];
4 changes: 2 additions & 2 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channels' => [
QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class,
'queues' => [
QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
5 changes: 0 additions & 5 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Yiisoft\Queue\Adapter;

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand Down Expand Up @@ -38,8 +37,4 @@ public function push(MessageInterface $message): MessageInterface;
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages.
*/
public function subscribe(callable $handlerCallback): void;

public function withChannel(string|BackedEnum $channel): self;

public function getChannel(): string;
}
29 changes: 1 addition & 28 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@

namespace Yiisoft\Queue\Adapter;

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\ChannelNormalizer;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,15 +17,11 @@ final class SynchronousAdapter implements AdapterInterface
{
private array $messages = [];
private int $current = 0;
private string $channel;

public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL,
) {
$this->channel = ChannelNormalizer::normalize($channel);
}
) {}

public function __destruct()
{
Expand Down Expand Up @@ -80,24 +73,4 @@ public function subscribe(callable $handlerCallback): void
{
$this->runExisting($handlerCallback);
}

public function withChannel(string|BackedEnum $channel): self
{
$channel = ChannelNormalizer::normalize($channel);

if ($channel === $this->channel) {
return $this;
}

$new = clone $this;
$new->channel = $channel;
$new->messages = [];

return $new;
}

public function getChannel(): string
{
return $this->channel;
}
}
18 changes: 0 additions & 18 deletions src/ChannelNormalizer.php

This file was deleted.

20 changes: 10 additions & 10 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}
Expand All @@ -36,47 +36,47 @@
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
'Queue name list to connect to',
$this->queues,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
'Pause between queue iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. '
'Maximum number of messages to process in each queue before switching to another queue. '
. 'Default is 0 (no limits).',
0,
);

$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
$this->addUsage('[queue1 [queue2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueProvider->get($channel);
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$queues[] = $this->queueProvider->get($queue);
}

$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {

Check warning on line 72 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LessThanNegotiation": @@ @@ } $pauseSeconds = (int) $input->getOption('pause'); - if ($pauseSeconds < 0) { + if ($pauseSeconds >= 0) { $pauseSeconds = 1; }

Check warning on line 72 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LessThan": @@ @@ } $pauseSeconds = (int) $input->getOption('pause'); - if ($pauseSeconds < 0) { + if ($pauseSeconds <= 0) { $pauseSeconds = 1; }
$pauseSeconds = 1;
}

while ($this->loop->canContinue()) {
$hasMessages = false;

Check warning on line 77 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": @@ @@ } while ($this->loop->canContinue()) { - $hasMessages = false; + $hasMessages = true; foreach ($queues as $queue) { $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; }
foreach ($queues as $queue) {
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;

Check warning on line 79 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "GreaterThan": @@ @@ while ($this->loop->canContinue()) { $hasMessages = false; foreach ($queues as $queue) { - $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; + $hasMessages = $queue->run((int) $input->getOption('maximum')) >= 0 || $hasMessages; } if (!$hasMessages) {
}

if (!$hasMessages) {
Expand Down
8 changes: 4 additions & 4 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueProviderInterface::DEFAULT_CHANNEL,
'Queue name to connect to',
QueueProviderInterface::DEFAULT_QUEUE,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueProvider
->get($input->getArgument('channel'))
->get($input->getArgument('queue'))
->listen();

return 0;
Expand Down
20 changes: 10 additions & 10 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,36 @@ final class RunCommand extends Command
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}

public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to.',
$this->channels,
'Queue name list to connect to.',
$this->queues,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
'Maximum number of messages to process in each queue. Default is 0 (no limits).',
0,
)
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
->addUsage('[queue1 [queue2 [...]]] --maximum 100');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$output->write("Processing queue $queue... ");
$count = $this->queueProvider
->get($channel)
->get($queue)
->run((int) $input->getOption('maximum'));

$output->writeln("Messages processed: $count.");
Expand Down
10 changes: 5 additions & 5 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ public function collectStatus(string $id, JobStatus $status): void
}

public function collectPush(
?string $channel,
?string $queueName,
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): void {
if (!$this->isActive()) {
return;
}
if ($channel === null) {
$channel = 'null';
if ($queueName === null) {
$queueName = 'null';
}

$this->pushes[$channel][] = [
$this->pushes[$queueName][] = [
'message' => $message,
'middlewares' => $middlewareDefinitions,
];
Expand All @@ -69,7 +69,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac
if (!$this->isActive()) {
return;
}
$this->processingMessages[$queue->getChannel()][] = $message;
$this->processingMessages[$queue->getName()][] = $message;
}

public function getSummary(): array
Expand Down
11 changes: 6 additions & 5 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Yiisoft\Queue\Debug;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand All @@ -30,7 +31,7 @@ public function push(
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannel(), $message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getName(), $message, ...$middlewareDefinitions);
return $message;
}

Expand All @@ -44,13 +45,13 @@ public function listen(): void
$this->queue->listen();
}

public function withAdapter(AdapterInterface $adapter): static
public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static
{
return new self($this->queue->withAdapter($adapter), $this->collector);
return new self($this->queue->withAdapter($adapter, $queueName), $this->collector);
}

public function getChannel(): string
public function getName(): string
{
return $this->queue->getChannel();
return $this->queue->getName();
}
}
9 changes: 5 additions & 4 deletions src/Debug/QueueProviderInterfaceProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ public function __construct(
private readonly QueueCollector $collector,
) {}

public function get(string|BackedEnum $channel): QueueInterface
public function get(string|BackedEnum $queueName): QueueInterface
{
$queue = $this->queueProvider->get($channel);
$queue = $this->queueProvider->get($queueName);

return new QueueDecorator($queue, $this->collector);
}

public function has(string|BackedEnum $channel): bool
public function has(string|BackedEnum $queueName): bool
{
return $this->queueProvider->has($channel);
return $this->queueProvider->has($queueName);
}
}
15 changes: 7 additions & 8 deletions src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ public function dispatch(
FailureHandlingRequest $request,
MessageFailureHandlerInterface $finishHandler,
): FailureHandlingRequest {
/** @var string $channel It is always string in this context */
$channel = $request->getQueue()->getChannel();
if (!isset($this->middlewareDefinitions[$channel]) || $this->middlewareDefinitions[$channel] === []) {
$channel = self::DEFAULT_PIPELINE;
$queueName = $request->getQueue()->getName();
if (!isset($this->middlewareDefinitions[$queueName]) || $this->middlewareDefinitions[$queueName] === []) {
$queueName = self::DEFAULT_PIPELINE;
}
$definitions = array_reverse($this->middlewareDefinitions[$channel]);
$definitions = array_reverse($this->middlewareDefinitions[$queueName]);

if (!isset($this->stack[$channel])) {
$this->stack[$channel] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
if (!isset($this->stack[$queueName])) {
$this->stack[$queueName] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
}

return $this->stack[$channel]->handleFailure($request);
return $this->stack[$queueName]->handleFailure($request);
}

/**
Expand Down
Loading
Loading