Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sagemaker-mlops/src/sagemaker/mlops/local/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from sagemaker.core.workflow.functions import Join, JsonGet, PropertyFile
from sagemaker.core.workflow.properties import Properties
from sagemaker.core.workflow.execution_variables import ExecutionVariable, ExecutionVariables

# Orchestration imports (now in mlops)
from sagemaker.mlops.workflow.function_step import DelayedReturn
from sagemaker.mlops.workflow.steps import StepTypeEnum, Step
Expand Down Expand Up @@ -560,4 +561,4 @@ def get(self, step: Step) -> _StepExecutor:
self.pipeline_executor.execution.update_step_failure(
step.name, f"Unsupported step type {step_type} to execute."
)
return step_executor
return step_executor
75 changes: 65 additions & 10 deletions sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@
from sagemaker.core.remote_function.job import JOBS_CONTAINER_ENTRYPOINT
from sagemaker.core.s3 import s3_path_join
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.common_utils import resolve_value_from_config, retry_with_backoff, format_tags, Tags
from sagemaker.core.common_utils import (
resolve_value_from_config,
retry_with_backoff,
format_tags,
Tags,
)

# Orchestration imports (now in mlops)
from sagemaker.mlops.workflow.callback_step import CallbackOutput, CallbackStep
from sagemaker.mlops.workflow._event_bridge_client_helper import (
Expand All @@ -44,19 +50,24 @@
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT,
)
from sagemaker.mlops.workflow.lambda_step import LambdaOutput, LambdaStep
from sagemaker.core.shapes.shapes import MlflowConfig
from sagemaker.core.helper.pipeline_variable import (
RequestType,
PipelineVariable,
)

# Primitive imports (stay in core)
from sagemaker.core.workflow.execution_variables import ExecutionVariables
from sagemaker.core.workflow.parameters import Parameter

# Orchestration imports (now in mlops)
from sagemaker.core.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.mlops.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.mlops.workflow.parallelism_config import ParallelismConfiguration

# Primitive imports (stay in core)
from sagemaker.core.workflow.properties import Properties

