Skip to content

Commit 6f1f356

Browse files
committed
test(procrastinate): integration tests against real Postgres
1 parent c408b7a commit 6f1f356

1 file changed

Lines changed: 107 additions & 0 deletions

File tree

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Integration tests for the Procrastinate integration.
2+
3+
Requires a running Postgres instance reachable via the ``PROCRASTINATE_DSN``
4+
env var (e.g. ``postgresql://postgres:postgres@localhost:5432/procrastinate``)
5+
and valid TaskBadger creds in ``TASKBADGER_*``.
6+
7+
These tests are excluded from the default pytest run via ``norecursedirs`` in
8+
pyproject.toml.
9+
"""
10+
11+
import logging
12+
import os
13+
import random
14+
15+
import procrastinate
16+
import pytest
17+
18+
import taskbadger
19+
from taskbadger import StatusEnum
20+
from taskbadger.procrastinate import current_task, track
21+
from taskbadger.systems.procrastinate import ProcrastinateSystemIntegration
22+
23+
PROCRASTINATE_DSN = os.environ.get(
24+
"PROCRASTINATE_DSN",
25+
"postgresql://postgres:postgres@localhost:5432/procrastinate",
26+
)
27+
28+
29+
@pytest.fixture(autouse=True)
30+
def _check_log_errors(caplog):
31+
yield
32+
for when in ("call", "setup", "teardown"):
33+
errors = [r.getMessage() for r in caplog.get_records(when) if r.levelno == logging.ERROR]
34+
if errors:
35+
pytest.fail(f"log errors during '{when}': {errors}")
36+
37+
38+
@pytest.fixture(scope="session")
39+
def app():
40+
"""A Procrastinate app pointed at a real Postgres instance with its schema applied."""
41+
conn = procrastinate.SyncPsycopgConnector(conninfo=PROCRASTINATE_DSN)
42+
app = procrastinate.App(connector=conn)
43+
with app.open():
44+
# Apply schema (idempotent — Procrastinate's apply_schema is safe to re-run).
45+
app.schema_manager.apply_schema()
46+
yield app
47+
48+
49+
def _fetch_job_args(app, job_id):
50+
"""Read the stored ``args`` JSONB for a Procrastinate job."""
51+
with app.connector.pool.connection() as conn:
52+
with conn.cursor() as cur:
53+
cur.execute("SELECT args FROM procrastinate_jobs WHERE id = %s", (job_id,))
54+
row = cur.fetchone()
55+
return row[0]
56+
57+
58+
def test_track_decorator(app):
59+
@track
60+
@app.task(name="add_manual", queue="taskbadger_int")
61+
def add_manual(a, b):
62+
tb = current_task()
63+
assert tb is not None
64+
tb.update(value=100, data={"result": a + b})
65+
return a + b
66+
67+
a, b = random.randint(1, 1000), random.randint(1, 1000)
68+
job_id = add_manual.defer(a=a, b=b)
69+
app.run_worker(
70+
queues=["taskbadger_int"],
71+
wait=False,
72+
install_signal_handlers=False,
73+
listen_notify=False,
74+
)
75+
76+
# The TB task id was stashed in the job kwargs at defer time. Read it back
77+
# from Procrastinate to verify the final state.
78+
args = _fetch_job_args(app, job_id)
79+
tb_id = args["__taskbadger_task_id__"]
80+
81+
fetched = taskbadger.get_task(tb_id)
82+
assert fetched.status == StatusEnum.SUCCESS
83+
assert fetched.value == 100
84+
assert fetched.data == {"result": a + b}
85+
86+
87+
def test_auto_track_via_system(app):
88+
ProcrastinateSystemIntegration(app=app, auto_track_tasks=True)
89+
90+
@app.task(name="add_auto", queue="taskbadger_int_auto")
91+
def add_auto(a, b):
92+
return a + b
93+
94+
a, b = random.randint(1, 1000), random.randint(1, 1000)
95+
job_id = add_auto.defer(a=a, b=b)
96+
app.run_worker(
97+
queues=["taskbadger_int_auto"],
98+
wait=False,
99+
install_signal_handlers=False,
100+
listen_notify=False,
101+
)
102+
103+
args = _fetch_job_args(app, job_id)
104+
tb_id = args["__taskbadger_task_id__"]
105+
106+
fetched = taskbadger.get_task(tb_id)
107+
assert fetched.status == StatusEnum.SUCCESS

0 commit comments

Comments
 (0)