sample for custom worker tuner#314
Conversation
c3f6d55 to
33cec4d
Compare
33cec4d to
2c05e40
Compare
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Downstream: |
There was a problem hiding this comment.
Downstream is an odd name for something that is essentially a Semaphore.
Maybe FakeDatabaseConnectionPool would be more descriptive
There was a problem hiding this comment.
Renamed to FakeDatabaseConnectionPool (and moved to db_pool.py)
| self.allowed_connections = allowed_connections | ||
| self.name = name | ||
| self.currently_connected = 0 | ||
| self.connection_pool = threading.Lock() |
There was a problem hiding this comment.
This could in fact literally be a https://docs.python.org/3/library/threading.html#threading.Semaphore
There was a problem hiding this comment.
Done - the class is now just a thin wrapper over threading.BoundedSemaphore. The counter and lock are gone
in_use is derived from the semaphore (allowed_connections - self._sem._value)
| while not self.downstream.increment(): | ||
| await asyncio.sleep(self.poll_interval_ms / 1000.0) |
There was a problem hiding this comment.
Spin-looping here isn't great. Per my other suggestion if the fake pool just actually uses a semaphore the call itself can be blocking and no need for a sleep
There was a problem hiding this comment.
fixed -
reserve_slot now does await asyncio.to_thread(self.connection_pool.acquire), so it blocks until a slot is free instead of polling.
poll_interval_ms is removed.
3569839 to
cc615bf
Compare
| def __init__(self, allowed_connections: int, name: str = "db") -> None: | ||
| self.allowed_connections = allowed_connections | ||
| self.name = name | ||
| self._connection_pool = threading.BoundedSemaphore(allowed_connections) |
There was a problem hiding this comment.
You want to use asyncio.Semaphore... that's why you have do do the nasty asyncio.to_thread thing
Custom Worker Tuner
A
CustomSlotSupplieris a sample that lets you gate slot grants on whatever you want.This sample gates on a fake DB pool: the worker only polls for a new
activity when the pool has a free connection.
What this sample is
downstream.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release)
supplier.py - The custom slot supplier. On reserve_slot it polls downstream.increment() until it succeeds. On release_slot it calls downstream.decrement()
shared.py - A RunBatch workflow that runs N do_work activities in parallel. The activity just sleeps
worker.py - Wires Downstream + DownstreamAwareSupplier into a WorkerTuner
starter.py - Drives load
The flow:
When the downstream is at capacity,
reserve_slotblocks until aslot frees up. The excess work piles up on the Temporal server, not
inside the worker.
Run
In three terminals from
samples-python/:What you'll see
The worker prints one line per slot lifecycle event:
Under load, with more activities than capacity, COUNT pins at
10/10 — that's the supplier refusing to poll past the gate.
we chose 10 because default there are 5 pollers for python sdk
Knobs
worker.py:
CAPACITY — downstream capacity (the gate)
POLL_INTERVAL_MS — how often the supplier rechecks when full
starter.py:
WORKFLOWS, ACTIVITIES_PER_WORKFLOW, SECONDS_PER_ACTIVITY — amount and duration of load