Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.6.0"
version = "4.6.1"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down
65 changes: 63 additions & 2 deletions src/cqrs/requests/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def bootstrap(
)


@overload
def setup_streaming_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer,
Expand All @@ -241,6 +242,31 @@ def setup_streaming_mediator(
domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator: ...


@overload
def setup_streaming_mediator(
event_emitter: events.EventEmitter,
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
commands_mapper: typing.Callable[[RequestMap], None] | None = None,
queries_mapper: typing.Callable[[RequestMap], None] | None = None,
domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator: ...


def setup_streaming_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer | CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
commands_mapper: typing.Callable[[RequestMap], None] | None = None,
queries_mapper: typing.Callable[[RequestMap], None] | None = None,
domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator:
requests_mapper = RequestMap()
if commands_mapper:
Expand Down Expand Up @@ -268,6 +294,7 @@ def setup_streaming_mediator(
)


@overload
def bootstrap_streaming(
di_container: di.Container,
message_broker: protocol.MessageBroker | None = None,
Expand All @@ -278,6 +305,33 @@ def bootstrap_streaming(
on_startup: typing.List[typing.Callable[[], None]] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator: ...


@overload
def bootstrap_streaming(
di_container: CQRSContainer,
message_broker: protocol.MessageBroker | None = None,
middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
commands_mapper: typing.Callable[[RequestMap], None] | None = None,
domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
queries_mapper: typing.Callable[[RequestMap], None] | None = None,
on_startup: typing.List[typing.Callable[[], None]] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator: ...


def bootstrap_streaming(
di_container: di.Container | CQRSContainer,
message_broker: protocol.MessageBroker | None = None,
middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
commands_mapper: typing.Callable[[RequestMap], None] | None = None,
domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
queries_mapper: typing.Callable[[RequestMap], None] | None = None,
on_startup: typing.List[typing.Callable[[], None]] | None = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator:
if message_broker is None:
message_broker = DEFAULT_MESSAGE_BROKER
Expand All @@ -287,8 +341,15 @@ def bootstrap_streaming(
for fun in on_startup:
fun()

container = di_container_impl.DIContainer()
container.attach_external_container(di_container)
# If the provided container is a container implemented using di package,
# we need to wrap it into our own container
if isinstance(di_container, di.Container):
container = di_container_impl.DIContainer()
container.attach_external_container(di_container)

# Otherwise, we can use the provided container directly
else:
container = di_container

event_emitter = setup_event_emitter(
container,
Expand Down
Loading