Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/Kafka/src/Configuration/KafkaModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
foreach ($publisherConfigurations as $publisherConfiguration) {
$kafkaBrokerConfigurations[$publisherConfiguration->getBrokerConfigurationReference()] = Reference::to($publisherConfiguration->getBrokerConfigurationReference());
}
if ($kafkaConsumers !== [] && ! array_key_exists(KafkaBrokerConfiguration::class, $kafkaBrokerConfigurations)) {
$kafkaBrokerConfigurations[KafkaBrokerConfiguration::class] = Reference::to(KafkaBrokerConfiguration::class);
}

foreach ($this->kafkaConsumersAnnotatedMethods as $kafkaConsumerAnnotatedMethod) {
/** @var KafkaConsumer $kafkaConsumer */
Expand Down
100 changes: 100 additions & 0 deletions packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
namespace Test\Ecotone\Kafka\Integration;

use Ecotone\Kafka\Api\KafkaHeader;
use Ecotone\Kafka\Attribute\KafkaConsumer;
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration;
use Ecotone\Kafka\Configuration\KafkaPublisherConfiguration;
use Ecotone\Kafka\Configuration\TopicConfiguration;
use Ecotone\Kafka\Outbound\MessagePublishingException;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
Expand Down Expand Up @@ -360,4 +362,102 @@ public function test_sending_and_receiving_with_kafka_consumer_configuration():
self::assertCount(1, $messages);
self::assertEquals('exampleData', $messages[0]['payload']);
}

public function test_sending_and_receiving_without_kafka_consumer_configuration(): void
{
$topicName = Uuid::uuid4()->toString();
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[ExampleKafkaConsumer::class],
[
KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(),
new ExampleKafkaConsumer(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE]))
->withExtensionObjects([
KafkaPublisherConfiguration::createWithDefaults($topicName),
TopicConfiguration::createWithReferenceName('exampleTopic', $topicName),
]),
licenceKey: LicenceTesting::VALID_LICENCE,
);

/** @var MessagePublisher $kafkaPublisher */
$kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class);

$kafkaPublisher->sendWithMetadata('exampleData', 'application/text', ['key' => 'value']);

$ecotoneLite->run('exampleConsumer', ExecutionPollingMetadata::createWithTestingSetup(
maxExecutionTimeInMilliseconds: 30000
));

$messages = $ecotoneLite->sendQueryWithRouting('getMessages');

self::assertCount(1, $messages);
self::assertEquals('exampleData', $messages[0]['payload']);
}

public function test_kafka_consumer_works_without_explicit_configuration(): void
{
$topicName = 'test_topic_no_config_' . Uuid::uuid4()->toString();

$consumer = new class {
private array $messages = [];

#[KafkaConsumer('ordersConsumer', 'orders')]
public function handle(string $payload): void
{
$this->messages[] = $payload;
}

#[QueryHandler('consumer.getMessages')]
public function getMessages(): array
{
return $this->messages;
}
};

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[$consumer::class],
[
KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(),
$consumer,
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE]))
->withExtensionObjects([
TopicConfiguration::createWithReferenceName('orders', $topicName),
]),
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotoneLite->run('ordersConsumer', ExecutionPollingMetadata::createWithTestingSetup());

$messages = $ecotoneLite->sendQueryWithRouting('consumer.getMessages');

self::assertCount(0, $messages);
}

public function test_kafka_publisher_works_without_explicit_configuration(): void
{
$topicName = 'test_topic_publisher_' . Uuid::uuid4()->toString();

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[],
[
KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE]))
->withExtensionObjects([
KafkaPublisherConfiguration::createWithDefaults($topicName),
]),
licenceKey: LicenceTesting::VALID_LICENCE,
);

/** @var MessagePublisher $kafkaPublisher */
$kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class);
$kafkaPublisher->send('test-payload');

