Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions jetstream/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
HIGH_DATA_THRESHOLD = 20
MODERATE_DATA_THRESHOLD = 10

# Nodes in our current cluster have 61.47GB allocatable, per GCP console.
# The node's reported memory requested seems to be 0.89GB higher than what we request,
# so this default represents greatest whole number doesn't leave a lot of wasted memory.
DEFAULT_MEMORY_REQUEST_GB = 19

# Date when discrete metrics was switched to default
DISCRETE_AS_DEFAULT_THRESHOLD = datetime(2026, 3, 31, tzinfo=pytz.utc)

Expand Down Expand Up @@ -102,6 +107,9 @@ class ArgoExecutorStrategy:
zone: str
cluster_id: str
monitor_status: bool
memory_request: (
str # format rules: https://argo-workflows.readthedocs.io/en/latest/fields/#quantity
)
bucket: str | None = None
cluster_ip: str | None = None
cluster_cert: str | None = None
Expand Down Expand Up @@ -220,6 +228,7 @@ def execute(
else analysis_period_default.value
),
"image": self.image,
"memory_request": self.memory_request,
"statistics_only": self.statistics_only,
},
monitor_status=self.monitor_status,
Expand Down Expand Up @@ -663,6 +672,19 @@ def convert(self, value, param, ctx):
return value


class ClickGBString(click.ParamType):
# takes an int and converts it to a memory request string for Argo like "15G"
name = "gb_str_from_int"

def convert(self, value, param, ctx) -> str:
# if value already ends with G, assume the rest is an int
int_value = int(value[0:-1]) if value[-1] == "G" else int(value)
if int_value >= 1 and int_value <= 60:
return f"{int_value}G"

self.fail(f"{value} must be an integer between 1 and 60 (inclusive)", param, ctx)


def project_id_option(default="moz-fx-data-experiments"):
return click.option(
"--project_id",
Expand Down Expand Up @@ -838,6 +860,15 @@ def analysis_periods_option(
multiple=True,
)

memory_request_option = click.option(
"--memory-request",
"--memory_request",
help="Memory request for Argo pod (in GB)",
type=ClickGBString(),
required=False,
default=DEFAULT_MEMORY_REQUEST_GB,
)


@cli.command()
@project_id_option()
Expand Down Expand Up @@ -942,6 +973,7 @@ def run(
@statistics_only_option
@discrete_metrics_option
@metric_slugs_option
@memory_request_option
@analysis_periods_option()
def run_argo(
project_id,
Expand All @@ -963,6 +995,7 @@ def run_argo(
statistics_only,
discrete_metrics,
metric_slug,
memory_request,
):
"""Runs analysis for the provided date using Argo."""
strategy = ArgoExecutorStrategy(
Expand All @@ -980,6 +1013,7 @@ def run_argo(
statistics_only=statistics_only,
discrete_metrics=discrete_metrics,
metric_slugs=list(metric_slug) if metric_slug else None,
memory_request=memory_request,
)

AnalysisExecutor(
Expand Down Expand Up @@ -1019,6 +1053,7 @@ def run_argo(
@statistics_only_option
@discrete_metrics_option
@metric_slugs_option
@memory_request_option
@click.pass_context
def rerun(
ctx,
Expand All @@ -1043,6 +1078,7 @@ def rerun(
statistics_only,
discrete_metrics,
metric_slug,
memory_request,
):
"""Rerun all available analyses for a specific experiment."""
if len(experiment_slug) > 1 and config_file:
Expand Down Expand Up @@ -1087,6 +1123,7 @@ def rerun(
statistics_only=statistics_only,
discrete_metrics=discrete_metrics,
metric_slugs=list(metric_slug) if metric_slug else None,
memory_request=memory_request,
)

success = AnalysisExecutor(
Expand Down
4 changes: 4 additions & 0 deletions jetstream/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ def test_simple_workflow(
AnalysisPeriod.PREENROLLMENT_DAYS_28,
],
discrete_metrics=discrete_metrics,
memory_request="1G",
)
run_date = dt.datetime(2020, 10, 31, tzinfo=UTC)
strategy.execute([(config, run_date)])
Expand Down Expand Up @@ -730,6 +731,7 @@ def test_simple_workflow(
"analysis_periods_preenrollment_week": "preenrollment_week",
"analysis_periods_preenrollment_days28": "preenrollment_days28",
"image": "jetstream",
"memory_request": "1G",
"statistics_only": False,
},
monitor_status=False,
Expand Down Expand Up @@ -781,6 +783,7 @@ def test_simple_workflow_custom_image(
],
image="unrelated",
image_version="latest",
memory_request="12G",
discrete_metrics=discrete_metrics,
)
run_date = dt.datetime(2020, 10, 31, tzinfo=UTC)
Expand Down Expand Up @@ -816,6 +819,7 @@ def test_simple_workflow_custom_image(
"analysis_periods_preenrollment_week": "preenrollment_week",
"analysis_periods_preenrollment_days28": "preenrollment_days28",
"image": "unrelated",
"memory_request": "12G",
"statistics_only": False,
},
monitor_status=False,
Expand Down
15 changes: 10 additions & 5 deletions jetstream/workflows/run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spec:
- name: analysis_periods_preenrollment_week
- name: analysis_periods_preenrollment_days28
- name: image
- name: memory_request
templates:
- name: jetstream
parallelism: 5 # run up to 5 containers in parallel at the same time
Expand Down Expand Up @@ -112,11 +113,15 @@ spec:
"--statistics-only={{workflow.parameters.statistics_only}}",
"{{inputs.parameters.discrete_metrics}}",
]
resources:
requests:
memory: 16Gi # make sure there is at least 16Gb of memory available for the task
limits:
cpu: 4 # limit to 4 cores
# podSpecPatch allows us to parameterize the resources
podSpecPatch: |
containers:
- name: main
resources:
requests:
memory: "{{workflow.parameters.memory_request}}"
limits:
cpu: 4
retryStrategy:
limit: 3 # execute a container max. 3x; sometimes a container run might fail due to limited resources
retryPolicy: "Always"
Expand Down
Loading