Skip to content

Commit f837c8a

Browse files
committed
Various fixes to make run with sample & cleanup
1 parent c2e3115 commit f837c8a

6 files changed

Lines changed: 107 additions & 130 deletions

File tree

temporalio/contrib/aws/lambda_worker/__init__.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
"""A wrapper for running Temporal workers inside AWS Lambda.
22
3-
A single :py:func:`run_worker` call handles the full per-invocation lifecycle:
4-
connecting to the Temporal server, creating a worker with Lambda-tuned defaults,
5-
polling for tasks, and gracefully shutting down before the invocation deadline.
3+
A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the
4+
Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully
5+
shutting down before the invocation deadline.
66
77
Quick start::
88
@@ -14,7 +14,7 @@ def configure(config: LambdaWorkerConfig) -> None:
1414
config.worker_config["workflows"] = [MyWorkflow]
1515
config.worker_config["activities"] = [my_activity]
1616
17-
handler = run_worker(
17+
lambda_handler = run_worker(
1818
WorkerDeploymentVersion(
1919
deployment_name="my-service",
2020
build_id="v1.0",
@@ -24,20 +24,20 @@ def configure(config: LambdaWorkerConfig) -> None:
2424
2525
Configuration
2626
-------------
27-
Client connection settings (address, namespace, TLS, API key) are loaded
28-
automatically from a TOML config file and/or environment variables via
29-
:py:mod:`temporalio.envconfig`. The config file is resolved in order:
27+
Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML
28+
config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is
29+
resolved in order:
3030
3131
1. ``TEMPORAL_CONFIG_FILE`` env var, if set.
3232
2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``).
3333
3. ``temporal.toml`` in the current working directory.
3434
3535
The file is optional -- if absent, only environment variables are used.
3636
37-
The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with
38-
fields pre-populated with Lambda-appropriate defaults. Override any field
39-
directly in the callback. The ``task_queue`` key in ``worker_config`` is
40-
pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if set.
37+
The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated
38+
with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue``
39+
key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
40+
set.
4141
"""
4242

