Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions sentry_streams/sentry_streams/pipeline/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import copy
import importlib.resources
import json
import os
import re
from typing import Any, cast
from typing import Any, Optional, cast

import jsonschema
import yaml
Expand All @@ -13,6 +14,70 @@
_ENVVAR_PATTERN = re.compile(r"^\$\{envvar:([A-Za-z_][A-Za-z0-9_]*)\}$")


class TypeMismatchError(TypeError):
"""Raised when attempting to merge incompatible types in deepmerge."""

pass


class ScalarOverwriteError(ValueError):
"""Raised when attempting to overwrite a scalar value during deepmerge."""

pass


def deepmerge(
base: dict[str, Any],
override: dict[str, Any],
fail_on_scalar_overwrite: bool = False,
_path: list[str] | None = None,
) -> dict[str, Any]:
"""
Deep merge two dictionaries.

Merge semantics:
- Simple types (str, int, bool, None): override replaces base
- Dictionaries: recursively merge
- Lists: concatenate (append override elements to base)
- Type mismatches (e.g., dict + list, dict + str): raises TypeMismatchError
"""
if _path is None:
_path = []

result = copy.deepcopy(base)

for key, override_value in override.items():
current_path = _path + [key]
path_str = ".".join(current_path)

if key not in result:
result[key] = copy.deepcopy(override_value)
else:
base_value = result[key]

if isinstance(base_value, dict) and isinstance(override_value, dict):
result[key] = deepmerge(
base_value,
override_value,
fail_on_scalar_overwrite=fail_on_scalar_overwrite,
_path=current_path,
)
elif isinstance(base_value, list) and isinstance(override_value, list):
result[key] = base_value + copy.deepcopy(override_value)
elif type(base_value) is not type(override_value):
raise TypeMismatchError(
f"Cannot merge key '{key}': base type is {type(base_value)} but override type is {type(override_value)}"
)
else:
if fail_on_scalar_overwrite and base_value != override_value:
raise ScalarOverwriteError(
f"Cannot overwrite scalar at '{path_str}': would change {base_value!r} to {override_value!r}"
)
result[key] = copy.deepcopy(override_value)

return result


class ConfigEnvError(Exception):
"""Raised when a referenced environment variable is not set."""

Expand Down Expand Up @@ -60,13 +125,21 @@ def _resolve_string(value: str) -> str | None:
return os.environ[var_name]


def load_config(config_path: str) -> PipelineConfig:
def load_config(config_path: str, override_path: Optional[str] = None) -> PipelineConfig:
"""
Load a pipeline config file: read YAML, resolve ${envvar:...}, validate against schema.

If override_path is provided, load the override YAML and deep-merge it into the base
config before schema validation.
"""
with open(config_path, "r") as f:
config = yaml.safe_load(f)

if override_path is not None:
with open(override_path, "r") as f:
override = yaml.safe_load(f)
config = deepmerge(config, override)

resolve_envvars(config)

schema_path = importlib.resources.files("sentry_streams") / "config.json"
Expand Down
47 changes: 37 additions & 10 deletions sentry_streams/sentry_streams/runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import logging
import multiprocessing
import sys
from pathlib import Path
from typing import Any, Mapping, Optional, cast

import click
Expand Down Expand Up @@ -137,12 +137,13 @@ def load_runtime_with_config_file(
name: str,
log_level: str,
adapter: str,
config: str,
config_file: str,
segment_id: Optional[str],
application: str,
override_config: Optional[str] = None,
) -> Any:
"""Load runtime from a config file path, returning the runtime object without calling run()."""
environment_config = load_config(config)
environment_config = load_config(config_file, override_config)

sentry_sdk_config = environment_config.get("sentry_sdk_config")
if sentry_sdk_config:
Expand All @@ -155,9 +156,10 @@ def run_with_config_file(
name: str,
log_level: str,
adapter: str,
config: str,
config_file: str,
segment_id: Optional[str],
application: str,
override_config: Optional[str] = None,
) -> None:
"""
Load runtime from config file and run it. Used by the Python CLI.
Expand All @@ -169,7 +171,7 @@ def run_with_config_file(
control when .run() is called
"""
runtime = load_runtime_with_config_file(
name, log_level, adapter, config, segment_id, application
name, log_level, adapter, config_file, segment_id, application, override_config
)
runtime.run()

