Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

from typing_extensions import Self

from crawlee._types import JsonSerializable
from crawlee.proxy_configuration import _NewUrlFunction

from apify._models import Webhook
Expand Down Expand Up @@ -95,6 +96,8 @@ async def main() -> None:
```
"""

_ACTOR_STATE_KEY = 'APIFY_GLOBAL_STATE'

def __init__(
self,
configuration: Configuration | None = None,
Expand Down Expand Up @@ -133,6 +136,9 @@ def __init__(

self._apify_client: ApifyClientAsync | None = None

# Keep track of all used state stores to persist their values on exit
self._use_state_stores: set[str | None] = set()

self._is_initialized = False
"""Whether any Actor instance is currently initialized."""

Expand Down Expand Up @@ -243,6 +249,9 @@ async def finalize() -> None:
await self.event_manager.__aexit__(None, None, None)
await self._charging_manager_implementation.__aexit__(None, None, None)

# Persist Actor state
await self._save_actor_state()

await asyncio.wait_for(finalize(), self._cleanup_timeout.total_seconds())
self._is_initialized = False

Expand Down Expand Up @@ -1324,6 +1333,31 @@ async def create_proxy_configuration(

return proxy_configuration

async def use_state(
self,
default_value: dict[str, JsonSerializable] | None = None,
key: str | None = None,
kvs_name: str | None = None,
) -> dict[str, JsonSerializable]:
"""Easily create and manage state values. All state values are automatically persisted.

Values can be modified by simply using the assignment operator.

Args:
default_value: The default value to initialize the state if it is not already set.
key: The key in the key-value store where the state is stored. If not provided, a default key is used.
kvs_name: The name of the key-value store where the state is stored. If not provided, the default
key-value store associated with the Actor run is used.
"""
self._use_state_stores.add(kvs_name)
kvs = await self.open_key_value_store(name=kvs_name)
return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value)

async def _save_actor_state(self) -> None:
for kvs_name in self._use_state_stores:
store = await self.open_key_value_store(name=kvs_name)
await store.persist_autosaved_values()

def _raise_if_not_initialized(self) -> None:
if not self._is_initialized:
raise RuntimeError('The Actor was not initialized!')
Expand Down
78 changes: 78 additions & 0 deletions tests/unit/actor/test_actor_key_value_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import asyncio

import pytest

from apify_shared.consts import ApifyEnvVars
Expand Down Expand Up @@ -106,3 +108,79 @@ async def test_get_input_with_encrypted_secrets(monkeypatch: pytest.MonkeyPatch)
assert actor_input['secret_string'] == secret_string
assert actor_input['secret_object'] == secret_object
assert actor_input['secret_array'] == secret_array


async def test_use_state(monkeypatch: pytest.MonkeyPatch) -> None:
# Set a short persist state interval to speed up the test
monkeypatch.setenv(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, '100')
async with Actor as actor:
state = await actor.use_state()
assert state == {}

state['state'] = 'first_state'

await asyncio.sleep(0.2) # Wait for the state to be persisted

kvs = await actor.open_key_value_store()
stored_state = await kvs.get_value('CRAWLEE_STATE_0')
assert stored_state == {'state': 'first_state'}

state['state'] = 'finished_state'

saved_sate = await kvs.get_value('CRAWLEE_STATE_0')
assert saved_sate == {'state': 'finished_state'}


async def test_use_state_non_default(monkeypatch: pytest.MonkeyPatch) -> None:
# Set a short persist state interval to speed up the test
monkeypatch.setenv(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, '100')
async with Actor as actor:
state = await actor.use_state(
default_value={'state': 'initial_state'}, key='custom_state_key', kvs_name='custom-kvs'
)
assert state == {'state': 'initial_state'}

state['state'] = 'first_state'

await asyncio.sleep(0.2) # Wait for the state to be persisted

kvs = await actor.open_key_value_store(name='custom-kvs')
stored_state = await kvs.get_value('custom_state_key')
assert stored_state == {'state': 'first_state'}

state['state'] = 'finished_state'

saved_sate = await kvs.get_value('custom_state_key')
assert saved_sate == {'state': 'finished_state'}


async def test_use_state_persists_on_actor_stop() -> None:
async with Actor as actor:
state = await actor.use_state()
assert state == {}

kvs = await actor.open_key_value_store()

state['state'] = 'finished_state'

# After Actor context is exited, the state should be persisted
saved_sate = await kvs.get_value('CRAWLEE_STATE_0')
assert saved_sate == {'state': 'finished_state'}


async def test_use_state_with_multiple_stores() -> None:
async with Actor as actor:
state_default = await actor.use_state()
state_custom = await actor.use_state(kvs_name='custom-kvs')

state_default['value'] = 'default_store'
state_custom['value'] = 'custom_store'

kvs_default = await actor.open_key_value_store()
kvs_custom = await actor.open_key_value_store(name='custom-kvs')

saved_state_default = await kvs_default.get_value('CRAWLEE_STATE_0')
assert saved_state_default == {'value': 'default_store'}

saved_state_custom = await kvs_custom.get_value('CRAWLEE_STATE_0')
assert saved_state_custom == {'value': 'custom_store'}
Loading