Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hyperglass/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,15 @@
"bird",
"openbgpd",
)

# Characters that must never appear in a query target. Used at two points in
# the request pipeline (once at the type/validation boundary in
# `models.api.query`, again at the device-transport boundary in
# `execution.drivers._construct`) so a regression in either layer cannot let
# a CLI/shell metacharacter reach a device.
FORBIDDEN_TARGET_CHARS = frozenset(
"\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f"
"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f"
"\x7f" # DEL
";|&`<>\"\\"
)
19 changes: 18 additions & 1 deletion hyperglass/execution/drivers/_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
# Project
from hyperglass.log import log
from hyperglass.util import get_fmt_keys
from hyperglass.constants import TRANSPORT_REST, TARGET_FORMAT_SPACE
from hyperglass.constants import (
TRANSPORT_REST,
TARGET_FORMAT_SPACE,
FORBIDDEN_TARGET_CHARS,
)
from hyperglass.exceptions.public import InputInvalid
from hyperglass.exceptions.private import ConfigError

Expand All @@ -29,6 +33,12 @@

FormatterCallback = t.Callable[[str], t.Union[t.List[str], str]]

# Final, post-formatter defense before a target is interpolated into a device
# command. The same `FORBIDDEN_TARGET_CHARS` set is checked at the type-level
# boundary in `models.api.query`; running it again here means a regression in
# either the QueryTarget constraint or a custom directive's regex still
# cannot reach `send_command`.


class Construct:
"""Construct SSH commands/REST API parameters from validated query data."""
Expand Down Expand Up @@ -110,6 +120,13 @@ def format(self, command: str) -> str:
except ValueError:
pass

target_str = str(self.target)
if any(c in FORBIDDEN_TARGET_CHARS for c in target_str):
raise InputInvalid(
error="Target contains disallowed character(s)",
target=target_str,
)

Comment thread
z23 marked this conversation as resolved.
return command.format(target=self.target, mask=mask, **attrs)

def queries(self):
Expand Down
21 changes: 21 additions & 0 deletions hyperglass/execution/drivers/tests/test_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from hyperglass.models.directive import Directives
from hyperglass.models.config.params import Params
from hyperglass.models.config.devices import Devices
from hyperglass.exceptions.public import InputInvalid

# Local
from .._construct import Construct
Expand Down Expand Up @@ -88,3 +89,23 @@ def test_construct(state):
)
constructor = Construct(device=state.devices["test1"], query=query)
assert constructor.target == "192.0.2.0/24"


@pytest.mark.parametrize("metachar", [";", "|", "\n", "`", '"', "<", ">", "\\"])
def test_construct_format_rejects_forbidden_post_formatter_target(state, metachar):
"""Layer-3 defense in depth must reject a metachar in `self.target`.

Even if Layers 1/2 are bypassed (e.g. by a plugin transform that injects
a metacharacter, or a `RuleWithoutValidation` directive),
`Construct.format()` must still refuse to build the command string.
Simulated by mutating `target` post-init to a forbidden value.
"""
query = Query(
queryLocation="test1",
queryTarget="192.0.2.0/24",
queryType="juniper_bgp_route",
)
constructor = Construct(device=state.devices["test1"], query=query)
constructor.target = f"192.0.2.0/24{metachar}id"
with pytest.raises(InputInvalid):
constructor.format("show route {target}")
28 changes: 27 additions & 1 deletion hyperglass/models/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from hyperglass.util import snake_to_camel, repr_from_attrs
from hyperglass.state import use_state
from hyperglass.plugins import InputPluginManager
from hyperglass.constants import FORBIDDEN_TARGET_CHARS
from hyperglass.exceptions.public import InputInvalid, QueryTypeNotFound, QueryLocationNotFound
from hyperglass.exceptions.private import InputValidationError

Expand All @@ -23,9 +24,26 @@


QueryLocation = Annotated[str, StringConstraints(strict=True, min_length=1, strip_whitespace=True)]
QueryTarget = Annotated[str, StringConstraints(min_length=1, strip_whitespace=True)]
QueryTarget = Annotated[
str,
StringConstraints(min_length=1, max_length=255, strip_whitespace=True),
]
QueryType = Annotated[str, StringConstraints(strict=True, min_length=1, strip_whitespace=True)]

def _check_query_target(value: str) -> str:
"""Reject targets containing control characters or shell/CLI metacharacters.

The forbidden set is the canonical `FORBIDDEN_TARGET_CHARS` from
`hyperglass.constants`; the same set is checked again at the device
transport boundary (`execution.drivers._construct`).
"""
if any(c in FORBIDDEN_TARGET_CHARS for c in value):
raise InputValidationError(
error="Target contains disallowed character(s)",
target=value,
)
return value


