Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,72 @@ and this file MUST be updated together whenever `__version__` changes.

---

## [0.8.0-dev2] — 2026-06-01

### Added — Reflex skeleton: registry + runner + 3 first-party handlers

Second sub-step of the brain refactor. Stands up the **reflex layer** —
fast deterministic responders that subscribe to the thalamus and produce
:class:`ReflexOutcome` records. Nothing publishes to the bus yet (that
lands in `0.8.0-dev3`), so the handlers idle. The plumbing they idle on
is fully tested against `InMemoryEventBus`, which (per `0.8.0-dev1`'s
contract suite) behaves identically to the production `NatsEventBus` —
so when the first publisher lands the path lights up end-to-end.

#### Public surface

- `netcortex/reflex/`
- `protocol.py` — `ReflexHandler` Protocol + frozen `ReflexOutcome`
dataclass + `Severity` / `OutcomeKind` literal types. Severity is a
four-bucket scale (`info | warn | high | critical`) so downstream
alerting can pattern-match without parsing free-form strings.
- `registry.py` — process-wide `register_handler()` / `get_handler()`
/ `all_handlers()` / `clear_registry()`. Idempotent re-registration
of the same instance; duplicate-id collisions raise
`DuplicateHandlerError` (handler ids appear on every persisted
outcome — silent shadowing would be an operator footgun).
- `runner.py` — `ReflexRunner` wires the registry to one bus, spawns
one asyncio task per handler, isolates per-handler exceptions
(raising handler → `errored` outcome with truncated traceback in
`diagnostic`, dispatcher continues). Idempotent `start()` / `stop()`;
`ready_event` for tests that need cross-handler ordering.

#### First-party handlers (idle until 0.8.0-dev3)

| Handler id | Subject pattern | Severity | Notes |
|---|---|---|---|
| `link_down` | `sensory.snmp.trap.link_down.>` | `high` | Caps upstream key echo at 16 to bound outcome size |
| `security_webhook` | `sensory.meraki.webhook.security.>` | from payload | Coarse Meraki→NetCortex severity map (`informational`/`warning`/`high`/`critical`) |
| `bgp_drop` | `sensory.snmp.trap.bgp_backward_transition.>` | `high` | Target composed as `device|peer` when both are known, falls back gracefully |

Each handler is intentionally minimal — it captures the event, extracts
a target, returns a `logged` outcome. The richer behavior (semantic
memory lookup, maintenance-window check, dedup, NetBox journal mirror)
lands in later sub-steps once the first publisher exists to drive it.

#### Tests

- `tests/reflex/test_registry.py` — 7 cases: register/lookup, insertion
ordering, duplicate rejection, idempotent re-register, type rejection,
missing-key, clear. Uses a save/restore fixture so it does not leak the
cleared state to sibling test files.
- `tests/reflex/test_runner.py` — 8 cases against `InMemoryEventBus`:
dispatch matching events, pattern-filter non-matching, fan-out to
multiple handlers, exception isolation (`errored` outcome continues
dispatcher), `None` outcome not recorded, idempotent start/stop,
stop-without-start safety, registry enumeration default-arg path.
- `tests/reflex/test_handlers.py` — 14 cases pinning the operator-facing
surface (handler id + subscription pattern) and exercising each
handler's outcome-shape contract + target-extraction fallbacks.

### Not yet wired

- Still no publishers. Pollers continue to call correlator + writeback
directly. The first dual-write publisher lands in `0.8.0-dev3`.
- Outcomes are logged only — Neo4j `:ReflexEvent` persistence + NetBox
journal mirror land in `0.8.0-dev3` once the writer Protocols have a
consumer to justify them.

## [0.8.0-dev1] — 2026-06-01

