Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def build_container(
process_count: int | None = None,
enable_liveness_probe: bool = True,
multiprocess_enabled: bool | None = None,
container_name: str = "pipeline-consumer",
) -> dict[str, Any]:
"""
Build a complete container specification for the pipeline step.
Expand Down Expand Up @@ -144,7 +145,7 @@ def build_container(
)

pipeline_additions: dict[str, Any] = {
"name": "pipeline-consumer",
"name": container_name,
"image": image_name,
"command": ["python", "-m", "sentry_streams.runner"],
"args": [
Expand Down Expand Up @@ -230,6 +231,7 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext:
"replicas": context.get("replicas", 1),
"emergency_patch": emergency_patch_parsed,
"enable_liveness_probe": context.get("enable_liveness_probe", True),
"container_name": context.get("container_name", "pipeline-consumer"),
}


Expand All @@ -250,6 +252,7 @@ class PipelineStepContext(TypedDict):
replicas: int
emergency_patch: NotRequired[dict[str, Any]]
enable_liveness_probe: NotRequired[bool]
container_name: NotRequired[str]


class PipelineStep(ExternalMacro):
Expand Down Expand Up @@ -366,6 +369,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
replicas = ctx["replicas"]
emergency_patch = ctx.get("emergency_patch", {})
enable_liveness_probe = ctx.get("enable_liveness_probe", True)
container_name = ctx.get("container_name", "pipeline-consumer")

process_count, segments_with_parallelism = get_multiprocess_config(pipeline_config)
if len(segments_with_parallelism) > 1:
Expand All @@ -389,6 +393,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
process_count,
enable_liveness_probe,
multiprocess_enabled,
container_name,
)

base_deployment = load_base_template("deployment")
Expand Down
54 changes: 54 additions & 0 deletions sentry_streams_k8s/tests/test_pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ def test_build_container() -> None:
}


def test_build_container_custom_name() -> None:
"""Test that build_container uses the provided container_name."""
container = build_container(
container_template={},
pipeline_name="profiles",
pipeline_module="sbc.profiles",
image_name="my-image:latest",
cpu_per_process=1000,
memory_per_process=512,
segment_id=0,
container_name="my-custom-container",
)

assert container["name"] == "my-custom-container"


def test_build_container_with_log_level() -> None:
"""Test that build_container propagates a custom log level."""
container = build_container(
Expand Down Expand Up @@ -1070,3 +1086,41 @@ def test_emergency_patch_overrides_final_deployment() -> None:
assert deployment["metadata"]["labels"]["pipeline-app"] == "sbc-profiles"
assert len(deployment["spec"]["template"]["spec"]["containers"]) == 1
assert deployment["spec"]["template"]["spec"]["containers"][0]["name"] == "pipeline-consumer"


def test_run_custom_container_name() -> None:
"""Test that run() uses the provided container_name instead of the default."""
context: dict[str, Any] = {
"service_name": "my-service",
"pipeline_name": "profiles",
"deployment_template": {},
"container_template": {},
"pipeline_config": {
"env": {},
"pipeline": {
"segments": [
{
"steps_config": {
"myinput": {
"starts_segment": True,
"bootstrap_servers": ["127.0.0.1:9092"],
}
}
}
]
},
},
"pipeline_module": "sbc.profiles",
"image_name": "my-image:latest",
"cpu_per_process": 1000,
"memory_per_process": 512,
"segment_id": 0,
"replicas": 1,
"container_name": "my-custom-container",
}

pipeline_step = PipelineStep()
result = pipeline_step.run(context)
containers = result["deployment"]["spec"]["template"]["spec"]["containers"]
assert len(containers) == 1
assert containers[0]["name"] == "my-custom-container"
Loading