class SimpleQuery(BaseModel):
"""A simple representation of a post-validated query."""
Expand Down Expand Up @@ -106,6 +124,14 @@ def random(self) -> str:

def validate_query_target(self) -> None:
"""Validate a query target after all fields/relationships have been initialized."""
# Reject control characters and CLI/shell metacharacters before any
# rule-based validation. Defense in depth: even with a permissive
# directive regex, these characters must never reach a device CLI.
if isinstance(self.query_target, list):
for item in self.query_target:
_check_query_target(item)
else:
_check_query_target(self.query_target)
# Run config/rule-based validations.
self.directive.validate_target(self.query_target)
# Run plugin-based validations.
Expand Down
41 changes: 38 additions & 3 deletions hyperglass/models/directive.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,33 @@ class RuleWithPattern(Rule):
_type: RuleTypeAttr = "pattern"
condition: str

def validate_target(self, target: str, *, multiple: bool) -> str: # noqa: C901
# When `condition == "*"`, accept only characters that are legitimately part
# of BGP AS-path regex syntax (anchors, quantifiers, groups, character
# classes) or BGP community values. CLI metacharacters (`|`, `;`, `&`,
# `<`, `>`, backtick, quotes, whitespace control, etc.) are excluded so a
# user cannot break out of `command.format(target=...)` on the device.
# Regex alternation (`|`) is intentionally not allowed: most built-in
# vendor directives interpolate the target unquoted, where `|` would be
# parsed as a CLI pipe filter rather than regex alternation. Custom
# directives should set an explicit `condition` regex.
_WILDCARD_PATTERN = re.compile(
r"[A-Za-z0-9:_\-\^\$\.\*\+\?\(\)\[\] ]+"
)

def validate_target( # noqa: C901
self, target: t.Union[str, t.List[str]], *, multiple: bool
) -> bool:
"""Validate a string target against configured regex patterns."""

def validate_single_value(value: str) -> t.Union[bool, BaseException]:
if self.condition == "*":
pattern = re.compile(".+", re.IGNORECASE)
pattern = self._WILDCARD_PATTERN
else:
pattern = re.compile(self.condition, re.IGNORECASE)
is_match = pattern.match(value)
# `fullmatch` (not `match`) ensures the entire target conforms to
# the pattern. With `match`, a pattern like `[0-9]+` would accept
# `123;reboot` because the match anchors only at the start.
is_match = pattern.fullmatch(value)

if is_match and self.action == "permit":
return True
Expand Down Expand Up @@ -273,6 +291,23 @@ class Directive(HyperglassUniqueModel, unique_by=("id", "table_output")):
multiple: bool = False
multiple_separator: str = " "

@field_validator("multiple_separator")
@classmethod
def validate_multiple_separator(cls, value: str) -> str:
"""Restrict the multi-target join character to safe values.

The separator is interpolated unquoted between user-supplied targets
and then sent verbatim to the device CLI. Allowing arbitrary separators
would make multi-target queries an injection vector if an operator
misconfigured the value (e.g. `;` or `|`).
"""
allowed = {" ", ","}
if value not in allowed:
raise ValueError(
f"multiple_separator must be one of {sorted(allowed)!r}, got {value!r}"
)
return value

@field_validator("rules", mode="before")
@classmethod
def validate_rules(cls, rules: t.List[t.Dict[str, t.Any]]):
Expand Down
193 changes: 193 additions & 0 deletions hyperglass/models/tests/test_input_hardening.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""Regression tests for input-validation hardening.

These tests pin the security guarantees of the input pipeline:

1. CLI/shell metacharacters and control characters are rejected before any
target reaches `command.format(target=...)`.
2. The wildcard `condition: "*"` accepts only characters that legitimately
appear in BGP AS-path regex syntax or community values.
3. `RuleWithPattern` uses `fullmatch` semantics, so a custom regex without
end-of-string anchoring still cannot allow trailing payload.
4. `Directive.multiple_separator` is restricted to a small allow-list.
Comment thread
z23 marked this conversation as resolved.
"""

# Standard Library
import pytest

# Project
from hyperglass.exceptions.private import InputValidationError
from hyperglass.models.api.query import _check_query_target

# Local
from ..directive import Directive, RuleWithPattern


@pytest.mark.parametrize(
"target",
[
"1.1.1.1; reboot",
"1.1.1.1 | section bgp",
"1.1.1.1 && uname",
"1.1.1.1\nshow running-config",
"1.1.1.1\rshow running-config",
"foo`id`",
'foo" | section line "bar',
"foo\\bar",
"1.1.1.1 > /tmp/x",
"1.1.1.1 < /tmp/x",
],
)
def test_wildcard_pattern_rejects_metachars(target):
"""`condition: "*"` must not let CLI metacharacters through."""
rule = RuleWithPattern(condition="*", action="permit", commands=["show {target}"])
assert rule.validate_target(target, multiple=False) is False


@pytest.mark.parametrize(
"target",
[
"65000",
"65000_65001",
"_65000_",
"^65000$",
"65000 .* 65001",
"(65000)(65001)",
"65000:100",
"no-export",
],
)
def test_wildcard_pattern_accepts_legitimate_targets(target):
"""`condition: "*"` must continue to accept normal AS-path / community values."""
rule = RuleWithPattern(condition="*", action="permit", commands=["show {target}"])
assert rule.validate_target(target, multiple=False) is True


def test_pattern_rule_uses_fullmatch():
"""A non-anchored custom regex must not allow trailing payload (fullmatch, not match)."""
rule = RuleWithPattern(
condition=r"[0-9]+",
action="permit",
commands=["show ip bgp regexp {target}"],
)
# `match` would accept this because it anchors only at the start; `fullmatch` rejects.
assert rule.validate_target("12345abc", multiple=False) is False
assert rule.validate_target("12345", multiple=False) is True


def test_multiple_separator_allowlist_rejects_pipe():
"""`multiple_separator: "|"` would be a CLI-injection vector if accepted."""
with pytest.raises(ValueError):
Directive(
id="x",
name="x",
field=None,
multiple=True,
multiple_separator="|",
rules=[{"condition": "*", "action": "permit", "command": "show {target}"}],
)


def test_multiple_separator_allowlist_rejects_semicolon():
with pytest.raises(ValueError):
Directive(
id="x",
name="x",
field=None,
multiple=True,
multiple_separator=";",
rules=[{"condition": "*", "action": "permit", "command": "show {target}"}],
)


def test_multiple_separator_allowlist_accepts_space_and_comma():
for sep in (" ", ","):
d = Directive(
id="x",
name="x",
field=None,
multiple=True,
multiple_separator=sep,
rules=[{"condition": "*", "action": "permit", "command": "show {target}"}],
)
assert d.multiple_separator == sep


def test_pattern_rule_rejects_embedded_newline_with_strict_regex():
"""An IP-shaped regex must not match a target with an embedded newline."""
rule = RuleWithPattern(
condition=r"\d+\.\d+\.\d+\.\d+",
action="permit",
commands=["ping {target}"],
)
assert rule.validate_target("1.1.1.1\nshow run", multiple=False) is False


# Layer 1 (type-level) tests: `_check_query_target` is the first gate the
# request hits, before any directive rule runs. These tests pin the guarantee
# directly so the type-level check can't silently regress even if the
# directive-rule layer (Layer 2) is permissive.


@pytest.mark.parametrize(
"target",
[
"1.1.1.1;reboot",
"1.1.1.1|cmd",
"1.1.1.1&cmd",
"1.1.1.1\nshow run",
"1.1.1.1\rshow run",
"1.1.1.1\tshow run",
"1.1.1.1\x00",
"foo`id`",
'foo"bar',
"foo<bar",
"foo>bar",
"foo\\bar",
],
)
def test_check_query_target_rejects_forbidden_chars(target):
"""The type-level forbidden-character check must reject every metachar."""
with pytest.raises(InputValidationError):
_check_query_target(target)


@pytest.mark.parametrize(
"target",
["192.0.2.0/24", "65000:100", "_65000_", "^65000$", "(65000)(65001)"],
)
def test_check_query_target_accepts_normal_targets(target):
"""Normal looking-glass targets must pass the type-level check."""
assert _check_query_target(target) == target


def test_query_validates_each_list_target_elementwise():
"""List targets must be checked element-wise at the type-level.

Even when a directive's regex permits anything (`condition: "*"`), the
type-level forbidden-character check must still reject a list whose any
element contains a metacharacter.
"""
# Simulate the loop in `Query.validate_query_target`: each element runs
# through `_check_query_target` before the directive rule sees it.
targets = ["1.1.1.1", "2.2.2.2;reboot"]
with pytest.raises(InputValidationError):
for item in targets:
_check_query_target(item)


def test_check_query_target_rejects_even_when_rule_would_permit():
"""Layer-1 must reject metacharacters even when Layer-2 would permit them.

A rule with `condition='.*'` would permit anything at the directive-rule
layer, but `_check_query_target` runs first and must still reject.
"""
permissive_rule = RuleWithPattern(
condition=r".*", action="permit", commands=["show {target}"]
)
target = "1.1.1.1;reboot"
# The rule itself would happily permit this if it ever saw it...
# (re.fullmatch(".*") matches any string, including ones with `;`).
assert permissive_rule.validate_target(target, multiple=False) is True
# ...but the Layer-1 check that runs *before* the rule rejects it first:
with pytest.raises(InputValidationError):
_check_query_target(target)