### Added — Thalamus: NATS-backed event bus lands
Expand Down
2 changes: 1 addition & 1 deletion netcortex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes.
"""

__version__ = "0.8.0-dev1"
__version__ = "0.8.0-dev2"
38 changes: 38 additions & 0 deletions netcortex/reflex/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Reflex — fast deterministic responders on the event bus.

Reflex handlers subscribe to narrow NATS subject patterns, run
deterministic logic in milliseconds, and produce :class:`ReflexOutcome`
records the deliberative loop later consolidates.

Public surface:

* :class:`ReflexHandler` — Protocol every handler obeys.
* :class:`ReflexOutcome` — frozen record of what a handler decided.
* :func:`register_handler` — registration entry point for handler modules.
* :class:`ReflexRunner` — wires the registered handler set to a bus.
* :mod:`netcortex.reflex.handlers` — importing this submodule registers
the first-party handlers (``link_down``, ``security_webhook``,
``bgp_drop``).

See ``docs/architecture/brain.md`` for the role of reflex in the
brain-mapped architecture.
"""

from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome
from netcortex.reflex.registry import (
DuplicateHandlerError,
all_handlers,
get_handler,
register_handler,
)
from netcortex.reflex.runner import ReflexRunner

__all__ = [
"DuplicateHandlerError",
"ReflexHandler",
"ReflexOutcome",
"ReflexRunner",
"all_handlers",
"get_handler",
"register_handler",
]
26 changes: 26 additions & 0 deletions netcortex/reflex/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""First-party reflex handlers.

Importing this package registers every handler below with the global
registry in :mod:`netcortex.reflex.registry`. That side-effect is the
point — the runner enumerates the registry, so handlers only need to be
*imported* (not explicitly enumerated) for the runner to find them.

To add a new first-party handler:

1. Create ``netcortex/reflex/handlers/<your_handler>.py`` that calls
:func:`netcortex.reflex.registry.register_handler` at module scope.
2. Add a line to this file importing the new module.
3. Cover the subject pattern with a test in
``tests/reflex/test_handlers.py``.

