Skip to content

Commit fb87f9e

Browse files
committed
Implement ActionQueue for batching actions in OverkizClient and add integration tests
1 parent 876b54d commit fb87f9e

File tree

5 files changed

+925
-2
lines changed

5 files changed

+925
-2
lines changed

pyoverkiz/action_queue.py

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
"""Action queue for batching multiple action executions into single API calls."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from collections.abc import Callable, Coroutine
7+
from typing import TYPE_CHECKING
8+
9+
if TYPE_CHECKING:
10+
from pyoverkiz.enums import CommandMode
11+
from pyoverkiz.models import Action
12+
13+
14+
class QueuedExecution:
15+
"""Represents a queued action execution that will resolve to an exec_id when the batch executes."""
16+
17+
def __init__(self) -> None:
18+
self._future: asyncio.Future[str] = asyncio.Future()
19+
20+
def set_result(self, exec_id: str) -> None:
21+
"""Set the execution ID result."""
22+
if not self._future.done():
23+
self._future.set_result(exec_id)
24+
25+
def set_exception(self, exception: Exception) -> None:
26+
"""Set an exception if the batch execution failed."""
27+
if not self._future.done():
28+
self._future.set_exception(exception)
29+
30+
def __await__(self):
31+
"""Make this awaitable."""
32+
return self._future.__await__()
33+
34+
35+
class ActionQueue:
36+
"""
37+
Batches multiple action executions into single API calls.
38+
39+
When actions are added, they are held for a configurable delay period.
40+
If more actions arrive during this window, they are batched together.
41+
The batch is flushed when:
42+
- The delay timer expires
43+
- The max actions limit is reached
44+
- The command mode changes
45+
- Manual flush is requested
46+
"""
47+
48+
def __init__(
49+
self,
50+
executor: Callable[
51+
[list[Action], CommandMode | None, str | None], Coroutine[None, None, str]
52+
],
53+
delay: float = 0.5,
54+
max_actions: int = 20,
55+
) -> None:
56+
"""
57+
Initialize the action queue.
58+
59+
:param executor: Async function to execute batched actions
60+
:param delay: Seconds to wait before auto-flushing (default 0.5)
61+
:param max_actions: Maximum actions per batch before forced flush (default 20)
62+
"""
63+
self._executor = executor
64+
self._delay = delay
65+
self._max_actions = max_actions
66+
67+
self._pending_actions: list[Action] = []
68+
self._pending_mode: CommandMode | None = None
69+
self._pending_label: str | None = None
70+
self._pending_waiters: list[QueuedExecution] = []
71+
72+
self._flush_task: asyncio.Task[None] | None = None
73+
self._lock = asyncio.Lock()
74+
75+
async def add(
76+
self,
77+
actions: list[Action],
78+
mode: CommandMode | None = None,
79+
label: str | None = None,
80+
) -> QueuedExecution:
81+
"""
82+
Add actions to the queue.
83+
84+
:param actions: Actions to queue
85+
:param mode: Command mode (will flush if different from pending mode)
86+
:param label: Label for the action group
87+
:return: QueuedExecution that resolves to exec_id when batch executes
88+
"""
89+
async with self._lock:
90+
# If mode or label changes, flush existing queue first
91+
if self._pending_actions and (
92+
mode != self._pending_mode or label != self._pending_label
93+
):
94+
await self._flush_now()
95+
96+
# Add actions to pending queue
97+
self._pending_actions.extend(actions)
98+
self._pending_mode = mode
99+
self._pending_label = label
100+
101+
# Create waiter for this caller
102+
waiter = QueuedExecution()
103+
self._pending_waiters.append(waiter)
104+
105+
# If we hit max actions, flush immediately
106+
if len(self._pending_actions) >= self._max_actions:
107+
await self._flush_now()
108+
else:
109+
# Schedule delayed flush if not already scheduled
110+
if self._flush_task is None or self._flush_task.done():
111+
self._flush_task = asyncio.create_task(self._delayed_flush())
112+
113+
return waiter
114+
115+
async def _delayed_flush(self) -> None:
116+
"""Wait for the delay period, then flush the queue."""
117+
await asyncio.sleep(self._delay)
118+
async with self._lock:
119+
if not self._pending_actions:
120+
return
121+
122+
# Take snapshot and clear state while holding lock
123+
actions = self._pending_actions
124+
mode = self._pending_mode
125+
label = self._pending_label
126+
waiters = self._pending_waiters
127+
128+
self._pending_actions = []
129+
self._pending_mode = None
130+
self._pending_label = None
131+
self._pending_waiters = []
132+
self._flush_task = None
133+
134+
# Execute outside the lock
135+
try:
136+
exec_id = await self._executor(actions, mode, label)
137+
for waiter in waiters:
138+
waiter.set_result(exec_id)
139+
except Exception as exc:
140+
for waiter in waiters:
141+
waiter.set_exception(exc)
142+
143+
async def _flush_now(self) -> None:
144+
"""Execute pending actions immediately (must be called with lock held)."""
145+
if not self._pending_actions:
146+
return
147+
148+
# Cancel any pending flush task
149+
if self._flush_task and not self._flush_task.done():
150+
self._flush_task.cancel()
151+
self._flush_task = None
152+
153+
# Take snapshot of current batch
154+
actions = self._pending_actions
155+
mode = self._pending_mode
156+
label = self._pending_label
157+
waiters = self._pending_waiters
158+
159+
# Clear pending state
160+
self._pending_actions = []
161+
self._pending_mode = None
162+
self._pending_label = None
163+
self._pending_waiters = []
164+
165+
# Execute the batch (must release lock before calling executor to avoid deadlock)
166+
# Note: This is called within a lock context, we'll execute outside
167+
try:
168+
exec_id = await self._executor(actions, mode, label)
169+
# Notify all waiters
170+
for waiter in waiters:
171+
waiter.set_result(exec_id)
172+
except Exception as exc:
173+
# Propagate exception to all waiters
174+
for waiter in waiters:
175+
waiter.set_exception(exc)
176+
raise
177+
178+
async def flush(self) -> list[str]:
179+
"""
180+
Force flush all pending actions immediately.
181+
182+
:return: List of exec_ids from flushed batches
183+
"""
184+
async with self._lock:
185+
if not self._pending_actions:
186+
return []
187+
188+
# Since we can only have one batch pending at a time,
189+
# this will return a single exec_id in a list
190+
exec_ids: list[str] = []
191+
192+
try:
193+
await self._flush_now()
194+
# If flush succeeded, we can't actually return the exec_id here
195+
# since it's delivered via the waiters. This method is mainly
196+
# for forcing a flush, not retrieving results.
197+
# Return empty list to indicate flush completed
198+
except Exception:
199+
# If flush fails, the exception will be propagated to waiters
200+
# and also raised here
201+
raise
202+
203+
return exec_ids
204+
205+
def get_pending_count(self) -> int:
206+
"""Get the number of actions currently waiting in the queue."""
207+
return len(self._pending_actions)
208+
209+
async def shutdown(self) -> None:
210+
"""Shutdown the queue, flushing any pending actions."""
211+
async with self._lock:
212+
if self._flush_task and not self._flush_task.done():
213+
self._flush_task.cancel()
214+
self._flush_task = None
215+
216+
if self._pending_actions:
217+
await self._flush_now()

pyoverkiz/client.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from botocore.config import Config
2727
from warrant_lite import WarrantLite
2828

29+
from pyoverkiz.action_queue import ActionQueue, QueuedExecution
2930
from pyoverkiz.const import (
3031
COZYTOUCH_ATLANTIC_API,
3132
COZYTOUCH_CLIENT_ID,
@@ -166,6 +167,7 @@ class OverkizClient:
166167
_expires_in: datetime.datetime | None = None
167168
_access_token: str | None = None
168169
_ssl: ssl.SSLContext | bool = True
170+
_action_queue: ActionQueue | None = None
169171

170172
def __init__(
171173
self,
@@ -175,13 +177,21 @@ def __init__(
175177
verify_ssl: bool = True,
176178
token: str | None = None,
177179
session: ClientSession | None = None,
180+
action_queue_enabled: bool = False,
181+
action_queue_delay: float = 0.5,
182+
action_queue_max_actions: int = 20,
178183
) -> None:
179184
"""Constructor.
180185
181186
:param username: the username
182187
:param password: the password
183188
:param server: OverkizServer
189+
:param verify_ssl: whether to verify SSL certificates
190+
:param token: optional access token
184191
:param session: optional ClientSession
192+
:param action_queue_enabled: enable action batching queue (default False)
193+
:param action_queue_delay: seconds to wait before flushing queue (default 0.5)
194+
:param action_queue_max_actions: max actions per batch (default 20)
185195
"""
186196
self.username = username
187197
self.password = password
@@ -196,6 +206,14 @@ def __init__(
196206
self.session = session if session else ClientSession()
197207
self._ssl = verify_ssl
198208

209+
# Initialize action queue if enabled
210+
if action_queue_enabled:
211+
self._action_queue = ActionQueue(
212+
executor=self._execute_action_group_direct,
213+
delay=action_queue_delay,
214+
max_actions=action_queue_max_actions,
215+
)
216+
199217
if LOCAL_API_PATH in self.server.endpoint:
200218
self.api_type = APIType.LOCAL
201219

@@ -225,6 +243,10 @@ async def __aexit__(
225243

226244
async def close(self) -> None:
227245
"""Close the session."""
246+
# Flush any pending actions in queue
247+
if self._action_queue:
248+
await self._action_queue.shutdown()
249+
228250
if self.event_listener_id:
229251
await self.unregister_event_listener()
230252

@@ -631,13 +653,13 @@ async def get_api_version(self) -> str:
631653

632654
@retry_on_too_many_executions
633655
@retry_on_auth_error
634-
async def execute_action_group(
656+
async def _execute_action_group_direct(
635657
self,
636658
actions: list[Action],
637659
mode: CommandMode | None = None,
638660
label: str | None = "python-overkiz-api",
639661
) -> str:
640-
"""Execute a non-persistent action group.
662+
"""Execute a non-persistent action group directly (internal method).
641663
642664
The executed action group does not have to be persisted on the server before use.
643665
Per-session rate-limit : 1 calls per 28min 48s period for all operations of the same category (exec)
@@ -662,6 +684,48 @@ async def execute_action_group(
662684

663685
return cast(str, response["execId"])
664686

687+
async def execute_action_group(
688+
self,
689+
actions: list[Action],
690+
mode: CommandMode | None = None,
691+
label: str | None = "python-overkiz-api",
692+
) -> str | QueuedExecution:
693+
"""Execute a non-persistent action group.
694+
695+
If action queue is enabled, actions will be batched with other actions
696+
executed within the configured delay window. Returns a QueuedExecution
697+
that can be awaited to get the exec_id.
698+
699+
If action queue is disabled, executes immediately and returns exec_id directly.
700+
701+
:param actions: List of actions to execute
702+
:param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None)
703+
:param label: Label for the action group
704+
:return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled)
705+
"""
706+
if self._action_queue:
707+
return await self._action_queue.add(actions, mode, label)
708+
else:
709+
return await self._execute_action_group_direct(actions, mode, label)
710+
711+
async def flush_action_queue(self) -> None:
712+
"""Force flush all pending actions in the queue immediately.
713+
714+
If action queue is disabled, this method does nothing.
715+
If there are no pending actions, this method does nothing.
716+
"""
717+
if self._action_queue:
718+
await self._action_queue.flush()
719+
720+
def get_pending_actions_count(self) -> int:
721+
"""Get the number of actions currently waiting in the queue.
722+
723+
Returns 0 if action queue is disabled.
724+
"""
725+
if self._action_queue:
726+
return self._action_queue.get_pending_count()
727+
return 0
728+
665729
@retry_on_auth_error
666730
async def cancel_command(self, exec_id: str) -> None:
667731
"""Cancel a running setup-level execution."""

0 commit comments

Comments
 (0)