Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,28 @@ async def drop(self) -> None:

    @override
    async def purge(self) -> None:
        """Purge all requests from the queue, with a fallback for clients that cannot purge.

        If the underlying storage client does not implement `purge` (it raises
        `NotImplementedError`), behavior depends on whether the queue is named:

        - Named queue: purging is skipped entirely and a warning is logged, because a named
          queue is treated as persistent data that must not be destroyed implicitly.
        - Unnamed (default) queue: the queue is dropped and a fresh one is opened in its
          place; this instance then adopts the new queue's client and id. Note that the
          queue id may change as a result.
        """
        try:
            await self._client.purge()
        except NotImplementedError:
            # Name of the concrete storage client class, used only for log messages.
            client_name = type(self._client).__name__
            if self._name is not None:
                # Named queues are persistent; refuse to emulate purge via drop+recreate.
                logger.warning(
                    f'Storage client "{client_name}" does not support purging the request queue. '
                    f'Skipping purge for named queue "{self._name}" to avoid destroying persistent data; '
                    'the queue contents are left intact.'
                )
                return
            logger.warning(
                f'Storage client "{client_name}" does not support purging the request queue. '
                'Falling back to dropping and recreating the unnamed queue; the request queue ID may change.'
            )
            await self.drop()
            # Override `purge_on_start` so the storage client does not try to purge the freshly recreated
            # (and necessarily empty) queue and re-raise the same `NotImplementedError`.
            recreate_config = service_locator.get_configuration().model_copy(update={'purge_on_start': False})
            # NOTE(review): the original `storage_client` this queue was opened with is not
            # propagated here; `open` will fall back to the service-locator default client.
            # Confirm this is intended when the queue was opened with an explicit client.
            new_rq = await RequestQueue.open(configuration=recreate_config)
            # Adopt the recreated queue's backing client and id so callers holding this
            # instance keep working transparently after the fallback.
            self._client = new_rq._client  # noqa: SLF001
            self._id = new_rq._id  # noqa: SLF001

@override
async def add_request(
Expand Down
88 changes: 88 additions & 0 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,94 @@ async def test_purge(
await rq.drop()


async def test_purge_falls_back_to_drop_for_unnamed_queue_when_not_implemented(
    storage_client: StorageClient,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Verify the drop-and-recreate fallback of `purge` on an unnamed queue.

    When the storage client raises `NotImplementedError` from `purge`, the default
    (unnamed) queue must be dropped and transparently replaced by a fresh, empty one,
    so repeated crawler runs keep working. A warning describing the fallback must be
    emitted. Named queues follow a different, non-destructive path covered elsewhere.
    """
    queue = await RequestQueue.open(storage_client=storage_client)
    assert queue.name is None

    await queue.add_requests(['https://example.com/1', 'https://example.com/2'])
    assert (await queue.get_metadata()).pending_request_count == 2

    async def _unsupported_purge(self: object) -> None:
        raise NotImplementedError('Purge is not supported.')

    # Patch at class level so the client used by the queue raises on purge.
    monkeypatch.setattr(type(queue._client), 'purge', _unsupported_purge)

    with caplog.at_level('WARNING'):
        await queue.purge()

    fallback_logged = any(
        'does not support purging' in record.message and 'dropping and recreating' in record.message
        for record in caplog.records
    )
    assert fallback_logged

    # The queue should be empty, usable, and backed by a fresh client (id may differ for backends that mint new ids).
    metadata = await queue.get_metadata()
    assert metadata.pending_request_count == 0
    assert metadata.total_request_count == 0
    assert metadata.handled_request_count == 0
    assert queue.id is not None

    await queue.add_request('https://example.com/after-purge')
    fetched = await queue.fetch_next_request()
    assert fetched is not None
    assert fetched.url == 'https://example.com/after-purge'

    await queue.drop()


async def test_purge_skips_named_queue_when_not_implemented(
    storage_client: StorageClient,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Verify that `purge` leaves a named queue untouched when the client cannot purge.

    A named queue represents persistent, possibly shared data, so emulating purge via
    drop-and-recreate would destroy it. Instead, when the storage client raises
    `NotImplementedError`, `purge` must only log a warning and keep both the queue's
    identity and its contents intact.
    """
    queue = await RequestQueue.open(
        name='purge-fallback-named-test',
        storage_client=storage_client,
    )
    id_before_purge = queue.id

    await queue.add_requests(['https://example.com/1', 'https://example.com/2'])
    assert (await queue.get_metadata()).pending_request_count == 2

    async def _unsupported_purge(self: object) -> None:
        raise NotImplementedError('Purge is not supported.')

    # Patch at class level so the client used by the queue raises on purge.
    monkeypatch.setattr(type(queue._client), 'purge', _unsupported_purge)

    with caplog.at_level('WARNING'):
        await queue.purge()

    skip_logged = any(
        'does not support purging' in record.message and 'Skipping purge for named queue' in record.message
        for record in caplog.records
    )
    assert skip_logged

    # Queue identity and contents must be preserved.
    assert queue.id == id_before_purge
    metadata = await queue.get_metadata()
    assert metadata.pending_request_count == 2
    assert metadata.total_request_count == 2

    await queue.drop()


async def test_open_with_alias(
storage_client: StorageClient,
) -> None:
Expand Down
Loading