That is the entire surface — no entry-point discovery, no dynamic
loading. By design (see ``docs/architecture/brain.md`` on plasticity).
"""

from __future__ import annotations

# Import-for-side-effect — each module registers itself on import.
# Keep these imports alphabetical so a diff reviewer can spot additions.
from netcortex.reflex.handlers import bgp_drop # noqa: F401
from netcortex.reflex.handlers import link_down # noqa: F401
from netcortex.reflex.handlers import security_webhook # noqa: F401
97 changes: 97 additions & 0 deletions netcortex/reflex/handlers/bgp_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""``bgp_drop`` — reflex handler for BGP session state-down signals.

Subscribes to the BGP4-MIB ``bgpBackwardTransition`` trap (and, once the
streaming telemetry adapter lands, also ``sensory.cisco.mdt.bgp.>``
neighbor-down samples). The fast deterministic response is to record a
session-down outcome so the deliberative loop (route convergence
analysis, prefix advertisement drift) has the wall-clock anchor.

This module is dev2 scaffolding. When publishers land in 0.8.0-dev3+ the
handler will additionally:

* resolve the peer IP against semantic memory's ``:BgpSession`` nodes so
the outcome carries the canonical session identifier, not just the
peer address;
* check whether the device is in a maintenance window OR the peer is a
known-flapping route-server (operator policy);
* attach a NetBox journal entry to the BGP session object (once the
reconciliation engine starts surfacing those — they are not first-
class in NetBox today, so the journal will live on the device);
* trigger a deliberative follow-up to assess prefix-advertisement
impact, comparing the last-known advertised prefix set on this
session against the post-drop topology snapshot.

None of that is in dev2. The current handler logs and returns a
``high``-severity outcome; downstream consumers can already key off it.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Final

from netcortex.contracts.event_bus import EventMessage
from netcortex.reflex.protocol import ReflexOutcome
from netcortex.reflex.registry import register_handler

# Subject pattern.
#
# Real publishers will use
# ``sensory.snmp.trap.bgp_backward_transition.<device_id>`` or
# ``sensory.cisco.mdt.bgp_neighbor_state.<device_id>``. For dev2 the
# handler subscribes to the SNMP trap subject only; the second
# subscription (or a glob) lands once the telemetry adapter exists.
_PATTERN: Final[str] = "sensory.snmp.trap.bgp_backward_transition.>"


class BgpDropHandler:
"""Reflex for BGP session backward-transition (down) events."""

id: Final[str] = "bgp_drop"
pattern: Final[str] = _PATTERN

async def handle(self, event: EventMessage) -> ReflexOutcome | None:
payload = event.payload
device = (
payload.get("device_id")
or payload.get("device")
or payload.get("target")
)
peer = payload.get("peer") or payload.get("peer_ip")
peer_asn = payload.get("peer_asn") or payload.get("remote_as")
last_state = payload.get("last_state") or payload.get("previous_state")
# Compose a target identifier that survives whether or not the peer
# IP is known — preferring the canonical session "device|peer" key
# when both are available, falling back to whichever is present.
if device and peer:
target = f"{device}|{peer}"
elif peer:
target = str(peer)
elif device:
target = str(device)
else:
target = None
return ReflexOutcome(
handler=self.id,
subject=event.subject,
target=target,
# BGP session loss is high-severity by default. Operators can
# tune later via the policy library once it exists; we do not
# second-guess severity inside the handler.
severity="high",
occurred_at=datetime.now(tz=timezone.utc),
payload={
"device": device,
"peer": peer,
"peer_asn": peer_asn,
"last_state": last_state,
},
outcome="logged",
rationale=(
f"BGP session {target!r} backward-transition observed; "
f"last_state={last_state!r} — dev2 idle handler"
),
)


_HANDLER = register_handler(BgpDropHandler())
83 changes: 83 additions & 0 deletions netcortex/reflex/handlers/link_down.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""``link_down`` — reflex handler for interface-down signals.

Subscribes to the SNMP linkDown trap subject. In the brain-mapped
architecture this is the fast deterministic response to an interface
going hard-down: log it now, let the deliberative loop (prefrontal,
0.11.0) decide whether to open a ticket, page someone, or just wait
for the symmetric linkUp.

This module is dev2 scaffolding — the handler is registered and the
runner will subscribe it to the bus, but no publisher exists yet. The
first real linkDown publish lands in 0.8.0-dev3 when the SNMP-trap
sensory adapter (``sensory/trap/snmp.py``) is wired in.

When publishers exist, the handler will also:

* fetch the affected (device, interface) from semantic memory and verify
it is not in a maintenance window;
* deduplicate against a Redis "recently seen" window so a flapping link
produces one outcome per minute, not one per trap;
* attach a NetBox journal entry on the Interface object so an operator
sees the trap immediately in the tool they live in;
* emit a follow-up ``reflex.link_down.applied`` event so consolidation
knows a synthetic interface-state transition has been recorded.

None of that is in dev2. The current implementation captures the trap,
extracts the target, and returns a ``logged`` outcome.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Final

from netcortex.contracts.event_bus import EventMessage
from netcortex.reflex.protocol import ReflexOutcome
from netcortex.reflex.registry import register_handler

# Subject pattern.
#
# Real publishers in 0.8.0-dev3+ will use
# ``sensory.snmp.trap.link_down.<device_id>`` and emit one event per
# (device, interface) transition. The trailing ``>`` matches any number
# of further tokens so the handler can be subscribed today and the
# publisher's exact subject layout can evolve without redeploying the
# handler.
_PATTERN: Final[str] = "sensory.snmp.trap.link_down.>"


class LinkDownHandler:
"""Reflex for IF-MIB linkDown traps."""

id: Final[str] = "link_down"
pattern: Final[str] = _PATTERN

async def handle(self, event: EventMessage) -> ReflexOutcome | None:
payload = event.payload
target = (
payload.get("device_id")
or payload.get("device")
or payload.get("target")
)
interface = payload.get("interface") or payload.get("if_name")
return ReflexOutcome(
handler=self.id,
subject=event.subject,
target=str(target) if target else None,
severity="high",
occurred_at=datetime.now(tz=timezone.utc),
payload={
"interface": interface,
# Cap the upstream payload echo so a chatty publisher
# cannot blow up the outcome record.
"upstream_keys": sorted(payload.keys())[:16],
},
outcome="logged",
rationale=(
f"linkDown observed on {target!r} interface {interface!r}; "
"dev2 idle handler — no remediation yet"
),
)


_HANDLER = register_handler(LinkDownHandler())
Loading
Loading