Skip to content

DynamicForkTask sets incompatible parameters causing validation error #377

@grafke

Description

@grafke

Issue #349 has resurfaced.

The fix from PR #352 was reverted by commit baadd20.

A temporary workaround that I came up with is this:

"""
Conductor SDK extensions and workarounds.

This module is the canonical location for workarounds and extensions
to the conductor-python SDK. Keeping them separate from workflow
definitions avoids circular imports and makes the workarounds explicit.
"""

from copy import deepcopy

from conductor.client.workflow.task.join_task import JoinTask
from conductor.client.workflow.task.task import TaskInterface, WorkflowTask
from conductor.client.workflow.task.task_type import TaskType


class FixedDynamicForkTask(TaskInterface):
    """
    Drop-in replacement for the SDK's DynamicForkTask.

    The upstream task writes the task-list parameter name to
    dynamicForkJoinTasksParam; this variant writes it to
    dynamicForkTasksParam instead, which is the field the fix in the PR
    below restored.

    See: https://github.com/conductor-oss/python-sdk/pull/352
    """

    def __init__(
        self,
        task_ref_name: str,
        tasks_param: str = "dynamicTasks",
        tasks_input_param_name: str = "dynamicTasksInput",
        join_task: JoinTask = None,
    ):
        """
        Create a FORK_JOIN_DYNAMIC task.

        Args:
            task_ref_name: Reference name passed to the base task.
            tasks_param: Value stored as the workflow task's
                ``dynamic_fork_tasks_param``.
            tasks_input_param_name: Value stored as the workflow task's
                ``dynamic_fork_tasks_input_param_name``.
            join_task: Optional join task emitted right after the fork.
        """
        super().__init__(
            task_reference_name=task_ref_name,
            task_type=TaskType.FORK_JOIN_DYNAMIC,
        )
        self.tasks_param = tasks_param
        self.tasks_input_param_name = tasks_input_param_name
        # Keep a private deep copy so this task holds its own join-task
        # object rather than sharing the caller's instance.
        self._join_task = deepcopy(join_task)

    def to_workflow_task(self) -> list[WorkflowTask]:
        """
        Build the workflow-task representation of this fork.

        Returns:
            A list holding the fork task, followed by the join task's
            workflow task when a join task was supplied.
        """
        fork = super().to_workflow_task()
        # Deliberately dynamic_fork_tasks_param, not
        # dynamic_fork_join_tasks_param: this is the whole point of the
        # workaround.
        fork.dynamic_fork_tasks_param = self.tasks_param
        fork.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name

        if self._join_task is None:
            return [fork]
        return [fork, self._join_task.to_workflow_task()]

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions