Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- "$PWD:/data/app"
working_dir: "/data/app"
command: sleep 99999
container_name: "ecotone_development"
container_name: "${PHP_CONTAINER_NAME:-ecotone_development}"
user: "${USER_PID:-1000}:${USER_PID:-1000}"
extra_hosts:
- "host.docker.internal:host-gateway"
Expand All @@ -32,7 +32,7 @@ services:
- "$PWD:/data/app"
working_dir: "/data/app"
command: sleep 99999
container_name: "ecotone_development_8_2"
container_name: "${PHP_8_2_CONTAINER_NAME:-ecotone_development_8_2}"
user: "${USER_PID:-1000}:${USER_PID:-1000}"
extra_hosts:
- "host.docker.internal:host-gateway"
Expand Down Expand Up @@ -73,16 +73,16 @@ services:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- "15672:15672"
- "5672:5672"
- '${RABBITMQ_PORT:-5672}:5672'
- '${RABBITMQ_MGMT_PORT:-15672}:15672'
localstack:
image: localstack/localstack:3.0.0
environment:
LOCALSTACK_HOST: 'localstack'
SERVICES: 'sqs,sns'
ports:
- "4566:4566" # LocalStack Gateway
- "4510-4559:4510-4559" # external services port range
- "${LOCALSTACK_PORT:-4566}:4566" # LocalStack Gateway
# - "4510-4559:4510-4559" # external services port range
redis:
image: redis:7-alpine
ports:
Expand All @@ -96,33 +96,33 @@ services:
- ./.docker/collector/otel-collector-config.yaml:/etc/otel-collector-config.yml
ports:
- "9411" # Zipkin receiver
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP/HTTP receiver
# - "4317:4317" # OTLP gRPC receiver
# - "4318:4318" # OTLP/HTTP receiver
zipkin:
image: openzipkin/zipkin-slim
networks:
- default
ports:
- 9411:9411
- '${ZIPKIN_PORT:-9411}:9411'
jaeger:
image: jaegertracing/all-in-one:latest
environment:
COLLECTOR_OTLP_ENABLED: "true"
networks:
- default
ports:
- 16686:16686
- '${JAEGER_PORT:-16686}:16686'
kafka:
image: 'apache/kafka:3.9.0'
ports:
- '9094:9092'
- '${KAFKA_PORT:-9094}:9092'
environment:
- KAFKA_NODE_ID=0
- KAFKA_PROCESS_ROLES=broker,controller
- KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:${KAFKA_PORT:-9094}
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:${KAFKA_PORT:-9094}
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
Expand All @@ -133,7 +133,7 @@ services:
kafdrop:
image: 'obsidiandynamics/kafdrop:latest'
ports:
- '9999:9000'
- '${KAFDROP_PORT:-9999}:9000'
environment:
- KAFKA_BROKERCONNECT=kafka:9092
networks:
Expand Down
5 changes: 1 addition & 4 deletions packages/Dbal/src/EnqueueDbal/DbalContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class DbalContext implements Context
* @var array
*/
private $config;
private EcotoneClockInterface $clock;

