Skip to content

feat: Improve reusability of components in event-driven simulations #248

@toby-coleman

Description

@toby-coleman

Summary

Currently, non-event-driven components cannot be reused in event-driven models; instead, new components have to be defined. This is because if we simply subclass a Component and add events to it, the subclass will still wait (forever) to receive its inputs, which never arrive.

Example

This example demonstrates the behaviour we would like, using a subclass of FileWriter to reuse it:

from plugboard.library import FileWriter
from plugboard.component import IOController as IO, Component
from plugboard.events import Event
from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess

from pydantic import BaseModel
import typing as _t

from plugboard.schemas import ConnectorSpec

class MyData(BaseModel):
    """Payload model holding a single text ``message``."""

    message: str


class MyEvent(Event):
    """Event with type string ``"my_event"`` whose ``data`` is a ``MyData`` payload."""

    type: _t.ClassVar[str] = "my_event"
    data: MyData


class EventReaderFileWriter(FileWriter):
    """``FileWriter`` subclass that declares ``MyEvent`` as an input event.

    The handler copies each event's message onto ``self.message``, the same
    attribute name listed in ``field_names`` where this class is instantiated.
    """

    # NOTE(review): whether this replaces or extends the IO declared by
    # FileWriter is not visible from this snippet -- confirm against plugboard.
    io: IO = IO(input_events=[MyEvent])

    @MyEvent.handler
    async def handle_match(self, event: MyEvent):
        """Store the message carried by ``event`` on this component."""
        self.message = event.data.message

class MessageEventGenerator(Component):
    """Component that queues one ``MyEvent`` per step for a fixed number of steps."""

    io = IO(output_events=[MyEvent])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Remember how many events to emit; remaining kwargs go to ``Component``."""
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that ``step`` consumes."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Queue the next message event, or close the IO once the counter is exhausted."""
        try:
            index = next(self._seq)
            payload = MyData(message=f"Message {index}")
            self.io.queue_event(MyEvent(source=self.name, data=payload))
        except StopIteration:
            # The counter has run out: nothing left to emit.
            await self.io.close()

# Event-driven setup: a generator that emits MyEvent and the FileWriter
# subclass that handles it.
components = [
    MessageEventGenerator(iters=3, name="message_event_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]

# Only event connectors are created here; nothing is connected to the
# writer's `message` input field.
event_connectors = AsyncioConnector.builder().build_event_connectors(components)

process = LocalProcess(
    components=components,
    connectors=event_connectors,
)

# NOTE(review): top-level `async with` needs an async context (e.g. a notebook
# or an enclosing coroutine) -- confirm how this snippet is meant to be run.
async with process:
    await process.run()
# This hangs while event_reader_file_writer waits for input

The following is the non-event-driven behaviour, which works:

class MessageInputGenerator(Component):
    """Component that writes one message per step to its ``message_input`` output."""

    io = IO(outputs=["message_input"])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Remember how many messages to emit; remaining kwargs go to ``Component``."""
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that ``step`` consumes."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Set the next message on the output, or close the IO once the counter is exhausted."""
        try:
            index = next(self._seq)
            self.message_input = f"Message {index}"
        except StopIteration:
            # The counter has run out: nothing left to emit.
            await self.io.close()

def connect(in_: str, out_: str):
    """Build an ``AsyncioConnector`` from a source and a target reference.

    Args:
        in_: Reference used as the connector's ``source``, e.g.
            ``"message_input_generator.message_input"``.
        out_: Reference used as the connector's ``target``, e.g.
            ``"event_reader_file_writer.message"``.

    Returns:
        An ``AsyncioConnector`` constructed from the resulting ``ConnectorSpec``.
    """
    return AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
# Field-driven setup: the same FileWriter subclass, fed through a regular
# output -> input connection instead of events.
components = [
    MessageInputGenerator(iters=3, name="message_input_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]
# Wire the generator's `message_input` output to the writer's `message` input.
connectors = [
    connect("message_input_generator.message_input", "event_reader_file_writer.message"),
]

process = LocalProcess(
    components=components,
    connectors=connectors,
)

# NOTE(review): top-level `async with` needs an async context (e.g. a notebook
# or an enclosing coroutine) -- confirm how this snippet is meant to be run.
async with process:
    await process.run()

Alternatives

No response

Metadata

Metadata

Labels

enhancement: New feature or request

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions