Skip to content

Commit 56f6ac7

Browse files
wip
1 parent 11860c1 commit 56f6ac7

File tree

6 files changed

+270
-12
lines changed

6 files changed

+270
-12
lines changed

fastloop/fastloop.py

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from .models import LoopEvent
3939
from .monitor import LoopMonitor
4040
from .scheduler import Schedule, validate_cron
41-
from .state.state import StateManager, create_state_manager
41+
from .state.state import LoopState, StateManager, create_state_manager
4242
from .task import TaskManager, TaskResult
4343
from .types import BaseConfig, ExecutorType, LoopStatus, RetryPolicy
4444
from .utils import get_func_import_path, import_func_from_path, infer_application_path
@@ -682,7 +682,7 @@ async def _cancel_handler(workflow_run_id: str):
682682
) from e
683683

684684
async def _resume_handler(
685-
workflow_run_id: str, request: dict[str, Any] = {}
685+
workflow_run_id: str, request: dict[str, Any] | None = None
686686
):
687687
try:
688688
workflow = await self.state_manager.get_workflow(workflow_run_id)
@@ -829,6 +829,166 @@ async def has_active_clients(self, loop_id: str) -> bool:
829829
client_count = await self.state_manager.get_active_client_count(loop_id)
830830
return client_count > 0
831831

832+
async def start_loop(
833+
self,
834+
name: str,
835+
loop_id: str,
836+
initial_data: dict[str, Any] | None = None,
837+
) -> LoopState:
838+
"""Start a named loop with a specific loop_id.
839+
840+
Args:
841+
name: The registered loop name (from @app.loop decorator)
842+
loop_id: The unique identifier for this loop instance
843+
initial_data: Optional initial context data for the loop
844+
845+
Returns:
846+
The LoopState for the started loop
847+
848+
Raises:
849+
LoopNotFoundError: If the loop name is not registered
850+
"""
851+
if name not in self._loop_metadata:
852+
raise LoopNotFoundError(f"Loop '{name}' is not registered")
853+
854+
metadata = self._loop_metadata[name]
855+
func = metadata["func"]
856+
857+
loop, _created = await self.state_manager.get_or_create_loop(
858+
loop_name=name,
859+
loop_id=loop_id,
860+
current_function_path=get_func_import_path(func),
861+
create_with_id=True,
862+
)
863+
864+
if loop.status == LoopStatus.STOPPED:
865+
logger.warning(
866+
"Loop is stopped, not starting",
867+
extra={"loop_id": loop_id, "loop_name": name},
868+
)
869+
return loop
870+
871+
context = LoopContext(
872+
loop_id=loop.loop_id,
873+
initial_event=None,
874+
state_manager=self.state_manager,
875+
integrations=metadata.get("integrations", []),
876+
)
877+
878+
if initial_data:
879+
for key, value in initial_data.items():
880+
await context.set(key, value)
881+
882+
await context.setup_integrations()
883+
884+
loop_instance: Loop | None = metadata.get("loop_instance")
885+
if loop_instance:
886+
loop_instance.ctx = context
887+
func = loop_instance.loop
888+
889+
started = await self.loop_manager.start(
890+
func=func,
891+
loop_start_func=metadata.get("on_start"),
892+
loop_stop_func=metadata.get("on_stop"),
893+
context=context,
894+
loop=loop,
895+
loop_delay=metadata["loop_delay"],
896+
stop_after_idle_seconds=metadata.get("stop_after_idle_seconds"),
897+
pause_after_idle_seconds=metadata.get("pause_after_idle_seconds"),
898+
)
899+
900+
if started:
901+
logger.info(
902+
"Loop started",
903+
extra={"loop_id": loop.loop_id, "loop_name": name},
904+
)
905+
906+
return await self.state_manager.get_loop(loop.loop_id)
907+
908+
async def stop_loop(self, name: str, loop_id: str) -> bool:
909+
"""Stop a specific loop instance.
910+
911+
Args:
912+
name: The registered loop name
913+
loop_id: The unique identifier for this loop instance
914+
915+
Returns:
916+
True if the loop was stopped, False if it wasn't running
917+
"""
918+
if name not in self._loop_metadata:
919+
raise LoopNotFoundError(f"Loop '{name}' is not registered")
920+
921+
try:
922+
loop = await self.state_manager.get_loop(loop_id)
923+
if loop.loop_name != name:
924+
logger.warning(
925+
"Loop name mismatch",
926+
extra={
927+
"expected": name,
928+
"actual": loop.loop_name,
929+
"loop_id": loop_id,
930+
},
931+
)
932+
return False
933+
934+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
935+
stopped = await self.loop_manager.stop(loop_id)
936+
937+
if stopped:
938+
logger.info(
939+
"Loop stopped",
940+
extra={"loop_id": loop_id, "loop_name": name},
941+
)
942+
943+
return stopped
944+
945+
except LoopNotFoundError:
946+
return False
947+
948+
async def loop_exists(self, name: str, loop_id: str) -> bool:
949+
"""Check if a loop instance exists and is active (running or idle).
950+
951+
Args:
952+
name: The registered loop name
953+
loop_id: The unique identifier for this loop instance
954+
955+
Returns:
956+
True if the loop exists and is running/idle, False otherwise
957+
"""
958+
if name not in self._loop_metadata:
959+
return False
960+
961+
try:
962+
loop = await self.state_manager.get_loop(loop_id)
963+
if loop.loop_name != name:
964+
return False
965+
return loop.status in (
966+
LoopStatus.RUNNING,
967+
LoopStatus.IDLE,
968+
LoopStatus.PENDING,
969+
)
970+
except LoopNotFoundError:
971+
return False
972+
973+
async def list_loops(self, name: str) -> list[str]:
974+
"""List all active loop IDs for a given loop name.
975+
976+
Args:
977+
name: The registered loop name
978+
979+
Returns:
980+
List of loop_ids that are currently active (running or idle)
981+
"""
982+
if name not in self._loop_metadata:
983+
raise LoopNotFoundError(f"Loop '{name}' is not registered")
984+
985+
loops = await self.state_manager.get_loops_by_name(name)
986+
return [
987+
loop.loop_id
988+
for loop in loops
989+
if loop.status in (LoopStatus.RUNNING, LoopStatus.IDLE, LoopStatus.PENDING)
990+
]
991+
832992
async def restart_workflow(self, workflow_run_id: str) -> bool:
833993
"""Restart a workflow from its persisted state."""
834994
try:

fastloop/state/state.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ async def get_or_create_loop(
4646
loop_name: str | None = None,
4747
loop_id: str | None = None,
4848
current_function_path: str = "",
49+
create_with_id: bool = False,
4950
) -> tuple[LoopState, bool]:
5051
pass
5152

@@ -57,6 +58,16 @@ async def update_loop(self, loop_id: str, state: LoopState):
5758
async def update_loop_status(self, loop_id: str, status: LoopStatus) -> LoopState:
5859
pass
5960

61+
@abstractmethod
62+
async def get_loops_by_name(
63+
self, loop_name: str, status: LoopStatus | None = None
64+
) -> list[LoopState]:
65+
pass
66+
67+
@abstractmethod
68+
async def add_loop_to_name_index(self, loop_name: str, loop_id: str) -> None:
69+
pass
70+
6071
@abstractmethod
6172
async def get_event_history(self, loop_id: str) -> list[dict[str, Any]]:
6273
pass

fastloop/state/state_redis.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
class RedisKeys:
4242
LOOP_INDEX = f"{KEY_PREFIX}:{{app_name}}:index"
43+
LOOP_NAME_INDEX = f"{KEY_PREFIX}:{{app_name}}:loops_by_name:{{loop_name}}"
4344
LOOP_EVENT_QUEUE_SERVER = f"{KEY_PREFIX}:{{app_name}}:events:{{loop_id}}:server"
4445
LOOP_EVENT_QUEUE_CLIENT = (
4546
f"{KEY_PREFIX}:{{app_name}}:events:{{loop_id}}:{{event_type}}:client"
@@ -339,20 +340,23 @@ async def get_or_create_loop(
339340
loop_name: str | None = None,
340341
loop_id: str | None = None,
341342
current_function_path: str = "",
343+
create_with_id: bool = False,
342344
) -> tuple[LoopState, bool]:
343345
if loop_id:
344346
loop_str = await self.rdb.get(
345347
RedisKeys.LOOP_STATE.format(app_name=self.app_name, loop_id=loop_id)
346348
)
347349
if loop_str:
348350
return LoopState.from_json(loop_str.decode("utf-8")), False
349-
else:
351+
elif not create_with_id:
350352
raise LoopNotFoundError(f"Loop {loop_id} not found")
351353

352354
if not current_function_path:
353355
raise ValueError("Current function is required")
354356

355-
loop_id = str(uuid.uuid4())
357+
if not loop_id:
358+
loop_id = str(uuid.uuid4())
359+
356360
loop = LoopState(
357361
loop_id=loop_id,
358362
loop_name=loop_name,
@@ -368,6 +372,9 @@ async def get_or_create_loop(
368372
RedisKeys.LOOP_INDEX.format(app_name=self.app_name), loop_id
369373
) # type: ignore
370374

375+
if loop_name:
376+
await self.add_loop_to_name_index(loop_name, loop_id)
377+
371378
return loop, True
372379

373380
async def update_loop(self, loop_id: str, state: LoopState):
@@ -554,6 +561,50 @@ async def get_all_loops(self, status: LoopStatus | None = None) -> list[LoopStat
554561

555562
return results
556563

564+
async def get_loops_by_name(
565+
self, loop_name: str, status: LoopStatus | None = None
566+
) -> list[LoopState]:
567+
name_index_key = RedisKeys.LOOP_NAME_INDEX.format(
568+
app_name=self.app_name, loop_name=loop_name
569+
)
570+
loop_ids = await self.rdb.smembers(name_index_key)
571+
if not loop_ids:
572+
return []
573+
574+
keys = [
575+
RedisKeys.LOOP_STATE.format(app_name=self.app_name, loop_id=lid.decode())
576+
for lid in loop_ids
577+
]
578+
values = await self.rdb.mget(keys)
579+
580+
results: list[LoopState] = []
581+
stale_ids: list[str] = []
582+
583+
for loop_id_bytes, val in zip(loop_ids, values, strict=True):
584+
loop_id = loop_id_bytes.decode()
585+
if not val:
586+
stale_ids.append(loop_id)
587+
continue
588+
try:
589+
loop_state = LoopState.from_json(val.decode("utf-8"))
590+
except (TypeError, json.JSONDecodeError):
591+
stale_ids.append(loop_id)
592+
continue
593+
if status and loop_state.status != status:
594+
continue
595+
results.append(loop_state)
596+
597+
if stale_ids:
598+
await self.rdb.srem(name_index_key, *stale_ids)
599+
600+
return results
601+
602+
async def add_loop_to_name_index(self, loop_name: str, loop_id: str) -> None:
603+
name_index_key = RedisKeys.LOOP_NAME_INDEX.format(
604+
app_name=self.app_name, loop_name=loop_name
605+
)
606+
await self.rdb.sadd(name_index_key, loop_id)
607+
557608
async def get_event_history(self, loop_id: str) -> list[dict[str, Any]]:
558609
event_history: list[bytes] | None = await self.rdb.lrange( # type: ignore
559610
RedisKeys.LOOP_EVENT_HISTORY.format(

fastloop/state/state_s3.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class S3Keys:
2121
def loop_index(prefix: str, app_name: str) -> str:
2222
return f"{prefix}/{app_name}/index.json"
2323

24+
@staticmethod
25+
def loop_name_index(prefix: str, app_name: str, loop_name: str) -> str:
26+
return f"{prefix}/{app_name}/loops_by_name/{loop_name}.json"
27+
2428
@staticmethod
2529
def loop_state(prefix: str, app_name: str, loop_id: str) -> str:
2630
return f"{prefix}/{app_name}/state/{loop_id}.json"
@@ -264,12 +268,18 @@ async def get_or_create_loop(
264268
loop_name: str | None = None,
265269
loop_id: str | None = None,
266270
current_function_path: str = "",
271+
create_with_id: bool = False,
267272
) -> tuple[LoopState, bool]:
268273
if loop_id:
269-
loop = await self.get_loop(loop_id)
270-
return loop, False
274+
try:
275+
loop = await self.get_loop(loop_id)
276+
return loop, False
277+
except LoopNotFoundError:
278+
if not create_with_id:
279+
raise
271280

272-
loop_id = str(uuid.uuid4())
281+
if not loop_id:
282+
loop_id = str(uuid.uuid4())
273283
loop = LoopState(
274284
loop_id=loop_id,
275285
loop_name=loop_name,
@@ -285,6 +295,9 @@ async def get_or_create_loop(
285295
index.append(loop_id)
286296
self._put_json(S3Keys.loop_index(self.prefix, self.app_name), index)
287297

298+
if loop_name:
299+
await self.add_loop_to_name_index(loop_name, loop_id)
300+
288301
return loop, True
289302

290303
async def update_loop(self, loop_id: str, state: LoopState):
@@ -316,6 +329,31 @@ async def get_all_loops(self, status: LoopStatus | None = None) -> list[LoopStat
316329

317330
return loops
318331

332+
async def get_loops_by_name(
333+
self, loop_name: str, status: LoopStatus | None = None
334+
) -> list[LoopState]:
335+
name_index_key = S3Keys.loop_name_index(self.prefix, self.app_name, loop_name)
336+
loop_ids: list[str] = self._get_json(name_index_key) or []
337+
loops: list[LoopState] = []
338+
339+
for loop_id in loop_ids:
340+
try:
341+
loop = await self.get_loop(loop_id)
342+
if status and loop.status != status:
343+
continue
344+
loops.append(loop)
345+
except LoopNotFoundError:
346+
continue
347+
348+
return loops
349+
350+
async def add_loop_to_name_index(self, loop_name: str, loop_id: str) -> None:
351+
name_index_key = S3Keys.loop_name_index(self.prefix, self.app_name, loop_name)
352+
loop_ids: list[str] = self._get_json(name_index_key) or []
353+
if loop_id not in loop_ids:
354+
loop_ids.append(loop_id)
355+
self._put_json(name_index_key, loop_ids)
356+
319357
async def get_event_history(self, loop_id: str) -> list[dict[str, Any]]:
320358
return (
321359
self._get_json(
@@ -749,10 +787,8 @@ async def set_workflow_resume_payload(
749787
f"{self.prefix}/{self.app_name}/workflow_resume_payload/{workflow_id}.json"
750788
)
751789
if payload is None:
752-
try:
790+
with suppress(ClientError):
753791
self.s3.delete_object(Bucket=self.bucket, Key=key)
754-
except ClientError:
755-
pass
756792
else:
757793
self.s3.put_object(
758794
Bucket=self.bucket, Key=key, Body=json.dumps(payload).encode("utf-8")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.116"
3+
version = "0.1.117"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

0 commit comments

Comments
 (0)