4343
from temporalio.contrib.aws.lambda_worker._configure import (

temporalio/contrib/aws/lambda_worker/_configure.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
class LambdaClientConnectConfig(ClientConnectConfig, total=False):
1818
"""Keyword arguments for :py:meth:`temporalio.client.Client.connect`.
1919
20-
Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with
21-
additional keys that may be set by the Lambda worker or its OTel helpers.
20+
Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with additional keys that may be
21+
set by the Lambda worker or its OTel helpers.
2222
"""
2323

2424
identity: str
@@ -31,13 +31,12 @@ class LambdaClientConnectConfig(ClientConnectConfig, total=False):
3131
class LambdaWorkerConfig:
3232
"""Passed to the configure callback of :py:func:`run_worker`.
3333
34-
Fields are pre-populated with Lambda-appropriate defaults before the
35-
configure callback is invoked; the callback may read and override any of
36-
them.
34+
Fields are pre-populated with Lambda-appropriate defaults before the configure callback is
35+
invoked; the callback may read and override any of them.
3736
38-
Use ``worker_config`` to set task queue, register workflows/activities, and
39-
tune worker options. The ``task_queue`` key is pre-populated from the
40-
``TEMPORAL_TASK_QUEUE`` environment variable if set.
37+
Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options.
38+
The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
39+
set.
4140
4241
Attributes:
4342
client_connect_config: Keyword arguments that will be passed to

temporalio/contrib/aws/lambda_worker/_defaults.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5
1919
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5)
2020
DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2)
21-
DEFAULT_MAX_CACHED_WORKFLOWS: int = 100
21+
DEFAULT_MAX_CACHED_WORKFLOWS: int = 30
2222

2323
DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2)
2424
DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1)
@@ -68,7 +68,7 @@ def build_lambda_identity(request_id: str, function_arn: str) -> str:
6868

6969
def lambda_default_config_file_path(
7070
getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment]
71-
) -> str:
71+
) -> Path:
7272
"""Return the config file path for a Lambda environment.
7373
7474
Resolution order:
@@ -79,6 +79,6 @@ def lambda_default_config_file_path(
7979
"""
8080
config_file = getenv(ENV_CONFIG_FILE)
8181
if config_file:
82-
return config_file
82+
return Path(config_file)
8383
root = getenv(ENV_LAMBDA_TASK_ROOT) or "."
84-
return str(Path(root) / DEFAULT_CONFIG_FILE)
84+
return Path(root) / DEFAULT_CONFIG_FILE

temporalio/contrib/aws/lambda_worker/_run_worker.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
"""Core run_worker implementation for Lambda."""
2-
31
from __future__ import annotations
42

53
import asyncio
@@ -67,23 +65,21 @@ def _default_extract_lambda_ctx(
6765
def run_worker(
6866
version: WorkerDeploymentVersion,
6967
configure: Callable[[LambdaWorkerConfig], None],
70-
) -> Callable[[Any, Any], Awaitable[None]]:
68+
) -> Callable[[Any, Any], None]:
7169
"""Create a Temporal worker Lambda handler.
7270
73-
Calls the *configure* callback to collect workflow/activity registrations
74-
and option overrides, then returns an async Lambda handler function. On
75-
each invocation the handler connects to the Temporal server, starts a
76-
worker with Lambda-tuned defaults, polls for tasks until the invocation
71+
Calls the *configure* callback to collect workflow/activity registrations and option overrides,
72+
then returns a Lambda handler function. On each invocation the handler connects to the Temporal
73+
server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation
7774
deadline approaches, and then gracefully shuts down.
7875
79-
The *version* parameter identifies this worker's deployment version.
80-
``run_worker`` always enables Worker Deployment Versioning
81-
(``use_worker_versioning=True``). To provide a default versioning behavior
82-
for workflows that do not specify one at registration time, set
76+
The *version* parameter identifies this worker's deployment version. ``run_worker`` always
77+
enables Worker Deployment Versioning (``use_worker_versioning=True``). To provide a default
78+
versioning behavior for workflows that do not specify one at registration time, set
8379
``deployment_config`` in ``worker_config`` in the configure callback.
8480
85-
The returned handler has the signature ``async handler(event, context)``
86-
and should be set as your Lambda function's handler entry point.
81+
The returned handler has the signature ``handler(event, context)`` and should be set as your
82+
Lambda function's handler entry point.
8783
8884
Args:
8985
version: The worker deployment version. Required.
@@ -92,7 +88,7 @@ def run_worker(
9288
activities, and options on it.
9389
9490
Returns:
95-
An async Lambda handler function.
91+
A Lambda handler function.
9692
9793
Example::
9894
@@ -127,7 +123,7 @@ def _run_worker_internal(
127123
version: WorkerDeploymentVersion,
128124
configure: Callable[[LambdaWorkerConfig], None],
129125
deps: _WorkerDeps,
130-
) -> Callable[[Any, Any], Awaitable[None]]:
126+
) -> Callable[[Any, Any], None]:
131127
"""Core logic with injected dependencies for testability."""
132128
if not version.deployment_name or not version.build_id:
133129
raise ValueError(
@@ -180,12 +176,14 @@ def _run_worker_internal(
180176

181177
extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx
182178

183-
async def _handler(_event: Any, lambda_context: Any) -> None:
184-
await _invocation_handler(
185-
lambda_context=lambda_context,
186-
config=config,
187-
deps=deps,
188-
extract_lambda_ctx=extract_lambda_ctx,
179+
def _handler(_event: Any, lambda_context: Any) -> None:
180+
asyncio.run(
181+
_invocation_handler(
182+
lambda_context=lambda_context,
183+
config=config,
184+
deps=deps,
185+
extract_lambda_ctx=extract_lambda_ctx,
186+
)
189187
)
190188

191189
return _handler
@@ -210,9 +208,10 @@ async def _invocation_handler(
210208
work_time = remaining - shutdown_buffer
211209
if work_time <= timedelta(seconds=1):
212210
raise RuntimeError(
213-
f"Lambda timeout leaves almost no time for work "
214-
f"(work_time={work_time}, shutdown_buffer={shutdown_buffer}); "
215-
f"increase the function timeout or decrease the shutdown "
211+
f"Lambda timeout is too short: {remaining.total_seconds():.1f}s "
212+
f"remaining but {shutdown_buffer.total_seconds():.1f}s is "
213+
f"reserved for shutdown, leaving no time for work. "
214+
f"Increase the function timeout or decrease the shutdown "
216215
f"deadline buffer"
217216
)
218217
elif work_time < timedelta(seconds=5):

temporalio/contrib/aws/lambda_worker/otel.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
"""OpenTelemetry helpers for Temporal workers running inside AWS Lambda.
22
3-
Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker`
4-
configure callback for a batteries-included setup that creates an OTel
5-
collector exporter and tracing interceptor, suitable for use with the AWS
6-
Distro for OpenTelemetry (ADOT) Lambda layer.
3+
Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker` configure callback for a
4+
batteries-included setup that creates an OTel collector exporter and tracing interceptor, suitable
5+
for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer.
76
8-
Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config`
9-
individually if you only need one.
7+
Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only
8+
need one.
109
"""
1110

1211
from __future__ import annotations
@@ -73,23 +72,19 @@ def apply_defaults(
7372
) -> None:
7473
"""Configure OTel metrics and tracing with AWS Lambda defaults.
7574
76-
Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime`
77-
with an :py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the
78-
OTLP collector, and adds a
79-
:py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for
80-
distributed tracing.
75+
Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime` with an
76+
:py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a
77+
:py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing.
8178
82-
The collector endpoint defaults to ``http://localhost:4317``, which is the
83-
endpoint expected by the ADOT collector Lambda layer.
79+
The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by
80+
the ADOT collector Lambda layer.
8481
85-
Registers a per-invocation ``ForceFlush`` shutdown hook for the
86-
``TracerProvider`` so pending traces are exported before each Lambda
87-
invocation completes.
82+
Registers a per-invocation ``ForceFlush`` shutdown hook for the ``TracerProvider`` so pending
83+
traces are exported before each Lambda invocation completes.
8884
89-
Core SDK metrics are exported on the ``metric_periodicity`` interval by
90-
the runtime's internal thread. There is no explicit flush API for Core
91-
metrics; set ``metric_periodicity`` short enough to ensure at least one
92-
export per invocation.
85+
Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread.
86+
There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to
87+
ensure at least one export per invocation.
9388
9489
Args:
9590
config: The :py:class:`LambdaWorkerConfig` to configure.
@@ -146,16 +141,14 @@ def build_metrics_telemetry_config(
146141
) -> TelemetryConfig:
147142
"""Build a :py:class:`~temporalio.runtime.TelemetryConfig` for OTel metrics.
148143
149-
Returns a ``TelemetryConfig`` with
150-
:py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics pointed at the
151-
given OTLP collector endpoint. Use this when you need to compose metrics
152-
config with other telemetry settings (e.g. custom logging) into your own
144+
Returns a ``TelemetryConfig`` with :py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics
145+
pointed at the given OTLP collector endpoint. Use this when you need to compose metrics config
146+
with other telemetry settings (e.g. custom logging) into your own
153147
:py:class:`~temporalio.runtime.Runtime`.
154148
155-
Core SDK metrics are exported on the ``metric_periodicity`` interval by
156-
the runtime's internal thread. There is no explicit flush API; set
157-
``metric_periodicity`` short enough to ensure at least one export per
158-
Lambda invocation.
149+
Core SDK metrics are exported on the ``metric_periodicity`` interval by the runtime's internal
150+
thread. There is no explicit flush API; set ``metric_periodicity`` short enough to ensure at
151+
least one export per Lambda invocation.
159152
160153
Example::
161154
@@ -205,8 +198,8 @@ def apply_tracing(
205198
"""Configure only OTel tracing (no metrics) on the Lambda worker config.
206199
207200
Adds a :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` to
208-
``config.client_connect_config["interceptors"]`` and registers a
209-
``ForceFlush`` shutdown hook for the provider.
201+
``config.client_connect_config["interceptors"]`` and registers a ``ForceFlush`` shutdown hook
202+
for the provider.
210203
211204
Args:
212205
config: The :py:class:`LambdaWorkerConfig` to configure.

0 commit comments

Comments
 (0)