$this->assertTrue(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Doctrine\DBAL\Platforms\MariaDBPlatform;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\Projecting\PartitionProvider;
Expand All @@ -27,7 +28,7 @@ class AggregateIdPartitionProvider implements PartitionProvider
* @param array<string> $partitionedProjections List of projection names this provider handles
*/
public function __construct(
private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
private DbalConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory,
private PdoStreamTableNameProvider $tableNameProvider,
private array $partitionedProjections = [],
) {
Expand Down Expand Up @@ -117,6 +118,10 @@ private function getConnection(): Connection
return $this->connectionFactory->getConnection();
}

if ($this->connectionFactory instanceof AlreadyConnectedDbalConnectionFactory) {
return $this->connectionFactory->getConnection();
}

return $this->connectionFactory->establishConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\Database\ProjectionStateTableManager;
use Ecotone\Projecting\NoOpTransaction;
Expand All @@ -34,7 +35,7 @@ class DbalProjectionStateStorage implements ProjectionStateStorage
* @param string[]|null $projectionNames
*/
public function __construct(
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory,
private ProjectionStateTableManager $tableManager,
private ?array $projectionNames = null,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use DateTimeZone;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory;
use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
Expand All @@ -35,7 +36,7 @@ class EventStoreGlobalStreamSource implements StreamSource
* @param string[] $handledProjectionNames
*/
public function __construct(
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory,
private EcotoneClockInterface $clock,
private PdoStreamTableNameProvider $tableNameProvider,
private StreamFilterRegistry $streamFilterRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Test\Ecotone\EventSourcing\Projecting;

use Ecotone\Dbal\DbalConnection;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\Lite\EcotoneLite;
Expand Down Expand Up @@ -451,4 +452,72 @@ public function init(): void

self::assertSame(1, $projection->initCallCount, 'Init should be called once');
}

public function test_projecting_with_already_connected_dbal_connection_factory(): void
{
$dbalConnectionFactory = self::getConnectionFactory();
$connection = $dbalConnectionFactory->createContext()->getDbalConnection();
$alreadyConnectedFactory = DbalConnection::create($connection);

$projection = new #[ProjectionV2(self::NAME), FromStream(Ticket::STREAM_NAME)] class ($connection) extends DbalTicketProjection {
public const NAME = 'already_connected_projection';
};

$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
[$projection::class, Ticket::class, TicketEventConverter::class, TicketAssigned::class],
[\Enqueue\Dbal\DbalConnectionFactory::class => $alreadyConnectedFactory, $projection, new TicketEventConverter()],
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ticketsCount = $ecotone->deleteEventStream(Ticket::STREAM_NAME)
->deleteProjection($projection::NAME)
->sendCommand(new CreateTicketCommand($ticketId = Uuid::uuid4()->toString()))
->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId])
->sendQueryWithRouting('getTicketsCount');

self::assertSame(1, $ticketsCount);
self::assertSame('assigned', $ecotone->sendQueryWithRouting('getTicketStatus', $ticketId));
}

public function test_partitioned_projection_with_already_connected_dbal_connection_factory(): void
{
$dbalConnectionFactory = self::getConnectionFactory();
$connection = $dbalConnectionFactory->createContext()->getDbalConnection();
$alreadyConnectedFactory = DbalConnection::create($connection);

$projection = new #[ProjectionV2(self::NAME), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(Ticket::STREAM_NAME, Ticket::class)] class ($connection) extends DbalTicketProjection {
public const NAME = 'already_connected_partitioned_projection';
public int $initCallCount = 0;

#[ProjectionInitialization]
public function init(): void
{
parent::init();
$this->initCallCount++;
}
};

$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
[$projection::class, Ticket::class, TicketEventConverter::class, TicketAssigned::class],
[\Enqueue\Dbal\DbalConnectionFactory::class => $alreadyConnectedFactory, $projection, new TicketEventConverter()],
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotone->deleteEventStream(Ticket::STREAM_NAME)
->deleteProjection($projection::NAME);

$ticketId1 = Uuid::uuid4()->toString();
$ticketId2 = Uuid::uuid4()->toString();

$ticketsCount = $ecotone->sendCommand(new CreateTicketCommand($ticketId1))
->sendCommand(new CreateTicketCommand($ticketId2))
->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId1])
->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId2])
->sendQueryWithRouting('getTicketsCount');

self::assertSame(2, $ticketsCount);
self::assertSame(2, $projection->initCallCount);
}
}