Skip to content

sample for custom worker tuner#314

Open
deepika-awasthi wants to merge 3 commits into
mainfrom
deepika/custom-worker-tuner
Open

sample for custom worker tuner#314
deepika-awasthi wants to merge 3 commits into
mainfrom
deepika/custom-worker-tuner

Conversation

@deepika-awasthi
Copy link
Copy Markdown

@deepika-awasthi deepika-awasthi commented May 28, 2026

Custom Worker Tuner

A CustomSlotSupplier is a sample that lets you gate slot grants on whatever you want.
This sample gates on a fake DB pool: the worker only polls for a new
activity when the pool has a free connection.

What this sample is

downstream.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release)
supplier.py - The custom slot supplier. On reserve_slot it polls downstream.increment() until it succeeds. On release_slot it calls downstream.decrement()
shared.py - A RunBatch workflow that runs N do_work activities in parallel. The activity just sleeps
worker.py - Wires Downstream + DownstreamAwareSupplier into a WorkerTuner
starter.py - Drives load

The flow:

When the downstream is at capacity, reserve_slot blocks until a
slot frees up. The excess work piles up on the Temporal server, not
inside the worker.

Run

In three terminals from samples-python/:

temporal server start-dev                       # terminal 1
uv run custom_worker_tuner/worker.py            # terminal 2
uv run custom_worker_tuner/starter.py           # terminal 3

What you'll see

The worker prints one line per slot lifecycle event:


TIME          EVENT     SLOT     COUNT   DETAIL
────────────────────────────────────────────────────────────
10:31:49.842  reserve   #1      1/10  ready to poll
10:31:49.842  reserve   #2      2/10  ready to poll
10:31:49.843  reserve   #3      3/10  ready to poll
10:31:49.843  reserve   #4      4/10  ready to poll
10:31:49.843  reserve   #5      5/10  ready to poll
10:31:49.843  reserve   #6      6/10  ready to poll
10:31:56.763  reserve   #7      7/10  eager dispatch
10:31:56.763  reserve   #8      8/10  eager dispatch
10:31:56.764  reserve   #9      9/10  eager dispatch
10:31:56.766  reserve   #10    10/10  eager dispatch
10:31:56.767  release   #7      9/10  no task arrived
10:31:56.768  release   #8      8/10  no task arrived
10:31:56.768  release   #9      7/10  no task arrived
10:31:56.768  reserve   #11     8/10  eager dispatch
10:31:56.768  reserve   #12     9/10  eager dispatch
10:31:56.768  reserve   #13    10/10  eager dispatch
10:31:56.771  used      #1     10/10  activity running
10:31:56.771  release   #10     9/10  no task arrived

Under load, with more activities than capacity, COUNT pins at
10/10 — that's the supplier refusing to poll past the gate.
we chose 10 because default there are 5 pollers for python sdk

Knobs

worker.py:

CAPACITY — downstream capacity (the gate)
POLL_INTERVAL_MS — how often the supplier rechecks when full

starter.py:

WORKFLOWS, ACTIVITIES_PER_WORKFLOW, SECONDS_PER_ACTIVITY — amount and duration of load

@deepika-awasthi deepika-awasthi requested a review from a team as a code owner May 28, 2026 18:15
@deepika-awasthi deepika-awasthi force-pushed the deepika/custom-worker-tuner branch 2 times, most recently from c3f6d55 to 33cec4d Compare May 28, 2026 18:31
@deepika-awasthi deepika-awasthi force-pushed the deepika/custom-worker-tuner branch from 33cec4d to 2c05e40 Compare May 28, 2026 18:42
Comment thread custom_worker_tuner/downstream.py Outdated
logger = logging.getLogger(__name__)


class Downstream:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downstream is an odd name for something that is essentially a Semaphore.

Maybe FakeDatabaseConnectionPool would be more descriptive

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to FakeDatabaseConnectionPool (and moved to db_pool.py)

Comment thread custom_worker_tuner/downstream.py Outdated
self.allowed_connections = allowed_connections
self.name = name
self.currently_connected = 0
self.connection_pool = threading.Lock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - the class is now just a thin wrapper over threading.BoundedSemaphore. The counter and lock are gone

in_use is derived from the semaphore (allowed_connections - self._sem._value)

Comment thread custom_worker_tuner/supplier.py Outdated
Comment on lines +43 to +44
while not self.downstream.increment():
await asyncio.sleep(self.poll_interval_ms / 1000.0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spin-looping here isn't great. Per my other suggestion if the fake pool just actually uses a semaphore the call itself can be blocking and no need for a sleep

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed -

reserve_slot now does await asyncio.to_thread(self.connection_pool.acquire), so it blocks until a slot is free instead of polling.

poll_interval_ms is removed.

@deepika-awasthi deepika-awasthi force-pushed the deepika/custom-worker-tuner branch from 3569839 to cc615bf Compare May 29, 2026 21:57
def __init__(self, allowed_connections: int, name: str = "db") -> None:
self.allowed_connections = allowed_connections
self.name = name
self._connection_pool = threading.BoundedSemaphore(allowed_connections)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You want to use asyncio.Semaphore... that's why you have do do the nasty asyncio.to_thread thing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants