The Conductor Python SDK supports hierarchical worker configuration, allowing you to override worker settings at deployment time using environment variables without changing code.
Worker properties are resolved using a three-tier hierarchy (from lowest to highest priority):
- Code-level defaults (lowest priority) - Values defined in the `@worker_task` decorator
- Global worker config (medium priority) - `conductor.worker.all.<property>` environment variables
- Worker-specific config (highest priority) - `conductor.worker.<worker_name>.<property>` environment variables
This means:
- Worker-specific environment variables override everything
- Global environment variables override code defaults
- Code defaults are used when no environment variables are set
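
The lookup order above can be sketched in a few lines. This is only an illustration, not the SDK's implementation: `resolve_property` is a hypothetical helper, it checks only the dot-notation names, and it returns raw strings without type conversion.

```python
from typing import Mapping, Optional


def resolve_property(
    worker_name: str,
    prop: str,
    code_default: Optional[object],
    env: Mapping[str, str],
) -> Optional[object]:
    """Hypothetical helper: return the highest-priority value for one property."""
    # 1. A worker-specific variable wins if it is present.
    specific = env.get(f"conductor.worker.{worker_name}.{prop}")
    if specific is not None:
        return specific
    # 2. Otherwise fall back to the global variable.
    global_value = env.get(f"conductor.worker.all.{prop}")
    if global_value is not None:
        return global_value
    # 3. Otherwise keep the value passed to the decorator.
    return code_default


# A plain dict stands in for os.environ so the example is repeatable.
env = {
    "conductor.worker.all.domain": "production",
    "conductor.worker.process_order.thread_count": "20",
}
domain = resolve_property("process_order", "domain", "dev", env)
threads = resolve_property("process_order", "thread_count", 5, env)
interval = resolve_property("process_order", "poll_interval_millis", 1000, env)
```

Here `domain` comes from the global variable, `threads` from the worker-specific variable, and `interval` from the code default.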
The following properties can be configured via environment variables:
| Property | Type | Description | Example | Decorator? |
|---|---|---|---|---|
| `poll_interval_millis` | int | Polling interval in milliseconds | `1000` | ✅ Yes |
| `domain` | string | Worker domain for task routing | `production` | ✅ Yes |
| `worker_id` | string | Unique worker identifier | `worker-1` | ✅ Yes |
| `thread_count` | int | Max concurrent executions (threads for sync, coroutines for async) | `10` | ✅ Yes |
| `register_task_def` | bool | Auto-register task definition with JSON schemas on startup | `true` | ✅ Yes |
| `overwrite_task_def` | bool | Overwrite existing task definitions when registering (default: true) | `false` | ✅ Yes |
| `strict_schema` | bool | Enforce strict schema validation - additionalProperties=false (default: false) | `true` | ✅ Yes |
| `poll_timeout` | int | Poll request timeout in milliseconds | `100` | ✅ Yes |
| `lease_extend_enabled` | bool | Auto-extend task lease via heartbeat (see below) | `false` | ✅ Yes |
| `paused` | bool | Pause worker from polling/executing tasks | `true` | ❌ Environment-only |

Notes:
- The `paused` property is intentionally not available in the `@worker_task` decorator. It can only be controlled via environment variables, allowing operators to pause/resume workers at runtime without code changes or redeployment.
- When `lease_extend_enabled=True`, the SDK automatically sends heartbeats at 80% of `responseTimeoutSeconds` to keep long-running tasks alive. Without it, tasks that exceed `responseTimeoutSeconds` are timed out and retried by the server.
- The `register_task_def` parameter automatically registers task definitions with JSON Schema (draft-07) generated from Python type hints.
- The `overwrite_task_def` parameter controls whether to overwrite existing task definitions (default: true).
- The `strict_schema` parameter controls JSON schema validation strictness (default: false for lenient validation).

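
To make the 80% heartbeat rule concrete, the arithmetic can be sketched as below. `heartbeat_delay_seconds` is a hypothetical helper written for this note, not an SDK function, and the 300-second timeout is an arbitrary example value.

```python
def heartbeat_delay_seconds(response_timeout_seconds: float, fraction: float = 0.8) -> float:
    """Hypothetical helper: seconds to wait before sending a lease-extending heartbeat."""
    if response_timeout_seconds <= 0:
        raise ValueError("response_timeout_seconds must be positive")
    return response_timeout_seconds * fraction


# With responseTimeoutSeconds=300, the heartbeat would go out after roughly 240 seconds,
# leaving about 60 seconds of margin before the server would time the task out.
delay = heartbeat_delay_seconds(300)
margin = 300 - delay
```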
The thread_count parameter has different meanings depending on worker type (automatically detected from function signature):
Sync Workers (def):
- Controls ThreadPoolExecutor size
- Each task consumes one thread
- Recommended: 1-4 for CPU-bound, 10-50 for I/O-bound
Async Workers (async def):
- Controls max concurrent async tasks (semaphore limit)
- All tasks share a single event loop
- Recommended: 50-200 for I/O-bound (event loop handles thousands)
Example:
# Sync worker - thread_count = thread pool size
@worker_task(task_definition_name='cpu_task', thread_count=4)
def cpu_task(data: dict) -> dict:
    return expensive_computation(data)

# Async worker - thread_count = concurrency limit (not threads!)
# Only 1 thread, but up to 100 concurrent tasks
@worker_task(task_definition_name='api_task', thread_count=100)
async def api_task(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()  # Assumes a JSON body; returns a dict, not the Response object

For more details, see Worker Design Documentation.
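
The semaphore-style limit described for async workers can be illustrated with plain `asyncio`. This is a standalone sketch under the assumption that the limit behaves like an `asyncio.Semaphore`; `run_with_limit` is a hypothetical name and the sleep stands in for real I/O.

```python
import asyncio


async def run_with_limit(task_count: int, limit: int) -> int:
    """Run task_count coroutines with at most `limit` in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def one_task() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # Stand-in for a network call
            in_flight -= 1

    # All coroutines run on one event loop (one thread); the semaphore caps concurrency.
    await asyncio.gather(*(one_task() for _ in range(task_count)))
    return peak


peak = asyncio.run(run_with_limit(task_count=20, limit=5))
```

The observed peak never exceeds the limit, even though all 20 coroutines are scheduled at once.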
Manual lease extension: Return `TaskInProgress` from the worker to hand the task back to the queue and report progress.
from conductor.client.context.task_context import TaskInProgress, get_task_context
from conductor.client.worker.worker_task import worker_task
from typing import Union

@worker_task(task_definition_name='long_running_task')
def long_task(job_id: str) -> Union[dict, TaskInProgress]:
    ctx = get_task_context()
    poll_count = ctx.get_poll_count()

    # Process chunk of work
    processed = process_chunk(job_id, poll_count)

    if not is_complete(job_id):
        # More work to do - extend lease by returning TaskInProgress
        return TaskInProgress(
            callback_after_seconds=60,  # Return to queue after 60s
            output={'progress': processed}
        )
    else:
        # Done - return final result
        return {'status': 'completed', 'result': processed}

Automatic lease extension: When `lease_extend_enabled=True`, the SDK sends a heartbeat to the server at 80% of `responseTimeoutSeconds`, resetting the timeout clock. This keeps the task alive without requiring manual `TaskInProgress` returns. The `TaskInProgress` pattern is still useful for chunked/checkpoint-based execution where the worker yields the task back to the queue.
For detailed patterns, see Long-Running Tasks & Lease Extension.
The `overwrite_task_def` property controls whether existing task definitions are overwritten when `register_task_def=True`:
Overwrite Mode (default, overwrite_task_def=true):
- Always calls `update_task_def()` to overwrite existing definitions
- Ensures server always has latest configuration from code
- Use when: Task configuration changes frequently, development environments
No-Overwrite Mode (overwrite_task_def=false):
- Checks if task exists before registering
- Only creates new task if it doesn't exist
- Preserves manual changes made on server
- Use when: Tasks managed outside code, production with manual config
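
The difference between the two modes can be sketched with an in-memory stand-in for the server. Everything here is hypothetical: `apply_task_def` is not an SDK function, and the `server` dict only imitates the create/update behavior described above.

```python
from typing import Dict


def apply_task_def(
    server: Dict[str, dict], name: str, definition: dict, overwrite: bool
) -> str:
    """Hypothetical sketch: register `definition` on a dict that stands in for the server."""
    if overwrite:
        # Overwrite mode: always push the definition from code.
        server[name] = definition
        return "updated"
    if name in server:
        # No-overwrite mode: keep whatever is already on the server.
        return "skipped"
    server[name] = definition
    return "created"


server = {"process_order": {"retryCount": 5}}  # Value tuned manually on the server
from_code = {"retryCount": 3}                  # Value declared in code

kept = apply_task_def(server, "process_order", from_code, overwrite=False)
kept_retries = server["process_order"]["retryCount"]

replaced = apply_task_def(server, "process_order", from_code, overwrite=True)
new_retries = server["process_order"]["retryCount"]
```

With `overwrite=False` the manual value survives; with `overwrite=True` the value from code replaces it.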
# Global: Never overwrite any task definitions (Unix format - recommended)
export CONDUCTOR_WORKER_ALL_OVERWRITE_TASK_DEF=false
# Specific: Allow overwrite for this worker only (Unix format - recommended)
export CONDUCTOR_WORKER_DYNAMIC_TASK_OVERWRITE_TASK_DEF=true
# Alternative: Dot notation (also works)
export conductor.worker.all.overwrite_task_def=false
export conductor.worker.dynamic_task.overwrite_task_def=true

The `strict_schema` property controls JSON Schema validation strictness when `register_task_def=True`:
Lenient Mode (default, strict_schema=false):
- Sets `additionalProperties=true` in schemas
- Allows extra fields beyond defined schema
- Use when: Backward compatibility, flexible integrations, development
Strict Mode (strict_schema=true):
- Sets `additionalProperties=false` in schemas
- Rejects inputs with extra fields
- Use when: Strict contract enforcement, production validation
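
A rough sketch of how the flag could translate into a schema, and what "extra fields" means in practice. `build_schema`, `extra_fields`, and the type mapping are hypothetical helpers for illustration; the SDK's own schema generation from type hints is more complete.

```python
from typing import Dict, List

# Hypothetical mapping from Python types to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def build_schema(fields: Dict[str, type], strict: bool) -> dict:
    """Build a minimal object schema; strict mode disallows undeclared properties."""
    return {
        "type": "object",
        "properties": {name: {"type": _JSON_TYPES[tp]} for name, tp in fields.items()},
        "additionalProperties": not strict,
    }


def extra_fields(schema: dict, payload: dict) -> List[str]:
    """Return payload keys the schema would reject (always empty in lenient mode)."""
    if schema["additionalProperties"]:
        return []
    return sorted(set(payload) - set(schema["properties"]))


payload = {"name": "widget", "color": "red"}
lenient = build_schema({"name": str}, strict=False)
strict = build_schema({"name": str}, strict=True)
lenient_rejects = extra_fields(lenient, payload)
strict_rejects = extra_fields(strict, payload)
```

Only the strict schema flags `color`, because it is not a declared property.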
# Global: Strict validation for all workers (Unix format - recommended)
export CONDUCTOR_WORKER_ALL_STRICT_SCHEMA=true
# Specific: Lenient for this worker (overrides global, Unix format - recommended)
export CONDUCTOR_WORKER_FLEXIBLE_TASK_STRICT_SCHEMA=false
# Alternative: Dot notation (also works)
export conductor.worker.all.strict_schema=true
export conductor.worker.flexible_task.strict_schema=false

Example Schemas:
// strict_schema=false (default)
{
"type": "object",
"properties": {"name": {"type": "string"}},
"additionalProperties": true // ← Extra fields allowed
}
// strict_schema=true
{
"type": "object",
"properties": {"name": {"type": "string"}},
"additionalProperties": false // ← Extra fields rejected
}

Global: `conductor.worker.all.<property>=<value>`

Worker-specific: `conductor.worker.<task_definition_name>.<property>=<value>`

from conductor.client.worker.worker_task import worker_task

@worker_task(
    task_definition_name='process_order',
    poll_interval_millis=1000,
    domain='dev',
    thread_count=5
)
def process_order(order_id: str) -> dict:
    return {'status': 'processed', 'order_id': order_id}

Worker uses code-level defaults:
- `poll_interval_millis=1000`
- `domain='dev'`
- `thread_count=5`

export conductor.worker.all.poll_interval_millis=500
export conductor.worker.all.domain=production

Worker now uses:
- `poll_interval_millis=500` (from global env)
- `domain='production'` (from global env)
- `thread_count=5` (from code)

export conductor.worker.all.poll_interval_millis=500
export conductor.worker.all.domain=production
export conductor.worker.process_order.thread_count=20

Worker now uses:
- `poll_interval_millis=500` (from global env)
- `domain='production'` (from global env)
- `thread_count=20` (from worker-specific env)

Override all workers to use production domain and optimized settings:
# Global production settings
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=250
# Critical worker needs more resources
export conductor.worker.process_payment.thread_count=50
export conductor.worker.process_payment.poll_interval_millis=50

# Code remains unchanged
@worker_task(task_definition_name='process_order', poll_interval_millis=1000, domain='dev', thread_count=5)
def process_order(order_id: str):
    ...

@worker_task(task_definition_name='process_payment', poll_interval_millis=1000, domain='dev', thread_count=5)
def process_payment(payment_id: str):
    ...

Result:
- `process_order`: domain=production, poll_interval_millis=250, thread_count=5
- `process_payment`: domain=production, poll_interval_millis=50, thread_count=50

Slow down polling for easier debugging:
export conductor.worker.all.poll_interval_millis=10000 # 10 seconds
export conductor.worker.all.thread_count=1 # Single concurrent task
export conductor.worker.all.poll_timeout=5000 # 5 second timeout

All workers will use these debug-friendly settings without code changes.

Override only domain while keeping code defaults for other properties:
export conductor.worker.all.domain=staging

All workers use the staging domain but keep their code-defined poll intervals, thread counts, etc.

For async I/O-bound workers, increase concurrency significantly:
# Global settings for async workers
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=100 # Lower polling delay for async
# Async worker - high concurrency (event loop can handle it!)
export conductor.worker.fetch_api_data.thread_count=200
# Sync worker - keep moderate thread count
export conductor.worker.process_cpu_task.thread_count=10

# Async worker - high concurrency with single event loop
@worker_task(task_definition_name='fetch_api_data')
async def fetch_api_data(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()  # Assumes a JSON body; returns data, not the Response object

# Sync worker - traditional thread pool
@worker_task(task_definition_name='process_cpu_task')
def process_cpu_task(data: dict):
    return expensive_computation(data)

Result:
- `fetch_api_data`: 200 concurrent async tasks in 1 thread
- `process_cpu_task`: 10 threads for CPU-bound work

Temporarily disable workers without stopping the process:
# Pause all workers (maintenance mode)
export conductor.worker.all.paused=true
# Pause specific worker only
export conductor.worker.process_order.paused=true

When a worker is paused:
- It stops polling for new tasks
- Already-executing tasks complete normally
- The `task_paused_total` metric is incremented for each skipped poll
- No code changes or process restarts required
Use cases:
- Maintenance: Pause workers during database migrations or system maintenance
- Debugging: Pause problematic workers while investigating issues
- Gradual rollout: Pause old workers while testing new deployment
- Resource management: Temporarily reduce load by pausing non-critical workers
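
The pause behavior can be pictured as a check at the top of each poll cycle. The sketch below is an assumption about the shape of that logic, not the SDK's code: `is_paused` and `poll_cycle` are hypothetical helpers, a dict stands in for the process environment, and another dict stands in for the metrics collector.

```python
from typing import Mapping

TRUE_VALUES = {"true", "1", "yes"}


def is_paused(worker_name: str, env: Mapping[str, str]) -> bool:
    """Hypothetical helper: the worker-specific `paused` value overrides the global one."""
    for key in (f"conductor.worker.{worker_name}.paused", "conductor.worker.all.paused"):
        if key in env:
            return env[key] in TRUE_VALUES
    return False


def poll_cycle(worker_name: str, env: Mapping[str, str], metrics: dict) -> bool:
    """Return True if the worker would poll, False if the poll was skipped."""
    if is_paused(worker_name, env):
        # Each skipped poll is counted, mirroring the task_paused_total metric.
        metrics["task_paused_total"] = metrics.get("task_paused_total", 0) + 1
        return False
    return True  # A real worker would poll the server here


metrics: dict = {}
env = {
    "conductor.worker.all.paused": "true",
    "conductor.worker.process_payment.paused": "false",
}
order_polled = [poll_cycle("process_order", env, metrics) for _ in range(3)]
payment_polled = poll_cycle("process_payment", env, metrics)
```

In this run `process_order` skips all three polls while `process_payment` keeps polling, because its worker-specific value overrides the global pause.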
Unpause workers by removing the variable or setting it to false:
unset conductor.worker.all.paused
# or
export conductor.worker.all.paused=false

Monitor paused workers using the `task_paused_total` metric:
# Check how many times workers were paused
task_paused_total{taskType="process_order"}
Route different workers to different regions using domains:
# US workers
export conductor.worker.us_process_order.domain=us-east
export conductor.worker.us_process_payment.domain=us-east
# EU workers
export conductor.worker.eu_process_order.domain=eu-west
export conductor.worker.eu_process_payment.domain=eu-west

Test new configuration on one worker before rolling out to all:
# Production settings for all workers
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=200
# Canary worker uses staging domain for testing
export conductor.worker.canary_worker.domain=staging

Boolean properties accept multiple formats:
True values: true, 1, yes
False values: false, 0, no
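
A parser for these formats might look like the sketch below. `parse_bool` is a hypothetical helper, not the SDK's parser. Matching only the exact lowercase strings and raising on anything else are both assumptions made for the example.

```python
TRUE_VALUES = {"true", "1", "yes"}
FALSE_VALUES = {"false", "0", "no"}


def parse_bool(raw: str) -> bool:
    """Hypothetical parser for the boolean formats listed above."""
    value = raw.strip()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    # Assumption: reject unrecognized values instead of guessing.
    raise ValueError(f"unrecognized boolean value: {raw!r}")


parsed = [parse_bool(v) for v in ("true", "1", "yes", "false", "0", "no")]
```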
export conductor.worker.all.lease_extend_enabled=true
export conductor.worker.critical_task.register_task_def=1
export conductor.worker.background_task.lease_extend_enabled=false
export conductor.worker.maintenance_task.paused=true

services:
  worker:
    image: my-conductor-worker
    environment:
      - conductor.worker.all.domain=production
      - conductor.worker.all.poll_interval_millis=250
      - conductor.worker.critical_task.thread_count=50

apiVersion: v1
kind: ConfigMap
metadata:
  name: worker-config
data:
  conductor.worker.all.domain: "production"
  conductor.worker.all.poll_interval_millis: "250"
  conductor.worker.critical_task.thread_count: "50"
---
apiVersion: v1
kind: Pod
metadata:
  name: conductor-worker
spec:
  containers:
  - name: worker
    image: my-conductor-worker
    envFrom:
    - configMapRef:
        name: worker-config

apiVersion: apps/v1
kind: Deployment
metadata:
  name: conductor-worker-prod
  namespace: production
spec:
  template:
    spec:
      containers:
      - name: worker
        image: my-conductor-worker
        env:
        - name: conductor.worker.all.domain
          value: "production"
        - name: conductor.worker.all.poll_interval_millis
          value: "250"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: conductor-worker-staging
  namespace: staging
spec:
  template:
    spec:
      containers:
      - name: worker
        image: my-conductor-worker
        env:
        - name: conductor.worker.all.domain
          value: "staging"
        - name: conductor.worker.all.poll_interval_millis
          value: "500"

You can also use the configuration resolver programmatically:
from conductor.client.worker.worker_config import resolve_worker_config, get_worker_config_summary
# Resolve configuration for a worker
config = resolve_worker_config(
    worker_name='process_order',
    poll_interval_millis=1000,
    domain='dev',
    thread_count=5
)
print(config)
# {'poll_interval_millis': 500, 'domain': 'production', 'thread_count': 5, ...}

# Get human-readable summary
summary = get_worker_config_summary('process_order', config)
print(summary)
# Worker 'process_order' configuration:
#   poll_interval_millis: 500 (from conductor.worker.all.poll_interval_millis)
#   domain: production (from conductor.worker.all.domain)
#   thread_count: 5 (from code)

# Good: Set domain for entire environment
export conductor.worker.all.domain=production
# Less ideal: Set for each worker individually
export conductor.worker.worker1.domain=production
export conductor.worker.worker2.domain=production
export conductor.worker.worker3.domain=production

# Global settings for most workers
export conductor.worker.all.thread_count=10
export conductor.worker.all.poll_interval_millis=250
# Exception: High-priority worker needs more resources
export conductor.worker.critical_task.thread_count=50
export conductor.worker.critical_task.poll_interval_millis=50

Use sensible defaults in code so workers work without environment variables:
@worker_task(
    task_definition_name='process_order',
    poll_interval_millis=1000,  # Reasonable default (1 second)
    domain='dev',               # Safe default domain
    thread_count=5              # Moderate concurrency
)
def process_order(order_id: str):
    ...

Maintain a README or wiki documenting required environment variables for each deployment:
# Production Environment Variables
## Required
- `conductor.worker.all.domain=production`
## Optional (Recommended)
- `conductor.worker.all.poll_interval_millis=250`
- `conductor.worker.all.thread_count=20`
## Worker-Specific Overrides
- `conductor.worker.critical_task.thread_count=50`
- `conductor.worker.critical_task.poll_interval_millis=50`

Manage environment variables through IaC tools:
# Terraform example
resource "kubernetes_deployment" "worker" {
  spec {
    template {
      spec {
        container {
          env {
            name  = "conductor.worker.all.domain"
            value = var.environment_name
          }
          env {
            name  = "conductor.worker.all.poll_interval_millis"
            value = var.worker_poll_interval_millis
          }
          env {
            name  = "conductor.worker.all.thread_count"
            value = var.worker_thread_count
          }
        }
      }
    }
  }
}

Problem: Environment variables don't seem to take effect
Solutions:
- Check environment variable names are correctly formatted:
  - Global: `conductor.worker.all.<property>`
  - Worker-specific: `conductor.worker.<exact_task_name>.<property>`
- Verify the task definition name matches exactly:

  @worker_task(task_definition_name='process_order')  # Use this name in env var

  export conductor.worker.process_order.domain=production  # Must match exactly

- Check environment variables are exported and visible. If your shell rejects dotted names in `export`, use the uppercase underscore form instead (for example `CONDUCTOR_WORKER_ALL_DOMAIN`):

  env | grep -i 'conductor[._]worker'

Problem: Boolean properties not behaving as expected
Solution: Use recognized boolean values:
# Correct
export conductor.worker.all.lease_extend_enabled=true
export conductor.worker.all.register_task_def=false
# Also correct: 1/yes and 0/no are accepted
export conductor.worker.all.register_task_def=0
# Incorrect
export conductor.worker.all.lease_extend_enabled=True # Case matters

Problem: Integer properties cause errors
Solution: Ensure values are valid integers and property names are spelled in full:
# Correct
export conductor.worker.all.thread_count=10
export conductor.worker.all.poll_interval_millis=500
export conductor.worker.all.thread_count="10" # Same value: the shell strips the quotes
# Incorrect
export conductor.worker.all.thread_count=10ms # Not an integer

The hierarchical worker configuration system provides flexibility to:
- Deploy once, configure anywhere: Same code works in dev/staging/prod
- Override at runtime: No code changes needed for environment-specific settings
- Fine-tune per worker: Optimize critical workers without affecting others
- Simplify management: Use global settings for common configurations
- Pause/resume at runtime: Control worker execution without redeployment
Configuration priority: Worker-specific > Global > Code defaults
Sync Workers (CPU-bound):
export conductor.worker.cpu_task.thread_count=4 # Thread pool size
export conductor.worker.cpu_task.poll_interval_millis=500 # Moderate polling

Async Workers (I/O-bound):
export conductor.worker.api_task.thread_count=100 # High concurrency
export conductor.worker.api_task.poll_interval_millis=100 # Fast polling

Long-Running Tasks:
# Note: Keep long tasks alive with lease_extend_enabled or by returning TaskInProgress
export conductor.worker.ml_training.thread_count=2 # Limit concurrent long tasks
export conductor.worker.ml_training.poll_interval_millis=500

- Worker Design Documentation - Complete worker architecture guide
  - AsyncTaskRunner vs TaskRunner
  - Automatic runner selection (`def` vs `async def`)
  - Performance comparison and best practices
  - Worker discovery and metrics
- Examples - Working examples with configuration
  - `examples/worker_configuration_example.py` - Hierarchical configuration demo
  - `examples/workers_e2e.py` - End-to-end example
  - `examples/asyncio_workers.py` - Mixed sync/async workers

Last Updated: 2025-11-28 SDK Version: 1.3.0+