Add sync API, refactor core logic, and enhance CI security#11
Conversation
Introduces a synchronous job queue API alongside the existing async API, enabling seamless interoperability between both interfaces. Refactors core input validation, Lua script loading, and result formatting into a shared core module to ensure consistent behavior and error handling. Updates documentation and adds comprehensive sync API tests to ensure feature parity and cross-API compatibility.
Introduces configuration files for a code quality toolkit, enabling code health checks and exclusions. Simplifies exception handling by removing unused variables and cleans up redundant test await usage to improve code clarity.
Enhances CI security and reproducibility by replacing version tags with explicit commit SHAs for all third-party actions in workflow files. Also disables credential persistence for checkouts to reduce potential risk exposure.
Modularizes and clarifies configuration validation logic, introducing helper functions and improving error messaging for invalid arguments. Enhances test reliability by using platform-agnostic temp directories for Unix socket paths and suppressing teardown exceptions. Annotates unittest lifecycle hooks to quiet static analysis warnings.
Splits monolithic core into focused modules for config, Redis operations, Lua script loading, response formatting, and argument validation. Improves code maintainability, testability, and clarity by decoupling responsibilities. Updates async and sync queue classes to use the refactored internals and unifies key construction across code paths. Adjusts tests for new object structure and patch targets. Enables easier extension and more reliable Redis and Lua handling by isolating connection logic and script registration. Moves config validation and argument checking to dedicated modules, reducing duplication. No functional behavior is changed, but future enhancements and bug fixes are now simpler and safer.
Extracts shared queue logic into a new base class to reduce code duplication between async and sync clients. Improves maintainability by consolidating argument validation, Redis command construction, and response formatting in one place. Updates usage to subclass the new base and simplifies methods accordingly.
📝 WalkthroughWalkthroughThis PR adds a synchronous FQ API, refactors the queue to use Redis Lua scripts, introduces structured config/validation/key utilities, adds Redis client factories, centralizes shared non-I/O logic, updates docs/README, hardens CI by pinning Actions, and expands/updates tests for the new architecture. ChangesCI / Qlty Configuration
Core Queue Engine, Sync API & Tests
Sequence Diagram(s)sequenceDiagram
participant Client
participant AsyncFQ as Async FQ
participant BaseLogic as Base Logic
participant Lua as Lua Script
participant Redis
Client->>AsyncFQ: enqueue(payload, interval)
AsyncFQ->>BaseLogic: _build_enqueue_call()
BaseLogic->>BaseLogic: validate args, serialize payload
BaseLogic-->>AsyncFQ: (keys, args)
AsyncFQ->>Lua: _scripts.enqueue.evalsha(keys, args)
Lua->>Redis: execute atomic enqueue operations
Redis-->>Lua: result
Lua-->>AsyncFQ: result
AsyncFQ->>BaseLogic: format response
BaseLogic-->>AsyncFQ: {queued: true}
AsyncFQ-->>Client: {queued: true}
sequenceDiagram
participant Client
participant SyncFQ as Sync FQ
participant BaseLogic as Base Logic
participant Lua as Lua Script
participant Redis
Client->>SyncFQ: dequeue(queue_type)
SyncFQ->>BaseLogic: _build_dequeue_call()
BaseLogic->>BaseLogic: validate queue_type
BaseLogic-->>SyncFQ: (keys, args)
SyncFQ->>Lua: _scripts.dequeue.evalsha(keys, args)
Lua->>Redis: atomic pop and payload fetch
Redis-->>Lua: (job_id, payload, queue_id)
Lua-->>SyncFQ: result
SyncFQ->>BaseLogic: _dequeue_response()
BaseLogic->>BaseLogic: decode IDs, deserialize payload
BaseLogic-->>SyncFQ: {job_id, queue_id, payload}
SyncFQ-->>Client: {job_id, queue_id, payload}
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR modularizes the Flowdacity Queue implementation to support both async and sync client APIs, while tightening CI/workflow security and updating documentation to reflect the expanded interface.
Changes:
- Refactors the async
FQimplementation to share core logic via a newBaseFQ(config parsing, key building, validation, response formatting). - Adds a new synchronous
fq.sync.FQclient and corresponding test coverage. - Pins GitHub Actions by SHA and adds Qlty configuration for consistent code quality checks.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_sync_queue.py | Adds end-to-end coverage for the new synchronous API (enqueue/dequeue/finish, metrics, requeue, interop). |
| tests/test_queue.py | Adds Qlty ignore annotations for unittest lifecycle hooks. |
| tests/test_func.py | Updates tests for refactored internals (Lua scripts container, redis client patch points, socket path constant). |
| tests/test_edge_cases.py | Updates edge-case tests to align with new redis client module + _scripts container usage and improves teardown robustness. |
| tests/config.py | Centralizes test unix socket path via a constant. |
| src/fq/validators.py | Introduces centralized argument validation helpers shared by async/sync clients. |
| src/fq/utils.py | Minor cleanup in convert_to_str exception handling. |
| src/fq/sync/queue.py | Implements the new synchronous FQ client on top of BaseFQ. |
| src/fq/sync/init.py | Exposes the sync FQ interface from fq.sync. |
| src/fq/responses.py | Adds shared response decoding/formatting helpers (dequeue + metrics). |
| src/fq/redis.py | Adds factory + connection validation helpers for async/sync Redis clients (tcp/unix/cluster). |
| src/fq/queue.py | Refactors async FQ to use BaseFQ, LuaScripts, and the new redis client helpers. |
| src/fq/lua.py | Centralizes Lua script loading/registration into a single registry helper. |
| src/fq/keys.py | Introduces a single source of truth for Redis key naming. |
| src/fq/config.py | Adds structured config validation and normalization via dataclasses. |
| src/fq/base.py | Adds shared core logic (key building, validation calls, response shaping) for async/sync clients. |
| README.md | Updates docs with separate async vs sync usage and clarifies shared operations. |
| .qlty/qlty.toml | Adds Qlty configuration (linters/scanners, excludes, test patterns). |
| .qlty/.gitignore | Limits committed Qlty directory contents to intended config files. |
| .github/workflows/tests.yml | Pins Actions by SHA and disables credential persistence on checkout. |
| .github/workflows/pypi.yml | Pins Actions by SHA and disables credential persistence on checkout. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (4)
src/fq/lua.py (2)
11-18: 💤 Low valuePrefer a more specific type annotation than
objectfor script fields.Using
objectfor all six fields loses type-checker benefits. The registered scripts returned byredis_client.register_script()areredis.client.Scriptinstances for sync clients andredis.asyncio.client.Scriptfor async ones. Annotating asAny(or a union if you want precision) at least signals intent and avoids accidental field overwrite checks being silently ignored by type-checkers.-from dataclasses import dataclass +from dataclasses import dataclass +from typing import Any `@dataclass`(frozen=True) class LuaScripts: - enqueue: object - dequeue: object - finish: object - interval: object - requeue: object - metrics: object + enqueue: Any + dequeue: Any + finish: Any + interval: Any + requeue: Any + metrics: Any🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/lua.py` around lines 11 - 18, The LuaScripts dataclass uses overly broad `object` annotations for its six script fields; change those to a more specific type like `Any` or a union of sync/async script types (e.g., `redis.client.Script | redis.asyncio.client.Script`) to preserve type-checker safety. Update the top of the module to import the chosen typing helper (`Any` or `Union`) and, if using explicit script classes, import `Script` from `redis.client` and `redis.asyncio.client` (or alias one) and then update the annotations on the LuaScripts fields (`enqueue`, `dequeue`, `finish`, `interval`, `requeue`, `metrics`) accordingly. Ensure the dataclass remains frozen and no runtime behavior changes.
8-26: ⚡ Quick win
LUA_SCRIPT_NAMESandLuaScriptsfields must be kept manually in sync.
cls(**registered_scripts)works today because the tuple and the dataclass fields are identical. A mismatch (e.g., adding a script name without the matching field, or vice versa) will only surface at runtime as aTypeError. Consider deriving one from the other to make it self-validating:from dataclasses import fields as dc_fields # Derive the names from the dataclass itself, eliminating the parallel list LUA_SCRIPT_NAMES = tuple(f.name for f in dc_fields(LuaScripts))Alternatively, at minimum add a
__post_init__-equivalent check or a unit test that assertsset(LUA_SCRIPT_NAMES) == {f.name for f in dc_fields(LuaScripts)}.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/lua.py` around lines 8 - 26, LUA_SCRIPT_NAMES and the LuaScripts dataclass are maintained separately, which can cause a runtime TypeError if they diverge; fix by deriving the script names from the dataclass fields instead of a hardcoded tuple or by adding a validation check in LuaScripts.register: use dataclasses.fields(LuaScripts) to compute the list/tuple of field names and iterate those when building registered_scripts (or assert set(LUA_SCRIPT_NAMES) == {f.name for f in dataclasses.fields(LuaScripts)} at the start of register) so the names and dataclass stay in sync.src/fq/utils.py (1)
73-81: ⚡ Quick winTighten the broad
except Exceptioncatch.The blanket
except Exceptionsilently swallows any unexpected error (e.g.,TypeError,MemoryErrorsubclasses, programmer mistakes). The only realistic failure path here is aUnicodeDecodeErrororAttributeErrorfrom items that are notbytes. Narrowing the clause makes failures visible.♻️ Proposed fix
- except Exception: + except (AttributeError, UnicodeDecodeError):🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/utils.py` around lines 73 - 81, The convert_to_str function currently uses a broad except Exception which hides real errors; change the handling to explicitly handle the expected cases: check if the item is bytes/bytearray or attempt decode and only catch UnicodeDecodeError (and optionally AttributeError) instead of Exception, or use an isinstance(queue, (bytes, bytearray)) branch to call queue.decode("utf-8") and append the original item for non-bytes—update convert_to_str/queue_list logic to narrow the exception handling to UnicodeDecodeError/AttributeError only.tests/test_sync_queue.py (1)
290-368: ⚖️ Poor tradeoffSync Redis I/O inside
asyncio.run()blocks the event loop.
sync_queue.initialize(),.enqueue(),.dequeue(), and.finish()all make blocking network calls (via synchronous redis-py) while executing inside thescenario()coroutine underasyncio.run(). This technically stalls the event loop on each call.In practice it works here because the event loop has no concurrent tasks and local Redis is fast. For a more robust test, consider restructuring to avoid running sync operations inside an async coroutine, or use
asyncio.get_event_loop().run_in_executor(None, sync_op)for each sync call.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_sync_queue.py` around lines 290 - 368, The test runs blocking synchronous Redis calls inside the coroutine scenario (sync_queue.initialize, sync_queue.enqueue, sync_queue.dequeue, sync_queue.finish) which stalls the event loop; fix it by offloading each blocking sync call to a thread executor (e.g., asyncio.get_event_loop().run_in_executor or asyncio.to_thread) or by moving the sync-only sequence out of the async coroutine and running it synchronously (wrap the sync sequence in a helper function and call it with run_in_executor/from_thread). Ensure you update each reference to sync_queue.initialize, sync_queue.enqueue, sync_queue.dequeue, and sync_queue.finish in the test to be executed via the executor so no blocking sync I/O runs directly inside asyncio.run().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/fq/config.py`:
- Around line 132-137: The function _validate_connection_config currently
returns early for unix_sock before validating the redis["clustered"] boolean, so
clustered can be ignored for unix sockets; move or add validation of the
clustered flag at the top of _validate_connection_config (before branching on
conn_type) or call a helper like _validate_clustered_config(redis_config) first,
ensuring the boolean check runs for both unix_sock and tcp paths; update
references in _validate_unix_socket_config/_validate_tcp_socket_config if they
assume clustered was validated earlier.
- Around line 153-155: The config parsing currently only checks that redis.port
is an integer via _is_int_not_bool after retrieving it with
_require_config_value, but allows out-of-range values (0, negative, >65535) that
later break create_*_redis_client(); update the validation in
FQConfig.from_mapping() (around the code handling port) to assert the port is
within the valid TCP/UDP port range (1–65535) and raise FQException("Invalid
config: redis.port must be an integer between 1 and 65535") when it is not, so
invalid ports are rejected early.
In `@src/fq/redis.py`:
- Around line 14-17: The Redis client constructors (e.g., AsyncRedis
instantiation and other Redis/RedisCluster branches) are not forwarding
redis_config.password, so authenticated Unix-socket and cluster setups fail;
update every Redis client creation that currently passes
db/unix_socket_path/host/port/ssl to also pass password=redis_config.password
(for example in the AsyncRedis(...) call and the RedisCluster/Redis(...)
branches) so the upstream-validated password is used for all connection types.
- Around line 21-31: The startup_nodes list currently contains plain dicts which
causes AsyncRedisCluster to fail at runtime; update the code that constructs
startup_nodes (used when creating AsyncRedisCluster) to instantiate ClusterNode
objects from redis.asyncio.cluster using redis_config.host and
int(redis_config.port) instead of dicts (or alternatively pass host/port
directly to AsyncRedisCluster), so AsyncRedisCluster receives objects with .host
and .port attributes.
---
Nitpick comments:
In `@src/fq/lua.py`:
- Around line 11-18: The LuaScripts dataclass uses overly broad `object`
annotations for its six script fields; change those to a more specific type like
`Any` or a union of sync/async script types (e.g., `redis.client.Script |
redis.asyncio.client.Script`) to preserve type-checker safety. Update the top of
the module to import the chosen typing helper (`Any` or `Union`) and, if using
explicit script classes, import `Script` from `redis.client` and
`redis.asyncio.client` (or alias one) and then update the annotations on the
LuaScripts fields (`enqueue`, `dequeue`, `finish`, `interval`, `requeue`,
`metrics`) accordingly. Ensure the dataclass remains frozen and no runtime
behavior changes.
- Around line 8-26: LUA_SCRIPT_NAMES and the LuaScripts dataclass are maintained
separately, which can cause a runtime TypeError if they diverge; fix by deriving
the script names from the dataclass fields instead of a hardcoded tuple or by
adding a validation check in LuaScripts.register: use
dataclasses.fields(LuaScripts) to compute the list/tuple of field names and
iterate those when building registered_scripts (or assert set(LUA_SCRIPT_NAMES)
== {f.name for f in dataclasses.fields(LuaScripts)} at the start of register) so
the names and dataclass stay in sync.
In `@src/fq/utils.py`:
- Around line 73-81: The convert_to_str function currently uses a broad except
Exception which hides real errors; change the handling to explicitly handle the
expected cases: check if the item is bytes/bytearray or attempt decode and only
catch UnicodeDecodeError (and optionally AttributeError) instead of Exception,
or use an isinstance(queue, (bytes, bytearray)) branch to call
queue.decode("utf-8") and append the original item for non-bytes—update
convert_to_str/queue_list logic to narrow the exception handling to
UnicodeDecodeError/AttributeError only.
In `@tests/test_sync_queue.py`:
- Around line 290-368: The test runs blocking synchronous Redis calls inside the
coroutine scenario (sync_queue.initialize, sync_queue.enqueue,
sync_queue.dequeue, sync_queue.finish) which stalls the event loop; fix it by
offloading each blocking sync call to a thread executor (e.g.,
asyncio.get_event_loop().run_in_executor or asyncio.to_thread) or by moving the
sync-only sequence out of the async coroutine and running it synchronously (wrap
the sync sequence in a helper function and call it with
run_in_executor/from_thread). Ensure you update each reference to
sync_queue.initialize, sync_queue.enqueue, sync_queue.dequeue, and
sync_queue.finish in the test to be executed via the executor so no blocking
sync I/O runs directly inside asyncio.run().
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9530f3cd-ca75-4dea-a706-1bc03eaa182c
📒 Files selected for processing (21)
.github/workflows/pypi.yml.github/workflows/tests.yml.qlty/.gitignore.qlty/qlty.tomlREADME.mdsrc/fq/base.pysrc/fq/config.pysrc/fq/keys.pysrc/fq/lua.pysrc/fq/queue.pysrc/fq/redis.pysrc/fq/responses.pysrc/fq/sync/__init__.pysrc/fq/sync/queue.pysrc/fq/utils.pysrc/fq/validators.pytests/config.pytests/test_edge_cases.pytests/test_func.pytests/test_queue.pytests/test_sync_queue.py
Passes the configured password to all Redis client initializations, including cluster and UNIX socket connections, enhancing security and compatibility with protected instances. Refines queue ID formatting logic to ensure correct deduplication of ready and active queues. Extends unit tests to cover password handling and queue ID formatting.
Validates that the 'clustered' Redis config flag is a boolean and that the port falls within the valid range, raising clear exceptions for misconfiguration. Refactors Lua script registration for maintainability. Updates async and sync interop in tests to use threads, improving compatibility and test reliability. Enhances error coverage for edge cases.
Splits and reorganizes configuration validation logic to clearly separate Redis and application (FQ) config concerns. Moves validation methods into their respective classes for better encapsulation and maintainability. Simplifies and clarifies value checks, improving code readability and making validation flows more explicit.
Standardizes configuration by moving queue-related options from a generic "fq" section to a dedicated "queue" section with explicit validation and structure. Removes the key prefix from the Redis config, centralizing it under queue settings to reduce redundancy and improve separation of concerns. Updates documentation, code, and tests to reflect the new config schema, and adds stricter validation to reject legacy "fq" sections. Aims to improve configuration clarity and future extensibility.
Adds exception chaining when payload serialization fails to improve debugging. Updates documentation to clarify usage of Redis Unix sockets and example config. Enhances test to assert exception cause, ensuring better error traceability.
| if "redis" not in config or "queue" not in config: | ||
| raise FQException("Config missing required sections: redis, queue") |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/fq/config.py (1)
52-65:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
clustered=Truecombined withconn_type="unix_sock"is silently ignored.
_validate_clusteredonly checks the type. Whenconn_type="unix_sock"andclustered=True, validation passes, thencreate_async_redis_client/create_sync_redis_clientinsrc/fq/redis.pytake the non-cluster Unix-socket branch and theclusteredflag is dropped on the floor. That's confusing for the user — they think clustering is enabled. Either reject the combination here or document that clustering is TCP-only.Proposed fix
`@classmethod` def _validate_clustered(cls, config): if "clustered" in config and not isinstance(config["clustered"], bool): raise FQException("Invalid config: redis.clustered must be a boolean") + + if config.get("clustered") and config.get("conn_type") == "unix_sock": + raise FQException( + "Invalid config: redis.clustered is not supported with conn_type='unix_sock'" + )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/config.py` around lines 52 - 65, The code currently lets clustered=True with conn_type="unix_sock" silently pass, dropping clustering later; update validation to reject that combination: in _validate_connection (or _validate_clustered) check if config.get("clustered") is True and config.get("conn_type") == "unix_sock" and raise an FQException with a clear message like "Invalid config: redis.clustered cannot be true with conn_type='unix_sock' (clustering is TCP-only)"; reference the existing methods _validate_connection and _validate_clustered and the create_async_redis_client/create_sync_redis_client behavior to ensure the validation prevents the unsupported combination.
🧹 Nitpick comments (9)
src/fq/lua.py (1)
18-26: 💤 Low valueMinor: scripts are re-read from disk on every
register()call.
fields(cls)iterates six entries and each invocation re-opens and decodes the corresponding.luafile. IfLuaScripts.register()is ever called per-queue or per-reload (you mentionedreload_lua_scriptsexists inqueue.py), consider caching the script source viafunctools.lru_cacheon_read_scriptto avoid repeated I/O. Not blocking for typical one-time initialization.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/lua.py` around lines 18 - 26, The register method currently calls cls._read_script for each field every time, causing repeated file I/O; decorate the _read_script method with functools.lru_cache (or otherwise cache its results) so subsequent calls return the cached script source instead of re-reading the .lua files; keep using fields(cls) and redis_client.register_script as-is, just ensure _read_script is memoized (e.g., apply `@functools.lru_cache` to LuaScripts._read_script) so LuaScripts.register avoids repeated disk reads.src/fq/responses.py (2)
31-41: 💤 Low valueImplicit assumption:
enqueue_detailsanddequeue_detailsare aligned and equal-length.The single loop indexes both lists with the same
iandi + 1, so any mismatch in length or key order between the two Redis responses produces silent drift (orIndexError) rather than a clear error. This is fine if the Luametricsscript guarantees identical key ordering, but worth a brief comment or assertion documenting the invariant — otherwise a future change in the Lua script becomes hard to trace.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/responses.py` around lines 31 - 41, format_metrics_counts assumes enqueue_details and dequeue_details are aligned and same-length; validate this invariant up front and fail loudly or iterate them separately: add a check in format_metrics_counts that len(enqueue_details) == len(dequeue_details) (and that both lengths are even), and raise a ValueError with a clear message identifying enqueue_details/dequeue_details if the check fails, or alternatively refactor to loop over each list independently (two loops over enqueue_details and dequeue_details) so misaligned Redis responses cannot silently drift; reference the function name format_metrics_counts and the variables enqueue_details/dequeue_details when making the change.
13-28: 💤 Low valueStrict tuple-unpack will raise on
len(dequeue_response) > 4.The guard at Line 14 only rejects responses shorter than 4 elements; if the Lua script ever returns a 5th field (e.g., for a future extension), the unpack on Line 17 raises
ValueError: too many values to unpackinstead of the clean{"status": "failure"}fallback. Consider either pinning the guard to!= 4(fail loud on contract drift) or indexing the first four values to be forward-compatible. Pick the contract you want; just align the check with the unpack.Proposed fix (forward-compatible)
- if len(dequeue_response) < 4: - return {"status": "failure"} - - queue_id, job_id, payload, requeues_remaining = dequeue_response + if len(dequeue_response) < 4: + return {"status": "failure"} + + queue_id, job_id, payload, requeues_remaining = dequeue_response[:4]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/responses.py` around lines 13 - 28, The function format_dequeue_response currently unpacks dequeue_response into four variables and will raise ValueError if the iterable has more than four elements; change the guard and unpack to be forward-compatible by checking len(dequeue_response) >= 4 and then assigning queue_id, job_id, payload, requeues_remaining = dequeue_response[0], dequeue_response[1], dequeue_response[2], dequeue_response[3] (keep the existing payload None check, decode_redis_value and deserialize_payload calls, and int(requeues_remaining) conversion) so extra trailing fields won’t cause a crash.src/fq/validators.py (1)
79-92: 💤 Low valueConfirm intentional asymmetry between
validate_metrics_argumentsandvalidate_clear_queue_arguments.
validate_metrics_argumentspermitsNonefor bothqueue_typeandqueue_id(scoping is optional, per the README), whilevalidate_clear_queue_argumentsrejectsNonefor either. This matches the README's documentation thatclear_queuerequires both, but it's worth a brief docstring on each function so future maintainers don't "normalize" the behavior. Not blocking.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/validators.py` around lines 79 - 92, Add brief docstrings to validate_metrics_arguments and validate_clear_queue_arguments that explicitly state the intended asymmetry: validate_metrics_arguments allows queue_type and queue_id to be None (optional scoping per README) while validate_clear_queue_arguments requires both non-None and valid identifiers; reference the README behavior in the docstrings so future maintainers understand why the validations differ and should not be normalized.src/fq/redis.py (2)
71-102: 💤 Low value
validate_*_redis_connectionsilently passes whenpingis missing.If
redis_clientlacks a callableping(Lines 76-77, 93-94), validation returns successfully. For tests/mocks this is convenient, but in production this branch should never be reachable — a realredis.Redis/RedisClusteralways exposesping. If the intent is to support test doubles, consider gating the silent-skip behind a clearer signal (e.g., a kwargskip_if_unavailable=False) so a misconfigured client in production doesn't bypass connectivity checks. Otherwise this is fine as-is.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/redis.py` around lines 71 - 102, The validation functions validate_async_redis_connection and validate_sync_redis_connection currently silently return when redis_client has no callable ping; change their signature to accept a flag (e.g., skip_if_unavailable: bool = False) and use it to control behavior: if ping is missing and skip_if_unavailable is True then return as before, otherwise raise a clear FQException indicating the client is missing a ping method; update any callers/tests that relied on the silent-skip to pass skip_if_unavailable=True where appropriate so production calls fail fast when a real redis client is misconfigured.
21-66: 💤 Low valueInconsistent cluster construction between async and sync paths.
The async path uses
startup_nodes=[AsyncClusterNode(...)](lines 23-25), while the sync path passeshost/portkwargs directly toSyncRedisCluster(lines 53-59). Both forms work with redis-py 7.1+, but the asymmetry makes maintenance harder. Consider unifying both to usestartup_nodes=[ClusterNode(...)]by importingClusterNodefromredis.clusteron the sync side, mirroring the async pattern. This would keep the two factories in sync as redis-py evolves.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/fq/redis.py` around lines 21 - 66, The sync client factory create_sync_redis_client currently constructs clustered clients with SyncRedisCluster(host=..., port=...) while the async factory uses startup_nodes=[AsyncClusterNode(...)]; unify them by importing ClusterNode from redis.cluster and constructing the sync cluster with startup_nodes=[ClusterNode(redis_config.host, int(redis_config.port))] (keep decode_responses/password/socket_timeout as-is) so the clustered path in create_sync_redis_client mirrors the AsyncClusterNode/AsyncRedisCluster pattern; update imports accordingly and remove the host/port kwargs when using startup_nodes.README.md (1)
69-73: 💤 Low valueMinor: add a language identifier to the
redis.confsnippet.markdownlint (MD040) flags Line 70 because the fenced block has no language. Tagging it as
conf(ortext) silences the lint and gives readers proper syntax highlighting.Proposed fix
-> ``` +> ```conf > unixsocket /var/run/redis/redis.sock > unixsocketperm 755 > ```🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@README.md` around lines 69 - 73, The fenced code block in README.md lacks a language tag causing markdownlint MD040; update the snippet by adding a language identifier (e.g., change the opening ``` to ```conf) for the redis.conf example so the block becomes a fenced "conf" code block and silences the lint and enables proper highlighting.tests/test_func.py (1)
10-10: 💤 Low valueImport
patchexplicitly for consistency with the rest of the suite.
unittest.mock.patchat line 1834 is reached only becausefrom unittest.mock import AsyncMock, MagicMock(line 10) loads the submodule into theunittestnamespace as a side effect —import unittestalone wouldn't expose it. Other test modules in this PR importpatchdirectly (e.g.,tests/test_edge_cases.pyline 8). Addpatchto the existing import.♻️ Proposed change
-from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch- with unittest.mock.patch( - "fq.redis.AsyncRedis", - side_effect=mock_redis_constructor, - ): + with patch("fq.redis.AsyncRedis", side_effect=mock_redis_constructor):Also applies to: 1834-1837
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_func.py` at line 10, Add unittest.mock.patch to the import statement that currently imports AsyncMock and MagicMock so tests explicitly import patch (e.g., update the import that references AsyncMock and MagicMock to also include patch); this keeps import style consistent with other tests and prevents relying on side-effect exposure of the submodule via unittest.tests/test_sync_queue.py (1)
249-290: 💤 Low valueMove
try/finallyto wrapinitialize()so the async queue is always cleaned up.
queue.initialize()and the firstflushdb()at lines 251–252 run before thetryblock at line 280. If either raises,queue.close()is never invoked, leaving a Redis connection dangling for the rest of the test process. The same pattern appears intest_sync_async_interoperabilityat lines 298–300 (try starts at line 302). Wrap the setup inside thetry(or move cleanup toaddAsyncCleanup) so resources are released on failure.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_sync_queue.py` around lines 249 - 290, Wrap the AsyncFQ setup in collect_async_errors so cleanup always runs: move the try/finally to start before calling AsyncFQ(...), await queue.initialize() and await queue._r.flushdb() (or register await queue.close() via addAsyncCleanup) so that the finally block always executes and calls await queue._r.flushdb() and await queue.close(); apply the same change to the similar setup in test_sync_async_interoperability. Ensure you reference the AsyncFQ instance, its initialize method, _r.flushdb and close methods (or use addAsyncCleanup) so no Redis connection is left open on initialization failure.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/fq/config.py`:
- Around line 198-201: _require_sections currently only ensures "redis" and
"queue" exist but allows any other keys (e.g., legacy "fq") to pass silently;
update _require_sections to compute the set of unknown sections as
set(config.keys()) - {"redis","queue"} and, if non-empty, raise FQException
listing the unexpected section names so callers fail fast on legacy/unknown
sections (keep the existing presence checks for "redis" and "queue" and include
both types of errors in the exception message).
---
Duplicate comments:
In `@src/fq/config.py`:
- Around line 52-65: The code currently lets clustered=True with
conn_type="unix_sock" silently pass, dropping clustering later; update
validation to reject that combination: in _validate_connection (or
_validate_clustered) check if config.get("clustered") is True and
config.get("conn_type") == "unix_sock" and raise an FQException with a clear
message like "Invalid config: redis.clustered cannot be true with
conn_type='unix_sock' (clustering is TCP-only)"; reference the existing methods
_validate_connection and _validate_clustered and the
create_async_redis_client/create_sync_redis_client behavior to ensure the
validation prevents the unsupported combination.
---
Nitpick comments:
In `@README.md`:
- Around line 69-73: The fenced code block in README.md lacks a language tag
causing markdownlint MD040; update the snippet by adding a language identifier
(e.g., change the opening ``` to ```conf) for the redis.conf example so the
block becomes a fenced "conf" code block and silences the lint and enables
proper highlighting.
In `@src/fq/lua.py`:
- Around line 18-26: The register method currently calls cls._read_script for
each field every time, causing repeated file I/O; decorate the _read_script
method with functools.lru_cache (or otherwise cache its results) so subsequent
calls return the cached script source instead of re-reading the .lua files; keep
using fields(cls) and redis_client.register_script as-is, just ensure
_read_script is memoized (e.g., apply `@functools.lru_cache` to
LuaScripts._read_script) so LuaScripts.register avoids repeated disk reads.
In `@src/fq/redis.py`:
- Around line 71-102: The validation functions validate_async_redis_connection
and validate_sync_redis_connection currently silently return when redis_client
has no callable ping; change their signature to accept a flag (e.g.,
skip_if_unavailable: bool = False) and use it to control behavior: if ping is
missing and skip_if_unavailable is True then return as before, otherwise raise a
clear FQException indicating the client is missing a ping method; update any
callers/tests that relied on the silent-skip to pass skip_if_unavailable=True
where appropriate so production calls fail fast when a real redis client is
misconfigured.
- Around line 21-66: The sync client factory create_sync_redis_client currently
constructs clustered clients with SyncRedisCluster(host=..., port=...) while the
async factory uses startup_nodes=[AsyncClusterNode(...)]; unify them by
importing ClusterNode from redis.cluster and constructing the sync cluster with
startup_nodes=[ClusterNode(redis_config.host, int(redis_config.port))] (keep
decode_responses/password/socket_timeout as-is) so the clustered path in
create_sync_redis_client mirrors the AsyncClusterNode/AsyncRedisCluster pattern;
update imports accordingly and remove the host/port kwargs when using
startup_nodes.
In `@src/fq/responses.py`:
- Around line 31-41: format_metrics_counts assumes enqueue_details and
dequeue_details are aligned and same-length; validate this invariant up front
and fail loudly or iterate them separately: add a check in format_metrics_counts
that len(enqueue_details) == len(dequeue_details) (and that both lengths are
even), and raise a ValueError with a clear message identifying
enqueue_details/dequeue_details if the check fails, or alternatively refactor to
loop over each list independently (two loops over enqueue_details and
dequeue_details) so misaligned Redis responses cannot silently drift; reference
the function name format_metrics_counts and the variables
enqueue_details/dequeue_details when making the change.
- Around line 13-28: The function format_dequeue_response currently unpacks
dequeue_response into four variables and will raise ValueError if the iterable
has more than four elements; change the guard and unpack to be
forward-compatible by checking len(dequeue_response) >= 4 and then assigning
queue_id, job_id, payload, requeues_remaining = dequeue_response[0],
dequeue_response[1], dequeue_response[2], dequeue_response[3] (keep the existing
payload None check, decode_redis_value and deserialize_payload calls, and
int(requeues_remaining) conversion) so extra trailing fields won’t cause a
crash.
In `@src/fq/validators.py`:
- Around line 79-92: Add brief docstrings to validate_metrics_arguments and
validate_clear_queue_arguments that explicitly state the intended asymmetry:
validate_metrics_arguments allows queue_type and queue_id to be None (optional
scoping per README) while validate_clear_queue_arguments requires both non-None
and valid identifiers; reference the README behavior in the docstrings so future
maintainers understand why the validations differ and should not be normalized.
In `@tests/test_func.py`:
- Line 10: Add unittest.mock.patch to the import statement that currently
imports AsyncMock and MagicMock so tests explicitly import patch (e.g., update
the import that references AsyncMock and MagicMock to also include patch); this
keeps import style consistent with other tests and prevents relying on
side-effect exposure of the submodule via unittest.
In `@tests/test_sync_queue.py`:
- Around line 249-290: Wrap the AsyncFQ setup in collect_async_errors so cleanup
always runs: move the try/finally to start before calling AsyncFQ(...), await
queue.initialize() and await queue._r.flushdb() (or register await queue.close()
via addAsyncCleanup) so that the finally block always executes and calls await
queue._r.flushdb() and await queue.close(); apply the same change to the similar
setup in test_sync_async_interoperability. Ensure you reference the AsyncFQ
instance, its initialize method, _r.flushdb and close methods (or use
addAsyncCleanup) so no Redis connection is left open on initialization failure.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cb159d02-7a08-43b4-b552-b7e305b6d0b4
📒 Files selected for processing (13)
README.mdsrc/fq/base.pysrc/fq/config.pysrc/fq/lua.pysrc/fq/redis.pysrc/fq/responses.pysrc/fq/utils.pysrc/fq/validators.pytests/config.pytests/test_edge_cases.pytests/test_func.pytests/test_queue.pytests/test_sync_queue.py
| @staticmethod | ||
| def _require_sections(config): | ||
| if "redis" not in config or "queue" not in config: | ||
| raise FQException("Config missing required sections: redis, queue") |
There was a problem hiding this comment.
Legacy/unknown sections pass through silently — PR claims they should be rejected.
The PR description says the config schema "rejects legacy sections" (e.g., the old "fq" name), but _require_sections only checks that redis and queue are present. A mapping like {"redis": {...}, "queue": {...}, "fq": {...}} is normalized and validated without any signal to the caller that fq is silently ignored. If strict rejection is the intended behavior, fail fast on unknown section names.
Proposed fix
+ _ALLOWED_SECTIONS = frozenset({"redis", "queue"})
+
`@staticmethod`
def _require_sections(config):
if "redis" not in config or "queue" not in config:
raise FQException("Config missing required sections: redis, queue")
+
+ unknown = set(config) - FQConfig._ALLOWED_SECTIONS
+ if unknown:
+ raise FQException(
+ "Config has unknown sections: %s" % ", ".join(sorted(unknown))
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @staticmethod | |
| def _require_sections(config): | |
| if "redis" not in config or "queue" not in config: | |
| raise FQException("Config missing required sections: redis, queue") | |
| _ALLOWED_SECTIONS = frozenset({"redis", "queue"}) | |
| `@staticmethod` | |
| def _require_sections(config): | |
| if "redis" not in config or "queue" not in config: | |
| raise FQException("Config missing required sections: redis, queue") | |
| unknown = set(config) - FQConfig._ALLOWED_SECTIONS | |
| if unknown: | |
| raise FQException( | |
| "Config has unknown sections: %s" % ", ".join(sorted(unknown)) | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/fq/config.py` around lines 198 - 201, _require_sections currently only
ensures "redis" and "queue" exist but allows any other keys (e.g., legacy "fq")
to pass silently; update _require_sections to compute the set of unknown
sections as set(config.keys()) - {"redis","queue"} and, if non-empty, raise
FQException listing the unexpected section names so callers fail fast on
legacy/unknown sections (keep the existing presence checks for "redis" and
"queue" and include both types of errors in the exception message).
This pull request introduces several new modules and configuration files to support both async and sync interfaces for the Flowdacity Queue, improves documentation, and enhances workflow security. The main focus is on modularizing configuration, key management, Lua script registration, Redis client creation, and response formatting. Additionally, the documentation is updated to clarify usage for both async and sync interfaces, and workflow files are pinned to specific action SHAs for improved security and reproducibility.
Major new modules and configuration:
src/fq/config.pyto centralize and validate configuration for both Redis and FQ, supporting both async and sync usage.src/fq/keys.pyfor consistent Redis key naming and management.src/fq/lua.pyto handle Lua script registration and loading for Redis operations.src/fq/redis.pyto create and validate both async and sync Redis clients, supporting cluster and socket modes.src/fq/responses.pyfor formatting and decoding responses from Redis, including dequeue and metrics.src/fq/sync/__init__.pyto expose the syncFQinterface.Documentation improvements:
README.mdto clarify async and sync usage, including code examples and highlighting that both interfaces are supported. [1] [2] [3]Workflow and configuration enhancements:
.qlty/qlty.tomland.qlty/.gitignorefor Qlty code quality tool configuration and file exclusions. [1] [2]persist-credentials: falsefor improved security and reproducibility. [1] [2] [3] [4]Summary by CodeRabbit
New Features
Documentation
Tests
Chores