Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@
$queueSettings = $this->queueProvider->getQueueSettings();
$dlxQueueSettings = $queueSettings
->withName("{$queueSettings->getName()}.dlx.$deliveryTime")
->withAutoDeletable(true)

Check warning on line 89 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "TrueValue": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(false)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());
->withArguments([
'x-dead-letter-exchange' => ['S', $exchangeSettings->getName()],
'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000],

Check warning on line 92 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 29999], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());

Check warning on line 92 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "CastInt": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', $delaySeconds * 1000 + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());

Check warning on line 92 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "Multiplication": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds / 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());

Check warning on line 92 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "IncrementInteger": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1001) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());

Check warning on line 92 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 999) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());
'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)],

Check warning on line 93 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": @@ @@ $dlxExchangeSettings = $exchangeSettings->withName("{$exchangeSettings->getName()}.dlx")->withAutoDelete(true)->withType(AMQPExchangeType::TOPIC); $deliveryTime = time() + (int) $delaySeconds; $queueSettings = $this->queueProvider->getQueueSettings(); - $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 1000)]]); + $dlxQueueSettings = $queueSettings->withName("{$queueSettings->getName()}.dlx.{$deliveryTime}")->withAutoDeletable(true)->withArguments(['x-dead-letter-exchange' => ['S', $exchangeSettings->getName()], 'x-expires' => ['I', (int) ($delaySeconds * 1000) + 30000], 'x-message-ttl' => ['I', (int) ($delaySeconds * 999)]]); $messageProperties = array_merge($this->queueProvider->getMessageProperties(), ['expiration' => (int) ($delaySeconds * 1000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $dlxQueueProvider = $this->queueProvider->withExchangeSettings($dlxExchangeSettings)->withQueueSettings($dlxQueueSettings)->withMessageProperties($messageProperties); $amqpMessage = new AMQPMessage($this->serializer->serialize($message), $dlxQueueProvider->getMessageProperties());
]);

$messageProperties = array_merge(
Expand Down Expand Up @@ -123,6 +123,14 @@
public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
$qosSettings = $this->queueProvider->getQueueSettings()->getQosSettings();
if ($qosSettings !== null) {
$channel->basic_qos(
$qosSettings->getPrefetchSize(),
$qosSettings->getPrefetchCount(),
$qosSettings->isGlobal(),
);
}
$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
Expand All @@ -141,7 +149,7 @@
} catch (Throwable $exception) {
$consumerTag = $amqpMessage->getConsumerTag();
if ($consumerTag !== null) {
$channel->basic_cancel($consumerTag);

Check warning on line 152 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ } catch (Throwable $exception) { $consumerTag = $amqpMessage->getConsumerTag(); if ($consumerTag !== null) { - $channel->basic_cancel($consumerTag); + } throw $exception; }
}

throw $exception;
Expand Down
124 changes: 124 additions & 0 deletions src/Settings/QosSettings.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\AMQP\Settings;

use InvalidArgumentException;

/**
* Quality of Service settings for AMQP `basic.qos`.
*
* Instructs the broker how many messages (and how many bytes) it may push to a consumer
* before the consumer must acknowledge at least one of them. Without QoS, the broker
* dispatches all available messages to the first ready consumer, leaving other workers
* idle — a problem especially visible under high load with slow handlers.
*
* Typical production setting: `new QosSettings(prefetchCount: 1)` gives true round-robin
* distribution — each worker receives one message at a time and gets the next only after
* acknowledging the previous one.
*
* Note: RabbitMQ does not enforce `prefetchSize` (it is accepted but ignored). Only
* `prefetchCount` has a practical effect in RabbitMQ.
*
* @see https://www.rabbitmq.com/docs/consumer-prefetch RabbitMQ Consumer Prefetch
* @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos AMQP 0-9-1 basic.qos reference
*/
final class QosSettings
{
/**
* @param int $prefetchSize Maximum number of octets the broker may hold in-flight per consumer
* (i.e. sent but not yet acknowledged). The broker will not send a new
* message if doing so would exceed this byte budget.
* 0 disables the byte-level limit entirely.
* Note: RabbitMQ accepts this field but does not enforce it; only
* {@see self::$prefetchCount} has a practical effect.
* @param int $prefetchCount Maximum number of unacknowledged messages the broker may deliver
* before waiting for at least one acknowledgement.
* 0 disables the message-count limit (unlimited prefetch — the default
* AMQP behaviour, and the source of worker starvation in multi-consumer
* setups). Set to 1 for strict one-at-a-time round-robin distribution.
* @param bool $global Scope of the limits. The AMQP 0-9-1 specification defines global=true
* as connection-wide and global=false as channel-wide, but RabbitMQ
* redefines the semantics: global=false (default) applies the limit
* per consumer registered on the channel; global=true applies it to
* all consumers sharing the channel.
*
* @psalm-param non-negative-int $prefetchSize
* @psalm-param non-negative-int $prefetchCount
*
* @throws InvalidArgumentException if $prefetchSize or $prefetchCount is negative.
*
* @see https://www.rabbitmq.com/docs/consumer-prefetch RabbitMQ Consumer Prefetch
* @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos AMQP 0-9-1 basic.qos reference
*/
public function __construct(
private readonly int $prefetchSize = 0,
private readonly int $prefetchCount = 0,
private readonly bool $global = false,
) {
/**
* @psalm-suppress DocblockTypeContradiction
* @psalm-suppress InvalidCast
* Runtime guard for callers without static analysis.
*/
if ($prefetchSize < 0) {
throw new InvalidArgumentException(
"Prefetch size must be a non-negative integer, $prefetchSize given."
);
}
/**
* @psalm-suppress DocblockTypeContradiction
* @psalm-suppress InvalidCast
* Runtime guard for callers without static analysis.
*/
if ($prefetchCount < 0) {
throw new InvalidArgumentException(
"Prefetch count must be a non-negative integer, $prefetchCount given."
);
}
}

/**
* Returns the maximum number of octets the broker may hold unacknowledged per consumer.
* 0 means no byte-level limit.
*
* Note: RabbitMQ accepts this value but does not enforce it.
*
* @psalm-return non-negative-int
*
* @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos.prefetch-size AMQP 0-9-1 prefetch-size field
*/
public function getPrefetchSize(): int
{
return $this->prefetchSize;
}

/**
* Returns the maximum number of unacknowledged messages the broker may deliver before waiting
* for an acknowledgement. 0 means unlimited.
*
* @psalm-return non-negative-int
*
* @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos.prefetch-count AMQP 0-9-1 prefetch-count field
*/
public function getPrefetchCount(): int
{
return $this->prefetchCount;
}

/**
* Returns whether the QoS limits apply globally to the channel (true) or per consumer (false).
*
* RabbitMQ redefines the AMQP 0-9-1 semantics of this field: false means the limit applies
* to each individual consumer registered on the channel; true means the limit is shared across
* all consumers on the channel.
*
* @see https://www.rabbitmq.com/docs/consumer-prefetch#per-channel-vs-per-consumer RabbitMQ per-channel vs per-consumer prefetch
* @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos.global AMQP 0-9-1 global field
*/
public function isGlobal(): bool
{
return $this->global;
}
}
16 changes: 15 additions & 1 deletion src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public function __construct(
private bool $autoDelete = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
private ?int $ticket = null
private ?int $ticket = null,
private ?QosSettings $qosSettings = null,
) {
}

Expand Down Expand Up @@ -61,6 +62,11 @@ public function isPassive(): bool
return $this->passive;
}

public function getQosSettings(): ?QosSettings
{
return $this->qosSettings;
}

