Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/temporal-direct/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ async def get_population_activity(city: str) -> int:

@dataclasses.dataclass
class LLMParams:
model_id: str
messages: list[dict[str, Any]]
tool_schemas: list[dict[str, Any]]

Expand Down Expand Up @@ -148,6 +149,7 @@ async def loop(
result = await temporalio.workflow.execute_activity(
llm_call_activity,
LLMParams(
model_id=context.model.id,
messages=[m.model_dump() for m in context.messages],
tool_schemas=tool_schemas,
),
Expand Down
35 changes: 20 additions & 15 deletions examples/temporal-direct/test_durability.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def read_activity_log(log_file: pathlib.Path) -> Counter[str]:

async def test_happy_path(
client: temporalio.client.Client, log_file: pathlib.Path
) -> str:
) -> tuple[str, int]:
print("\n── test_happy_path ────────────────────────────────")
log_file.write_text("")

Expand All @@ -96,9 +96,10 @@ async def test_happy_path(
assert (
"8,336,817" in result or "8336817" in result
), f"expected NYC population in result, got: {result!r}"
counts = read_activity_log(log_file)
print(f" ✓ workflow {wid} produced {len(result)} chars")
print(f" ✓ activity calls: {dict(read_activity_log(log_file))}")
return wid
print(f" ✓ activity calls: {dict(counts)}")
return wid, counts.total()


# ── Test 2: replay determinism ───────────────────────────────────
Expand All @@ -125,6 +126,7 @@ async def test_activity_caching(
env: temporalio.testing.WorkflowEnvironment,
client: temporalio.client.Client,
log_file: pathlib.Path,
baseline_total: int,
) -> None:
print("\n── test_activity_caching ──────────────────────────")
log_file.write_text("")
Expand Down Expand Up @@ -194,21 +196,24 @@ async def test_activity_caching(
# Sanity: we actually killed worker1 mid-workflow. If worker1 had
# finished everything before the SIGINT landed, the test would
# vacuously "pass" the cache invariant without exercising resume.
assert total_post > total_pre, (
assert total_pre < baseline_total, (
f"worker1 finished the entire workflow before shutdown landed "
f"(pre={total_pre}, post={total_post}); test isn't exercising resume"
f"(pre={total_pre}, baseline={baseline_total}); not exercising resume"
)

# If worker2 ignored history and re-ran everything, total_post would
# be roughly 2x total_pre (worker1's executions + worker2 redoing
# them all). Catch that case loudly.
expected_double_run = total_pre * 2
assert total_post < expected_double_run, (
f"suspiciously high activity count after resume: {total_post} "
f"(would expect at most ~{expected_double_run - 1} if cache replayed)"
# Cache invariant: post-restart total equals one full workflow's
# worth of completions (from the happy-path baseline). If worker2
# had ignored history and re-run cached activities, total_post
# would exceed baseline_total.
assert total_post == baseline_total, (
f"unexpected activity count after resume: {total_post} "
f"(baseline is {baseline_total}); worker2 may have re-run "
f"cached activities"
)
print(" ✓ resume completed without re-running cached activities")
print(f" (total before: {total_pre}, after: {total_post})")
print(
f" (pre={total_pre}, post={total_post}, baseline={baseline_total})"
)


# ── Entry point ──────────────────────────────────────────────────
Expand Down Expand Up @@ -246,9 +251,9 @@ async def main() -> None:
):
client = env.client

wid = await test_happy_path(client, log_file)
wid, baseline_total = await test_happy_path(client, log_file)
await test_replay_determinism(client, wid)
await test_activity_caching(env, client, log_file)
await test_activity_caching(env, client, log_file, baseline_total)

print("\nAll durability checks passed.")

Expand Down
Loading
Loading