|
4 | 4 |
|
5 | 5 | import asyncio |
6 | 6 | import time |
7 | | -from typing import Callable, Dict, List, Optional, Tuple, Union |
| 7 | +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union |
8 | 8 |
|
9 | 9 | from hip4._events_core import ( |
10 | 10 | CATEGORIES, |
| 11 | + apply_outcome_created_to_side_names, |
11 | 12 | build_events_from_meta, |
12 | 13 | extract_side_names, |
13 | 14 | merge_mids_into_events, |
14 | 15 | ) |
15 | 16 | from hip4.aio.client import AsyncHIP4Client |
16 | 17 | from hip4.market_classification import classify_all_outcomes |
17 | 18 | from hip4.types.event import PredictionCategory, PredictionEvent |
18 | | -from hip4.types.hl import HLSettledOutcome |
| 19 | +from hip4.types.hl import HLSettledOutcome, HLWsOutcomeMetaUpdate |
19 | 20 | from hip4.types.hip4_market import ( |
20 | 21 | FetchMarketsParams, |
21 | 22 | HIP4Market, |
|
27 | 28 | __all__ = ["AsyncEventsAdapter", "SideNameResolver"] |
28 | 29 |
|
29 | 30 | SideNameResolver = Callable[[int], Optional[Tuple[str, str]]] |
30 | | -"""Resolve side names for an outcome by ID (sync — pure data lookup).""" |
| 31 | +"""Resolve side names for an outcome by ID (sync - pure data lookup).""" |
31 | 32 |
|
32 | 33 | CACHE_TTL_S = 30.0 |
33 | 34 |
|
@@ -115,6 +116,39 @@ async def fetch_markets( |
115 | 116 | async def fetch_settled_outcome(self, outcome_id: int) -> Optional[HLSettledOutcome]: |
116 | 117 | return await self._client.fetch_settled_outcome(outcome_id) |
117 | 118 |
|
| 119 | + # -- subscriptions ----------------------------------------------------- |
| 120 | + |
| 121 | + async def subscribe_outcome_meta_updates( |
| 122 | + self, |
| 123 | + on_data: Callable[[HLWsOutcomeMetaUpdate], Any], |
| 124 | + ) -> Callable[[], Awaitable[None]]: |
| 125 | + """Subscribe to live outcome-meta updates. See sync equivalent for details. |
| 126 | +
|
| 127 | + ``on_data`` may be sync or a coroutine; coroutines are scheduled with |
| 128 | + ``asyncio.create_task``. |
| 129 | + """ |
| 130 | + import asyncio # local import keeps cold-start lazy |
| 131 | + |
| 132 | + def _route(raw: Any) -> None: |
| 133 | + if not isinstance(raw, list): |
| 134 | + return |
| 135 | + for update in raw: |
| 136 | + if not isinstance(update, dict): |
| 137 | + continue |
| 138 | + self._apply_meta_update(update) |
| 139 | + result = on_data(update) |
| 140 | + if asyncio.iscoroutine(result): |
| 141 | + asyncio.create_task(result) |
| 142 | + |
| 143 | + return await self._client.subscribe({"type": "outcomeMetaUpdates"}, _route) |
| 144 | + |
| 145 | + def _apply_meta_update(self, update: HLWsOutcomeMetaUpdate) -> None: |
| 146 | + if "outcomeCreated" in update: |
| 147 | + apply_outcome_created_to_side_names(self._side_names, update["outcomeCreated"]) |
| 148 | + # Drop time-based caches so the next read refetches. |
| 149 | + self._events_cache = None |
| 150 | + self._meta_cache = None |
| 151 | + |
118 | 152 | # -- internals --------------------------------------------------------- |
119 | 153 |
|
120 | 154 | async def _load_events(self) -> List[PredictionEvent]: |
|
0 commit comments