/**
* @psalm-return array{0: string, 1: bool, 2: bool, 3: bool, 4: bool, 5: bool, 6: AMQPTable|array, 7: int|null}
*
Expand Down Expand Up @@ -143,4 +149,12 @@ public function withPassive(bool $passive): self

return $new;
}

public function withQosSettings(?QosSettings $qosSettings): self
{
$new = clone $this;
$new->qosSettings = $qosSettings;

return $new;
}
}
4 changes: 4 additions & 0 deletions src/Settings/QueueSettingsInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public function hasNowait(): bool;

public function isPassive(): bool;

public function getQosSettings(): ?QosSettings;

/**
* Returns positional arguments to be used with {@see \PhpAmqpLib\Channel\AMQPChannel::queue_declare()}
*
Expand All @@ -50,4 +52,6 @@ public function withExclusive(bool $exclusive): self;
public function withNowait(bool $nowait): self;

public function withPassive(bool $passive): self;

public function withQosSettings(?QosSettings $qosSettings): self;
}
46 changes: 46 additions & 0 deletions tests/Unit/QosSettingsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\AMQP\Tests\Unit;

use InvalidArgumentException;
use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\AMQP\Settings\QosSettings;

final class QosSettingsTest extends TestCase
{
public function testDefaultValues(): void
{
$settings = new QosSettings();

self::assertSame(0, $settings->getPrefetchSize());
self::assertSame(0, $settings->getPrefetchCount());
self::assertFalse($settings->isGlobal());
}

public function testCustomValues(): void
{
$settings = new QosSettings(prefetchSize: 1024, prefetchCount: 5, global: true);

self::assertSame(1024, $settings->getPrefetchSize());
self::assertSame(5, $settings->getPrefetchCount());
self::assertTrue($settings->isGlobal());
}

public function testNegativePrefetchSizeThrows(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Prefetch size must be a non-negative integer, -1 given.');

new QosSettings(prefetchSize: -1);
}

public function testNegativePrefetchCountThrows(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Prefetch count must be a non-negative integer, -3 given.');

new QosSettings(prefetchCount: -3);
}
}
3 changes: 3 additions & 0 deletions tests/Unit/QueueSettingsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
use Yiisoft\Queue\AMQP\Settings\QosSettings;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;

Expand Down Expand Up @@ -139,5 +140,7 @@ public function testImmutable(): void
self::assertNotSame($queueSettings, $queueSettings->withName('test'));
self::assertNotSame($queueSettings, $queueSettings->withAutoDeletable(false));
self::assertNotSame($queueSettings, $queueSettings->withArguments([]));
self::assertNotSame($queueSettings, $queueSettings->withQosSettings(new QosSettings()));
self::assertNotSame($queueSettings, $queueSettings->withQosSettings(null));
}
}
80 changes: 80 additions & 0 deletions tests/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
namespace Yiisoft\Queue\AMQP\Tests\Unit;

use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
use Yiisoft\Queue\AMQP\Settings\QosSettings;
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
use Yiisoft\Queue\AMQP\Tests\Support\FileHelper;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Exception\MessageFailureException;
Expand Down Expand Up @@ -135,4 +138,81 @@ public function testImmutable(): void

self::assertNotSame($adapter, $adapter->withQueueProvider($queueProvider));
}

public function testSubscribeCallsBasicQosWhenConfigured(): void
{
$qosSettings = new QosSettings(prefetchSize: 0, prefetchCount: 5, global: true);

$queueSettings = $this->createMock(QueueSettingsInterface::class);
$queueSettings->method('getQosSettings')->willReturn($qosSettings);
$queueSettings->method('getName')->willReturn('test-queue');

$channel = $this->createMock(AMQPChannel::class);
$channel->expects(self::once())
->method('basic_qos')
->with(0, 5, true);
$channel->expects(self::once())
->method('basic_consume')
->with('test-queue', 'test-queue', false, false, false, true, self::anything());

$queueProvider = $this->createMock(QueueProviderInterface::class);
$queueProvider->method('getChannel')->willReturn($channel);
$queueProvider->method('getQueueSettings')->willReturn($queueSettings);

$loop = $this->createMock(LoopInterface::class);
$loop->method('canContinue')->willReturn(false);

$adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop);
$adapter->subscribe(static fn() => null);
}

public function testSubscribeCallsBasicQosWithDefaultSettings(): void
{
$qosSettings = new QosSettings();

$queueSettings = $this->createMock(QueueSettingsInterface::class);
$queueSettings->method('getQosSettings')->willReturn($qosSettings);
$queueSettings->method('getName')->willReturn('test-queue');

$channel = $this->createMock(AMQPChannel::class);
$channel->expects(self::once())
->method('basic_qos')
->with(0, 0, false);
$channel->expects(self::once())
->method('basic_consume')
->with('test-queue', 'test-queue', false, false, false, true, self::anything());

$queueProvider = $this->createMock(QueueProviderInterface::class);
$queueProvider->method('getChannel')->willReturn($channel);
$queueProvider->method('getQueueSettings')->willReturn($queueSettings);

$loop = $this->createMock(LoopInterface::class);
$loop->method('canContinue')->willReturn(false);

$adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop);
$adapter->subscribe(static fn() => null);
}

public function testSubscribeSkipsBasicQosWhenNotConfigured(): void
{
$queueSettings = $this->createMock(QueueSettingsInterface::class);
$queueSettings->method('getQosSettings')->willReturn(null);
$queueSettings->method('getName')->willReturn('test-queue');

$channel = $this->createMock(AMQPChannel::class);
$channel->expects(self::never())->method('basic_qos');
$channel->expects(self::once())
->method('basic_consume')
->with('test-queue', 'test-queue', false, false, false, true, self::anything());

$queueProvider = $this->createMock(QueueProviderInterface::class);
$queueProvider->method('getChannel')->willReturn($channel);
$queueProvider->method('getQueueSettings')->willReturn($queueSettings);

$loop = $this->createMock(LoopInterface::class);
$loop->method('canContinue')->willReturn(false);

$adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop);
$adapter->subscribe(static fn() => null);
}
}
Loading