Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 63 additions & 19 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

namespace Yiisoft\Queue\AMQP;

use BackedEnum;
use InvalidArgumentException;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\DelayEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\MessageStatus;

final class Adapter implements AdapterInterface
{
Expand All @@ -25,17 +26,6 @@
) {
}

public function withChannel(BackedEnum|string $channel): self
{
$instance = clone $this;

$channelName = is_string($channel) ? $channel : (string) $channel->value;
$instance->queueProvider = $this->queueProvider->withChannelName($channelName);
$instance->amqpMessage = null;

return $instance;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
*/
Expand All @@ -44,16 +34,19 @@
(new ExistingMessagesConsumer($this->queueProvider, $this->serializer))->consume($handlerCallback);
}

/**
* @return never
*/
public function status(int|string $id): JobStatus
public function status(string|int $id): MessageStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
return MessageStatus::NOT_FOUND;
}

public function push(MessageInterface $message): MessageInterface
{
$delaySeconds = $message->getMetadata()[DelayEnvelope::META_DELAY_SECONDS] ?? null;
if ($delaySeconds !== null) {
$this->pushDelayed($message, $delaySeconds);

Check failure on line 46 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedArgument

src/Adapter.php:46:42: MixedArgument: Argument 2 of Yiisoft\Queue\AMQP\Adapter::pushDelayed cannot be mixed, expecting int (see https://psalm.dev/030)

Check failure on line 46 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedArgument

src/Adapter.php:46:42: MixedArgument: Argument 2 of Yiisoft\Queue\AMQP\Adapter::pushDelayed cannot be mixed, expecting int (see https://psalm.dev/030)

Check failure on line 46 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedArgument

src/Adapter.php:46:42: MixedArgument: Argument 2 of Yiisoft\Queue\AMQP\Adapter::pushDelayed cannot be mixed, expecting int (see https://psalm.dev/030)
return $message;
}

$this->amqpMessage ??= new AMQPMessage(
'',
$this->queueProvider->getMessageProperties(),
Expand All @@ -77,6 +70,57 @@
return $message;
}

private function pushDelayed(MessageInterface $message, int $delaySeconds): void
{
$exchangeSettings = $this->queueProvider->getExchangeSettings();
if ($exchangeSettings === null) {
throw new InvalidArgumentException('Message cannot be delayed to a queue without an exchange. Exchange is mandatory.');
}

$dlxExchangeSettings = $exchangeSettings
->withName("{$exchangeSettings->getName()}.dlx")
->withAutoDelete(true)
->withType(AMQPExchangeType::TOPIC);

$deliveryTime = time() + $delaySeconds;
$delayMilliseconds = $delaySeconds * 1000;
$queueSettings = $this->queueProvider->getQueueSettings();
$dlxQueueSettings = $queueSettings
->withName("{$queueSettings->getName()}.dlx.$deliveryTime")
->withAutoDeletable(true)
->withArguments([
'x-dead-letter-exchange' => ['S', $exchangeSettings->getName()],
'x-expires' => ['I', $delayMilliseconds + 30000],
'x-message-ttl' => ['I', $delayMilliseconds],
]);

$messageProperties = array_merge(
$this->queueProvider->getMessageProperties(),
[
'expiration' => $delayMilliseconds,
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
],
);

$dlxQueueProvider = $this->queueProvider
->withExchangeSettings($dlxExchangeSettings)
->withQueueSettings($dlxQueueSettings)
->withMessageProperties($messageProperties);

$amqpMessage = new AMQPMessage(
$this->serializer->serialize($message),
$dlxQueueProvider->getMessageProperties(),
);

$dlxQueueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$dlxExchangeSettings->getName(),
'',
);
Comment on lines +115 to +121
}

public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
Expand Down
114 changes: 0 additions & 114 deletions src/Middleware/DelayMiddleware.php

This file was deleted.

6 changes: 3 additions & 3 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public function getMessageProperties(): array
return $this->messageProperties;
}

public function withChannelName(string $channel): self
public function withQueueName(string $queue): self
{
if ($channel === $this->queueSettings->getName()) {
if ($queue === $this->queueSettings->getName()) {
return $this;
}

Expand All @@ -87,7 +87,7 @@ public function withChannelName(string $channel): self
}

$instance = clone $this;
$instance->queueSettings = $instance->queueSettings->withName($channel);
$instance->queueSettings = $instance->queueSettings->withName($queue);
if ($this->channelId !== null) {
$instance->channelId = null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/QueueProviderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function getExchangeSettings(): ?ExchangeSettingsInterface;

public function getMessageProperties(): array;

public function withChannelName(string $channel): self;
public function withQueueName(string $queue): self;

public function withQueueSettings(QueueSettingsInterface $queueSettings): self;

Expand Down
4 changes: 2 additions & 2 deletions src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Yiisoft\Queue\AMQP\Settings;

use PhpAmqpLib\Wire\AMQPTable;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class Queue implements QueueSettingsInterface
{
public function __construct(
private string $queueName = QueueInterface::DEFAULT_CHANNEL,
private string $queueName = QueueProviderInterface::DEFAULT_QUEUE,
private bool $passive = false,
private bool $durable = true,
private bool $exclusive = false,
Expand Down
27 changes: 10 additions & 17 deletions tests/Integration/DelayMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@
use Psr\Log\LoggerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\Middleware\DelayMiddleware;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings;
use Yiisoft\Queue\AMQP\Tests\Support\FakeAdapter;
use Yiisoft\Queue\AMQP\Tests\Support\FileHelper;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Message\DelayEnvelope;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\CallableFactory;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Test\Support\Container\SimpleContainer;
Expand Down Expand Up @@ -52,8 +51,7 @@ public function testMainFlow(): void

$time = time();
$queue->push(
new Message('simple', 'test-delay-middleware-main'),
new DelayMiddleware(3),
new DelayEnvelope(new Message('simple', 'test-delay-middleware-main'), 3),
);

sleep(2);
Expand All @@ -66,27 +64,22 @@ public function testMainFlow(): void
self::assertLessThanOrEqual($time + 5, $result);
}

public function testMainFlowWithFakeAdapter(): void
public function testNoExchangeThrows(): void
{
$adapterClass = Adapter::class;
$fakeAdapterClass = FakeAdapter::class;

$adapter = new FakeAdapter(
$adapter = new Adapter(
new QueueProvider(
$this->createConnection(),
new QueueSettings(),
new ExchangeSettings('yii-queue'),
),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage("This middleware works only with the $adapterClass. $fakeAdapterClass given.");
$this->expectExceptionMessage('Message cannot be delayed to a queue without an exchange. Exchange is mandatory.');
$queue->push(
new Message('simple', 'test-delay-middleware-main'),
new DelayMiddleware(3),
new DelayEnvelope(new Message('simple', 'test-delay-no-exchange'), 3),
);
}

Expand All @@ -96,8 +89,8 @@ private function makeQueue(AdapterInterface $adapter): Queue
$this->createMock(WorkerInterface::class),
$this->createMock(LoopInterface::class),
$this->createMock(LoggerInterface::class),
new PushMiddlewareDispatcher(
new MiddlewareFactoryPush(
new PushMiddlewareConfig(
new PushMiddlewareFactory(
new SimpleContainer(),
new CallableFactory($this->createMock(ContainerInterface::class)),
),
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected function queueListen(?string $queue = null): void
// TODO Fail test on subprocess error exit code
$command = [PHP_BINARY, dirname(__DIR__) . '/yii', 'queue:listen'];
if ($queue !== null) {
$command[] = "--channel=$queue";
$command[] = $queue;
}
$process = new Process($command);
$this->processes[] = $process;
Expand Down
4 changes: 2 additions & 2 deletions tests/Support/FakeAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

Expand All @@ -27,7 +27,7 @@ public function runExisting(callable $handlerCallback): void
throw new LogicException('Method not implemented');
}

public function status(int|string $id): JobStatus
public function status(string|int $id): MessageStatus
{
throw new LogicException('Method not implemented');
}
Expand Down
Loading
Loading