Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
with:
fetch-depth: 0
persist-credentials: false

- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
with:
python-version: "3.12"

Expand All @@ -33,4 +34,4 @@ jobs:
python -m build

- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
uses: pypa/gh-action-pypi-publish@cef221092ed1bacb1cc03d23a2d87d1d172e277b # release/v1
8 changes: 5 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
with:
persist-credentials: false

- name: Install uv (with Python 3.12)
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@e4db8464a088ece1b920f60402e813ea4de65b8f # v4
with:
python-version: "3.12"

Expand All @@ -45,7 +47,7 @@ jobs:
uv run pytest --cov=src --cov-branch --cov-report=xml

- name: Upload results to Codecov
uses: codecov/codecov-action@v5
uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe # v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: flowdacity/queue-engine
7 changes: 7 additions & 0 deletions .qlty/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*
!configs
!configs/**
!hooks
!hooks/**
!qlty.toml
!.gitignore
88 changes: 88 additions & 0 deletions .qlty/qlty.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# This file was automatically generated by `qlty init`.
# You can modify it to suit your needs.
# We recommend you to commit this file to your repository.
#
# This configuration is used by both Qlty CLI and Qlty Cloud.
#
# Qlty CLI -- Code quality toolkit for developers
# Qlty Cloud -- Fully automated Code Health Platform
#
# Try Qlty Cloud: https://qlty.sh
#
# For a guide to configuration, visit https://qlty.sh/d/config
# Or for a full reference, visit https://qlty.sh/d/qlty-toml
config_version = "0"

exclude_patterns = [
"*_min.*",
"*-min.*",
"*.min.*",
"**/.yarn/**",
"**/*.d.ts",
"**/assets/**",
"**/bower_components/**",
"**/build/**",
"**/cache/**",
"**/config/**",
"**/db/**",
"**/deps/**",
"**/dist/**",
"**/extern/**",
"**/external/**",
"**/generated/**",
"**/Godeps/**",
"**/gradlew/**",
"**/mvnw/**",
"**/node_modules/**",
"**/protos/**",
"**/seed/**",
"**/target/**",
"**/templates/**",
"**/testdata/**",
"**/vendor/**",
]

test_patterns = [
"**/test/**",
"**/spec/**",
"**/*.test.*",
"**/*.spec.*",
"**/*_test.*",
"**/*_spec.*",
"**/test_*.*",
"**/spec_*.*",
]

[smells]
mode = "comment"

[[source]]
name = "default"
default = true


[[plugin]]
name = "actionlint"

[[plugin]]
name = "bandit"

[[plugin]]
name = "radarlint-python"
mode = "comment"

[[plugin]]
name = "ripgrep"
mode = "comment"

[[plugin]]
name = "ruff"
drivers = [
"lint",
]

[[plugin]]
name = "trufflehog"

[[plugin]]
name = "zizmor"
92 changes: 80 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
Flowdacity Queue
================

Flowdacity Queue (FQ) is an asyncio-friendly, rate-limited job queue built on Redis. It stores jobs per queue type and queue id, enforces per-queue dequeue intervals, automatically requeues expired jobs, and exposes metrics to understand throughput and queue depth.
Flowdacity Queue (FQ) is a rate-limited job queue built on Redis. It stores jobs per queue type and queue id, enforces per-queue dequeue intervals, automatically requeues expired jobs, and exposes metrics to understand throughput and queue depth.

## Features

- Per-queue rate limiting using millisecond intervals.
- Async Redis client with Lua scripts for predictable behavior.
- Async and sync interfaces backed by Redis clients from redis-py.
- Lua scripts for predictable queue behavior.
- Automatic retries with configurable limits (including infinite retries).
- Metrics for enqueue/dequeue counts and queue lengths.
- Works with TCP or Unix socket Redis deployments and supports Redis Cluster.
Expand All @@ -36,31 +37,48 @@ pip install -e .
FQ accepts a simple config mapping. Intervals are in milliseconds.
```python
config = {
"fq": {
"queue": {
"key_prefix": "queue_server",
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1, # -1 retries forever, 0 means no retries
},
"redis": {
"db": 0,
"key_prefix": "queue_server",
"conn_type": "tcp_sock", # or "unix_sock"
"conn_type": "tcp_sock",
"host": "127.0.0.1",
"port": 6379,
"password": "",
"clustered": False,
"unix_socket_path": "/tmp/redis.sock",
},
}
```

> If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`:
For Unix socket connections, use `conn_type: "unix_sock"` and provide
`unix_socket_path`:
```python
"redis": {
"db": 0,
"conn_type": "unix_sock",
"unix_socket_path": "/tmp/redis.sock",
"password": "",
"clustered": False,
}
```

> If you use Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`:
> ```
> unixsocket /var/run/redis/redis.sock
> unixsocketperm 755
> ```

## Quickstart
## Async Usage

Import `FQ` from the top-level package:

```python
from fq import FQ
```

```python
import asyncio
Expand All @@ -70,20 +88,19 @@ from fq import FQ

async def main():
config = {
"fq": {
"queue": {
"key_prefix": "queue_server",
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1,
},
"redis": {
"db": 0,
"key_prefix": "queue_server",
"conn_type": "tcp_sock",
"host": "127.0.0.1",
"port": 6379,
"password": "",
"clustered": False,
"unix_socket_path": "/tmp/redis.sock",
},
}

Expand Down Expand Up @@ -114,13 +131,64 @@ async def main():
asyncio.run(main())
```

Common operations:
## Sync Usage

Import `FQ` from `fq.sync`:

```python
import uuid
from fq.sync import FQ


config = {
"queue": {
"key_prefix": "queue_server",
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1,
},
"redis": {
"db": 0,
"conn_type": "tcp_sock",
"host": "127.0.0.1",
"port": 6379,
"password": "",
"clustered": False,
},
}

fq = FQ(config)
fq.initialize()

job_id = str(uuid.uuid4())
fq.enqueue(
payload={"message": "hello, world"},
interval=1000,
job_id=job_id,
queue_id="user001",
queue_type="sms",
)

job = fq.dequeue(queue_type="sms")
if job["status"] == "success":
fq.finish(
queue_type="sms",
queue_id=job["queue_id"],
job_id=job["job_id"],
)

fq.close()
```

## Common Operations

- `await fq.requeue()` — move expired jobs back onto their queues.
- `await fq.interval(interval=5000, queue_id="user001", queue_type="sms")` — change a queue’s rate limit on the fly.
- `await fq.metrics()` — global metrics; pass `queue_type` and/or `queue_id` for scoped stats and queue length.
- `await fq.clear_queue(queue_type="sms", queue_id="user001", purge_all=True)` — drop queued jobs and their payload/interval metadata.

The same operations are available from `fq.sync.FQ` without `await`.

## Development

- Start Redis for local development: `make redis-up` (binds to `localhost:6379`).
Expand Down
Loading
Loading