Każdy typ źródła danych jest obsługiwany przez osobny plugin (Watcher).
System pluginów używa wzorca Registry — analogicznie do FormatRegistry w toonic core.
| Plugin |
Kategoria |
Obsługuje |
FileWatcher |
code/config/data |
Katalogi, pliki (polling + TOON) |
LogWatcher |
logs |
Pliki logów (tail -f + kategoryzacja) |
StreamWatcher |
video/audio |
RTSP, pliki video (OpenCV + scene detection) |
- Stwórz plik
toonic/server/watchers/my_watcher.py
- Dziedzicz po
BaseWatcher
- Zaimplementuj
start(), stop(), supports()
- Zarejestruj w
WatcherRegistry
from toonic.server.watchers.base import BaseWatcher, WatcherRegistry
from toonic.server.models import ContextChunk, SourceCategory
class MQTTWatcher(BaseWatcher):
category = SourceCategory.DATA
async def start(self):
await super().start()
# Connect to MQTT broker, subscribe to topics
self._task = asyncio.create_task(self._listen())
async def _listen(self):
while self.running:
message = await self._receive()
await self.emit(ContextChunk(
source_id=f"mqtt:{message.topic}",
category=SourceCategory.DATA,
toon_spec=self._to_toon(message),
is_delta=True,
))
@classmethod
def supports(cls, path_or_url: str) -> float:
return 0.9 if path_or_url.startswith("mqtt://") else 0.0
WatcherRegistry.register(MQTTWatcher)
| Metoda |
Opis |
start() |
Rozpocznij obserwację |
stop() |
Zatrzymaj i posprzątaj |
emit(chunk) |
Wyślij ContextChunk do kolejki |
get_chunks() |
AsyncIterator[ContextChunk] |
supports(path) |
0.0-1.0 confidence |
# Rozpoznanie najlepszego pluginu
cls = WatcherRegistry.resolve("rtsp://cam1") # → StreamWatcher
# Utworzenie instancji
watcher = WatcherRegistry.create("cam1", "video", "rtsp://cam1:554/stream")
# Lista pluginów
WatcherRegistry.list_all() # → ["FileWatcher", "LogWatcher", "StreamWatcher"]