/**
* Callable must return instance of Doctrine\DBAL\Connection once called.
Expand All @@ -63,8 +62,6 @@ public function __construct($connection, array $config = [])
} else {
throw new InvalidArgumentException(sprintf('The connection argument must be either %s or callable that returns %s.', Connection::class, Connection::class));
}

$this->clock = Clock::get();
}

/**
Expand Down Expand Up @@ -257,6 +254,6 @@ public function createDataBaseTable(): void

public function getClock(): EcotoneClockInterface
{
return $this->clock;
return Clock::get();
}
}
78 changes: 73 additions & 5 deletions packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\Duration;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
use Ecotone\Messaging\Scheduling\StubUTCClock;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Test\ClockSensitiveTrait;
use Ecotone\Test\StubLogger;
Expand All @@ -35,8 +37,6 @@
*/
class DbalBackedMessageChannelTest extends DbalMessagingTestCase
{
use ClockSensitiveTrait;

public function test_sending_and_receiving_via_channel()
{
$channelName = Uuid::uuid4()->toString();
Expand Down Expand Up @@ -196,7 +196,7 @@ public function test_reconnecting_on_disconnected_channel_with_manager_registry(
$this->assertNotNull($receivedMessage, 'Not received message');
}

public function test_delaying_the_message()
public function test_delaying_the_message_with_custom_clock()
{
$channelName = Uuid::uuid4()->toString();
$clock = new StubUTCClock();
Expand All @@ -217,8 +217,6 @@ public function test_delaying_the_message()
/** @var PollableChannel $messageChannel */
$messageChannel = $ecotoneLite->getMessageChannel($channelName);

Clock::set($clock);

$messageChannel->send(
MessageBuilder::withPayload('some')
->setHeader(MessageHeaders::DELIVERY_DELAY, 2000)
Expand All @@ -232,6 +230,76 @@ public function test_delaying_the_message()
$this->assertNotNull($messageChannel->receive());
}

public function test_delaying_the_message_with_native_clock()
{
$channelName = Uuid::uuid4()->toString();

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: [
DbalConnectionFactory::class => $this->getConnectionFactory(true),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE]))
->withExtensionObjects([
DbalBackedMessageChannelBuilder::create($channelName)
->withReceiveTimeout(1),
])
);

/** @var PollableChannel $messageChannel */
$messageChannel = $ecotoneLite->getMessageChannel($channelName);

$messageChannel->send(
MessageBuilder::withPayload('some')
->setHeader(MessageHeaders::DELIVERY_DELAY, 2000)
->build()
);

$ecotoneLite->waitTill(TimeSpan::withSeconds(1));

$this->assertNull($messageChannel->receive());

$ecotoneLite->waitTill(TimeSpan::withSeconds(3));

$this->assertNotNull($messageChannel->receive());
}

public function test_delaying_the_message_with_native_clock_using_date_time()
{
$channelName = Uuid::uuid4()->toString();

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: [
DbalConnectionFactory::class => $this->getConnectionFactory(true),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE]))
->withExtensionObjects([
DbalBackedMessageChannelBuilder::create($channelName)
->withReceiveTimeout(1),
])
);

/** @var PollableChannel $messageChannel */
$messageChannel = $ecotoneLite->getMessageChannel($channelName);

$messageChannel->send(
MessageBuilder::withPayload('some')
->setHeader(MessageHeaders::DELIVERY_DELAY, 2000)
->build()
);

/** @var EcotoneClockInterface $clock */
$clock = $ecotoneLite->getServiceFromContainer(EcotoneClockInterface::class);
$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(1)));

$this->assertNull($messageChannel->receive());

$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(3)));

$this->assertNotNull($messageChannel->receive());
}

public function test_sending_message()
{
$queueName = Uuid::uuid4()->toString();
Expand Down
1 change: 1 addition & 0 deletions packages/Ecotone/src/Lite/EcotoneLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\ConfigurationVariableService;
use Ecotone\Messaging\InMemoryConfigurationVariableService;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\BaseEventSourcingConfiguration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ final class DelayedMessageReleaseHandler
{
public function releaseMessagesAwaitingFor(string $channelName, int|TimeSpan|DateTimeInterface $timeInMillisecondsOrDateTime, ChannelResolver $channelResolver): void
{
if (!$channelResolver->hasChannelWithName($channelName)) {
return;
}

/** @var DelayableQueueChannel|MessageChannelInterceptorAdapter $channel */
$channel = $channelResolver->resolve($channelName);
if ($channel instanceof MessageChannelInterceptorAdapter) {
$channel = $channel->getInternalMessageChannel();
}

Assert::isTrue($channel instanceof DelayableQueueChannel, sprintf('Used %s channel to release delayed message, use instead of %s.', $channel::class, DelayableQueueChannel::class));
if (! $channel instanceof DelayableQueueChannel) {
return;
}

$channel->releaseMessagesAwaitingFor($timeInMillisecondsOrDateTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\MessagePublisher;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\DistributedBus;
Expand Down Expand Up @@ -89,6 +91,7 @@ public function getFlowTestSupport(): FlowTestSupport
$this->getServiceFromContainer(AggregateDefinitionRegistry::class),
$this->getMessagingTestSupport(),
$this->getGatewayByName(MessagingEntrypoint::class),
$this->getServiceFromContainer(EcotoneClockInterface::class),
$this->configuredMessagingSystem
);
}
Expand Down
24 changes: 21 additions & 3 deletions packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\Duration;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\MessageBuilder;
Expand Down Expand Up @@ -46,6 +49,7 @@ public function __construct(
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
private MessagingTestSupport $testSupportGateway,
private MessagingEntrypoint $messagingEntrypoint,
private EcotoneClockInterface $clock,
private ConfiguredMessagingSystem $configuredMessagingSystem
) {
}
Expand Down Expand Up @@ -141,9 +145,7 @@ public function receiveMessageFrom(string $channelName): ?Message
*/
public function run(string $name, ?ExecutionPollingMetadata $executionPollingMetadata = null, TimeSpan|DateTimeInterface|null $releaseAwaitingFor = null): self
{
if ($releaseAwaitingFor) {
$this->testSupportGateway->releaseMessagesAwaitingFor($name, $releaseAwaitingFor);
}
$this->testSupportGateway->releaseMessagesAwaitingFor($name, $releaseAwaitingFor ?? Clock::get()->now());
$this->configuredMessagingSystem->run($name, $executionPollingMetadata);

return $this;
Expand Down Expand Up @@ -190,6 +192,22 @@ public function getEventStreamEvents(string $streamName): array
return $this->getGateway(EventStore::class)->load($streamName);
}

public function waitTill(TimeSpan|DateTimeInterface $time): self
{
if ($time instanceof DateTimeInterface) {
if ($time < $this->clock->now()) {
throw new MessagingException("Time to wait is in the past. Now: {$this->clock->now()}, time to wait: {$time}");
}
}

$this->clock->sleep($time instanceof TimeSpan
? $time->toDuration()
: Timespan::fromDateInterval($time->diff($this->clock->now()))->toDuration()
);

return $this;
}

/**
* @param Event[]|object[]|array[] $events
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\Container\ReferenceSearchServiceWithContainer;
use Ecotone\Messaging\Config\MessagingSystemContainer;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceCacheConfiguration;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Handler\Bridge\Bridge;
use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Handler\Enricher\PropertyEditorAccessor;
Expand All @@ -30,11 +32,23 @@
*/
class RegisterSingletonMessagingServices implements CompilerPass
{
public function __construct(
private ServiceConfiguration $serviceConfiguration,
) {
}

public function process(ContainerBuilder $builder): void
{
$this->registerDefault($builder, Bridge::class, new Definition(Bridge::class));
$this->registerDefault($builder, Reference::toChannel(NullableMessageChannel::CHANNEL_NAME), new Definition(NullableMessageChannel::class));
$this->registerDefault($builder, EcotoneClockInterface::class, new Definition(Clock::class, [new Reference(ClockInterface::class, ContainerImplementation::NULL_ON_INVALID_REFERENCE)]));
$this->registerDefault($builder, EcotoneClockInterface::class, new Definition(
Clock::class,
[
new Reference(ClockInterface::class, ContainerImplementation::NULL_ON_INVALID_REFERENCE),
$this->serviceConfiguration->isModulePackageEnabled(ModulePackageList::TEST_PACKAGE),
],
factory: [Clock::class, 'createBasedOnConfig']
));
$this->registerDefault($builder, ChannelResolver::class, new Definition(ChannelResolverWithContainer::class, [new Reference(ContainerInterface::class)]));
$this->registerDefault($builder, ReferenceSearchService::class, new Definition(ReferenceSearchServiceWithContainer::class, [new Reference(ContainerInterface::class)]));
$this->registerDefault($builder, ExpressionEvaluationService::REFERENCE, new Definition(SymfonyExpressionEvaluationAdapter::class, [new Reference(ReferenceSearchService::class)], 'create'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ public function process(ContainerBuilder $builder): void
}

$messagingBuilder->register(ConfiguredMessagingSystem::class, new Definition(MessagingSystemContainer::class, [new Reference(ContainerInterface::class), $messagingBuilder->getPollingEndpoints(), $gatewayListReferences]));
(new RegisterSingletonMessagingServices())->process($builder);
(new RegisterSingletonMessagingServices($this->applicationConfiguration))->process($builder);
foreach ($this->compilerPasses as $compilerPass) {
$compilerPass->process($builder);
}
Expand Down
Loading