# Orchestration imports (now in mlops)
from sagemaker.mlops.workflow.selective_execution_config import SelectiveExecutionConfig
from sagemaker.core.workflow.step_outputs import StepOutput
Expand Down Expand Up @@ -89,6 +100,7 @@ def __init__(
name: str = "",
parameters: Optional[Sequence[Parameter]] = None,
pipeline_experiment_config: Optional[PipelineExperimentConfig] = _DEFAULT_EXPERIMENT_CFG,
mlflow_config: Optional[MlflowConfig] = None,
steps: Optional[Sequence[Union[Step, StepOutput]]] = None,
sagemaker_session: Optional[Session] = None,
pipeline_definition_config: Optional[PipelineDefinitionConfig] = _DEFAULT_DEFINITION_CFG,
Expand All @@ -104,6 +116,8 @@ def __init__(
the same name already exists. By default, pipeline name is used as
experiment name and execution id is used as the trial name.
If set to None, no experiment or trial will be created automatically.
mlflow_config (Optional[MlflowConfig]): If set, the pipeline will be configured
with MLflow tracking for experiment tracking and model versioning.
steps (Sequence[Union[Step, StepOutput]]): The list of the
non-conditional steps associated with the pipeline. Any steps that are within the
`if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
Expand All @@ -120,6 +134,7 @@ def __init__(
self.name = name
self.parameters = parameters if parameters else []
self.pipeline_experiment_config = pipeline_experiment_config
self.mlflow_config = mlflow_config
self.steps = steps if steps else []
self.sagemaker_session = sagemaker_session if sagemaker_session else Session()
self.pipeline_definition_config = pipeline_definition_config
Expand Down Expand Up @@ -359,6 +374,7 @@ def start(
execution_description: str = None,
parallelism_config: ParallelismConfiguration = None,
selective_execution_config: SelectiveExecutionConfig = None,
mlflow_experiment_name: str = None,
pipeline_version_id: int = None,
):
"""Starts a Pipeline execution in the Workflow service.
Expand All @@ -373,6 +389,10 @@ def start(
over the parallelism configuration of the parent pipeline.
selective_execution_config (Optional[SelectiveExecutionConfig]): The configuration for
selective step execution.
mlflow_experiment_name (str): Optional MLflow experiment name to override
the experiment name specified in the pipeline's mlflow_config.
If provided, this will override the experiment name for this specific
pipeline execution only, without modifying the pipeline definition.
pipeline_version_id (Optional[str]): version ID of the pipeline to start the execution from. If not
specified, uses the latest version ID.

Expand All @@ -396,6 +416,7 @@ def start(
PipelineExecutionDisplayName=execution_display_name,
ParallelismConfiguration=parallelism_config,
SelectiveExecutionConfig=selective_execution_config,
MlflowExperimentName=mlflow_experiment_name,
PipelineVersionId=pipeline_version_id,
)
if self.sagemaker_session.local_mode:
Expand Down Expand Up @@ -435,14 +456,23 @@ def definition(self) -> str:
if self.pipeline_experiment_config is not None
else None
),
"MlflowConfig": _convert_mlflow_config_to_request(self.mlflow_config),
"Steps": list_to_request(compiled_steps),
}

request_dict["PipelineExperimentConfig"] = interpolate(
request_dict["PipelineExperimentConfig"], {}, {}, pipeline_name=self.name
)
callback_output_to_step_map = _map_callback_outputs(self.steps)
lambda_output_to_step_name = _map_lambda_outputs(self.steps)
request_dict["PipelineExperimentConfig"] = interpolate(
request_dict["PipelineExperimentConfig"],
callback_output_to_step_map=callback_output_to_step_map,
lambda_output_to_step_map=lambda_output_to_step_name,
pipeline_name=self.name,
)
request_dict["MlflowConfig"] = interpolate(
request_dict["MlflowConfig"],
callback_output_to_step_map=callback_output_to_step_map,
lambda_output_to_step_map=lambda_output_to_step_name,
pipeline_name=self.name,
)
request_dict["Steps"] = interpolate(
request_dict["Steps"],
callback_output_to_step_map=callback_output_to_step_map,
Expand Down Expand Up @@ -730,6 +760,34 @@ def delete_triggers(self, trigger_names: List[str]):
logger.info("Deleted Pipeline Schedule: %s ...", trigger_name)


def _convert_mlflow_config_to_request(mlflow_config: MlflowConfig) -> dict:
"""Convert sagemaker-core MlflowConfig to pipeline request format.

Args:
mlflow_config: MlflowConfig instance from sagemaker.core.shapes.shapes

Returns:
dict: Request format for pipeline MLflow configuration
"""
if mlflow_config is None:
return None

from sagemaker.core.utils.utils import Unassigned

resource_arn = mlflow_config.mlflow_resource_arn
if isinstance(resource_arn, Unassigned):
resource_arn = None

experiment_name = mlflow_config.mlflow_experiment_name
if isinstance(experiment_name, Unassigned):
experiment_name = None

return {
"MlflowResourceArn": resource_arn,
"MlflowExperimentName": experiment_name,
}


def format_start_parameters(parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Formats start parameter overrides as a list of dicts.

Expand Down Expand Up @@ -1135,7 +1193,6 @@ def _initialize_adjacency_list(self) -> Dict[str, List[str]]:
if isinstance(child_step, Step):
dependency_list[child_step.name].add(step.name)


adjacency_list = {}
for step in dependency_list:
for step_dependency in dependency_list[step]:
Expand Down Expand Up @@ -1173,9 +1230,7 @@ def is_cyclic_helper(current_step):
return True
return False

def get_steps_in_sub_dag(
self, current_step: Step, sub_dag_steps: Set[str] = None
) -> Set[str]:
def get_steps_in_sub_dag(self, current_step: Step, sub_dag_steps: Set[str] = None) -> Set[str]:
"""Get names of all steps (including current step) in the sub dag of current step.

Returns a set of step names in the sub dag.
Expand Down Expand Up @@ -1215,4 +1270,4 @@ def __next__(self) -> Step:

while self.stack:
return self.step_map.get(self.stack.pop())
raise StopIteration
raise StopIteration
Loading
Loading