Expand Down Expand Up @@ -205,12 +207,23 @@ def run_with_config_file(
),
)
@click.option(
"--config",
required=True,
"--config-path",
default=None,
help=(
"The deployment config file path. Each config file currently corresponds to a specific pipeline."
"Directory containing config files. The config file is resolved as "
"<config-path>/<application-stem>.yaml. Mutually exclusive with --config-file."
),
)
@click.option(
"--config-file",
default=None,
help=("Direct path to the deployment config file. " "Mutually exclusive with --config-path."),
)
@click.option(
"--override-config",
default=None,
help="Path to an override YAML file to deep-merge on top of the base config.",
)
@click.option(
"--segment-id",
"-s",
Expand All @@ -225,11 +238,25 @@ def main(
name: str,
log_level: str,
adapter: str,
config: str,
config_path: Optional[str],
config_file: Optional[str],
override_config: Optional[str],
segment_id: Optional[str],
application: str,
) -> None:
run_with_config_file(name, log_level, adapter, config, segment_id, application)
if config_path is None and config_file is None:
raise click.UsageError("One of --config-path or --config-file must be provided.")
if config_path is not None and config_file is not None:
raise click.UsageError("--config-path and --config-file are mutually exclusive.")

if config_path is not None:
resolved_config_file = str(Path(config_path) / (Path(application).stem + ".yaml"))
else:
resolved_config_file = config_file # type: ignore[assignment]

run_with_config_file(
name, log_level, adapter, resolved_config_file, segment_id, application, override_config
)


if __name__ == "__main__":
Expand Down
7 changes: 7 additions & 0 deletions sentry_streams/tests/fixtures/override_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
metrics:
type: dummy
pipeline:
segments:
- steps_config:
myinput:
bootstrap_servers: ["override-broker:9092"]
58 changes: 57 additions & 1 deletion sentry_streams/tests/pipeline/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

import pytest

from sentry_streams.pipeline.config import ConfigEnvError, load_config, resolve_envvars
from sentry_streams.pipeline.config import (
ConfigEnvError,
TypeMismatchError,
load_config,
resolve_envvars,
)

FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"

Expand Down Expand Up @@ -105,3 +110,54 @@ def test_load_config_returns_pipeline_config() -> None:
"127.0.0.1:9092",
]
assert config.get("metrics", {}).get("type") == "dummy"


def test_load_config_with_override_merges_and_validates() -> None:
"""load_config with override_path deep-merges the override and passes schema validation."""
config_file = FIXTURES_DIR / "config_with_envvar.yaml"
override_file = FIXTURES_DIR / "override_config.yaml"
with mock.patch.dict(os.environ, {"BOOTSTRAP_SERVERS": "127.0.0.1:9092"}, clear=True):
config = load_config(str(config_file), override_path=str(override_file))
assert "pipeline" in config
# The override appends its segment to the base segment list (list concatenation)
segments = config["pipeline"]["segments"]
assert len(segments) == 2
# Second segment comes from the override
assert segments[1]["steps_config"]["myinput"]["bootstrap_servers"] == ["override-broker:9092"]
assert config.get("metrics", {}).get("type") == "dummy"


def test_load_config_without_override_no_regression() -> None:
"""load_config without override_path behaves exactly as before."""
config_file = FIXTURES_DIR / "config_with_envvar.yaml"
with mock.patch.dict(os.environ, {"BOOTSTRAP_SERVERS": "10.0.0.1:9092"}, clear=True):
config = load_config(str(config_file))
segments = config["pipeline"]["segments"]
assert len(segments) == 1
assert segments[0]["steps_config"]["myinput"]["bootstrap_servers"] == ["10.0.0.1:9092"]


def test_load_config_override_type_mismatch_raises() -> None:
"""load_config raises TypeMismatchError when override has incompatible type."""
import tempfile

import yaml

base_content = {
"metrics": {"type": "dummy"},
"pipeline": {"segments": []},
"env": {"key": {"nested": "value"}},
}
override_content = {"env": {"key": "string_not_dict"}}

with (
tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as base_f,
tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as override_f,
):
yaml.dump(base_content, base_f)
yaml.dump(override_content, override_f)
base_path = base_f.name
override_path = override_f.name

with pytest.raises(TypeMismatchError):
load_config(base_path, override_path=override_path)
Loading