Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions temporalio/nexus/_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ def _to_client_workflow_handle(
)
return client.get_workflow_handle(self.workflow_id, result_type=result_type)

# TODO(nexus-preview): The return type here should be dictated by the input workflow
# handle type.
@classmethod
def _unsafe_from_client_workflow_handle(
cls, workflow_handle: temporalio.client.WorkflowHandle[Any, OutputT]
Expand Down
4 changes: 0 additions & 4 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def raise_from_exception_queue() -> NoReturn:
)
self._running_activities[task.task_token] = activity
elif task.HasField("cancel"):
# TODO(nexus-prerelease): does the task get removed from running_activities?
self._handle_cancel_activity_task(task.task_token, task.cancel)
else:
raise RuntimeError(f"Unrecognized activity task: {task}")
Expand Down Expand Up @@ -190,9 +189,6 @@ async def drain_poll_queue(self) -> None:

# Only call this after run()/drain_poll_queue() have returned. This will not
# raise an exception.
# TODO(nexus-preview): based on the comment above it looks like the intention may have been to use
# return_exceptions=True. Change this for nexus and activity and change call sites to consume entire
# stream and then raise first exception
async def wait_all_completed(self) -> None:
running_tasks = [v.task for v in self._running_activities.values() if v.task]
if running_tasks:
Expand Down
4 changes: 0 additions & 4 deletions temporalio/worker/_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ def __init__(
metric_meter: temporalio.common.MetricMeter,
executor: concurrent.futures.ThreadPoolExecutor | None,
) -> None:
# TODO: make it possible to query task queue of bridge worker instead of passing
# unused task_queue into _NexusWorker, _ActivityWorker, etc?
self._bridge_worker = bridge_worker
self._client = client
self._task_queue = task_queue
Expand All @@ -91,8 +89,6 @@ def __init__(
)

self._data_converter = data_converter
# TODO(nexus-preview): interceptors
self._interceptors = interceptors

self._running_tasks: dict[bytes, _RunningNexusTask] = {}
self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue()
Expand Down
3 changes: 0 additions & 3 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3246,9 +3246,6 @@ async def cancel(self) -> None:
await self._instance._cancel_external_workflow(command)


# TODO(nexus-preview): are we sure we don't want to inherit from asyncio.Task as
# ActivityHandle and ChildWorkflowHandle do? I worry that we should provide .done(),
# .result(), .exception() etc for consistency.
class _NexusOperationHandle(temporalio.workflow.NexusOperationHandle[OutputT]):
def __init__(
self,
Expand Down
3 changes: 0 additions & 3 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5476,9 +5476,6 @@ async def execute_operation(
summary: str | None = None,
) -> OutputT: ...

# TODO(nexus-preview): in practice, both these overloads match an async def sync
# operation (i.e. either can be deleted without causing a type error).

# Overload for sync_operation methods (async def)
@overload
@abstractmethod
Expand Down
4 changes: 0 additions & 4 deletions tests/nexus/test_workflow_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
from tests.helpers.metrics import PromMetricMatcher
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name

# TODO(nexus-prerelease): test availability of Temporal client etc in async context set by worker
# TODO(nexus-preview): test worker shutdown, wait_all_completed, drain etc

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -549,8 +548,6 @@ async def test_sync_response(
task_queue=task_queue,
)

# TODO(nexus-prerelease): check bidi links for sync operation

# The operation result is returned even when request_cancel=True, because the
# response was synchronous and it could not be cancelled. See explanation below.
if exception_in_operation_start:
Expand Down Expand Up @@ -628,7 +625,6 @@ async def test_async_response(
)
return

# TODO(nexus-prerelease): race here? How do we know it hasn't been canceled already?
handler_wf_info = await handler_wf_handle.describe()
assert handler_wf_info.status in [
WorkflowExecutionStatus.RUNNING,
Expand Down
Loading