6 changes: 4 additions & 2 deletions docs/decisions.md
@@ -79,19 +79,21 @@ If you want two or more instances of SCRAM to share data between themselves we h
3. For normal syncing where both translators have been connected, we are currently using process_updates (since it runs
regularly) to grab new data out of the database that comes from other connected instances and reannounces those locally.

##### The Unblocking Problem® (PR #157)
##### The Unblocking Problem® (PRs #157 and #193)

We had a bug where entries that had expired or been deleted on one SCRAM instance were never unblocked on the other instances. The original sync logic only looked at the `when` field (creation time), so it would happily find new entries but completely miss any that were modified, expired, or deactivated after creation. This was bad: things could stay blocked forever (until gobgp and the translator were restarted) while not showing up in the web UI (because the database says they're not blocked). Eventually, we should add "ghost" routes to the UI to show someone when something is advertised by its translator but not in the database. We could even use a cute little ghost icon for it!

The fix takes advantage of the existing `django-simple-history` models that track all entry modifications. To sync, we query the history models to see if anything has changed in any way, and we simply re-send that entry to the translator, ensuring eventual consistency (since the translator is idempotent).
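
The mechanics can be sketched without Django (the data and the history shape here are illustrative, not the real ORM models or query):

```python
from datetime import datetime, timezone

# Illustrative stand-in for django-simple-history rows: (entry_id, history_date).
# Any modification (create, expire, deactivate) adds a new history record.
history = [
    (1, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)),  # created, untouched since
    (2, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)),  # created...
    (2, datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)),  # ...then deactivated later
]

cutoff = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)

# Mirrors the idea of the real query: anything whose history was touched after
# the cutoff gets re-sent to the translator, whether it is active or not.
# Re-sending is safe because the translator is idempotent.
touched_ids = {entry_id for entry_id, when in history if when > cutoff}
print(touched_ids)  # {2}
```

Note that entry 1 is skipped even though it exists: only history activity after the cutoff matters, which is exactly why expirations and deactivations are now picked up.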

We initially tried excluding entries originating from the local instance from reprocessing (`originating_scram_instance != settings.SCRAM_HOSTNAME`), on the theory that local entries would already be in the right state (i.e. if something blocked an entry on one host, it would also deactivate it on that same host). This was wrong for expiration: an entry can be expired from any instance at any time (via the health check), not just the instance it was originally blocked on, so the originating instance needs to reprocess even its own expired entries to guarantee that we send the `translator_remove` message. We removed the exclusion filter entirely (PR #193), so now all instances reprocess everything that changed. It's a bit redundant and wastes some resources, but it's idempotent and good enough until we redesign syncing from the ground up.
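
A toy model of why the exclusion filter lost messages (the dicts and field names here are illustrative, not the real model):

```python
LOCAL = "scram-a"  # this instance's hostname

# Two entries that just expired; one originated locally, one remotely.
entries = [
    {"route": "192.0.2.1/32", "origin": "scram-a", "active": False},  # expired locally
    {"route": "192.0.2.2/32", "origin": "scram-b", "active": False},  # expired remotely
]

# Old behavior: skip anything this instance created itself.
old = [e for e in entries if e["origin"] != LOCAL]
# New behavior (PR #193): reprocess everything that changed.
new = list(entries)

# The old filter drops 192.0.2.1/32, so its translator_remove is never sent
# when the expiration happened on the originating instance itself.
print([e["route"] for e in old])  # ['192.0.2.2/32']
print([e["route"] for e in new])  # ['192.0.2.1/32', '192.0.2.2/32']
```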

##### Future Work

Honestly, the use of compose health checks is kind of gross, and we realize this. The `process_updates` polling approach works, but it's not elegant. We're probably looking at Celery or some other task runner to handle this properly; it'd be good to have something that can react to database changes on a message bus rather than polling every 30 seconds. But this gets things fixed for now, and the history-based approach seems solid enough for our needs.
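
If we do go the Celery route, the polling loop could become a beat-scheduled task; a hypothetical settings fragment (the task path and interval are made up, nothing like this exists yet):

```python
from datetime import timedelta

# Hypothetical Django settings fragment: have celery beat fire the sync task
# every 30 seconds instead of piggybacking on the compose health check.
CELERY_BEAT_SCHEDULE = {
    "process-updates": {
        "task": "scram.route_manager.tasks.process_updates",  # hypothetical task path
        "schedule": timedelta(seconds=30),
    },
}
```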

#### Entries Page

We intentionally chose to only list the active entries. Our thinking is that the home page shows the most recent additions.
Then, if you went to the entries page, it would be overwhelmingly huge to show all the historical entries, including the
ones that timed out/were deactivated. If you wanted to know about a specific entry even if it were not currently active
(to see its history, say), you would likely be using the search anyway.
@@ -51,3 +51,37 @@ Feature: Multi-Instance Synchronization
When secondary instance runs process_updates
Then the entry 2001:db8::30/128 is inactive on secondary instance
And secondary announces 2001:db8::30/128 removal to block translators

Scenario: IPv4 entry expiring on primary instance is reprocessed by primary
Given a block actiontype is defined
When we create entry 192.168.100.4/32 with 3 second expiration on primary instance
And we wait for database commit
And we wait 4 seconds for expiration
When primary instance runs process_updates to expire entries
Then primary announces 192.168.100.4/32 removal to block translators

Scenario: IPv6 entry expiring on primary instance is reprocessed by primary
Given a block actiontype is defined
When we create entry 2001:db8::4/128 with 3 second expiration on primary instance
And we wait for database commit
And we wait 4 seconds for expiration
When primary instance runs process_updates to expire entries
Then primary announces 2001:db8::4/128 removal to block translators

Scenario: IPv4 entry deactivated on primary instance is reprocessed by primary
Given a block actiontype is defined
When we create an entry 192.168.100.5/32 on primary instance
And we wait for database commit
When we deactivate entry 192.168.100.5/32 on primary instance
And we wait for database commit
When primary instance runs process_updates to expire entries
Then primary announces 192.168.100.5/32 removal to block translators

Scenario: IPv6 entry deactivated on primary instance is reprocessed by primary
Given a block actiontype is defined
When we create an entry 2001:db8::5/128 on primary instance
And we wait for database commit
When we deactivate entry 2001:db8::5/128 on primary instance
And we wait for database commit
When primary instance runs process_updates to expire entries
Then primary announces 2001:db8::5/128 removal to block translators
30 changes: 30 additions & 0 deletions scram/route_manager/tests/integration/steps/multi_instance.py
@@ -242,3 +242,33 @@ def check_removal_announced_on_secondary(context, ip):
assert ip in reprocessed_list, (
f"Expected {ip} in reprocessed list, got {reprocessed_list}. Instance hostname: {hostname}"
)


@then("primary announces {ip:S} removal to block translators")
def check_removal_announced_on_primary(context, ip):
"""Verifies entry removal was processed and announced to translators."""
if not hasattr(context, "auth_token"):
context.auth_token = get_auth_token(DJANGO_PRIMARY_URL)

try:
list_response = requests.get(
f"{DJANGO_PRIMARY_URL}/api/v1/entries/",
headers={"Authorization": f"Token {context.auth_token}"},
timeout=10,
)
list_response.raise_for_status()

entries = list_response.json().get("results", [])
for entry in entries:
if entry.get("route") == ip:
context.test.fail(f"Entry {ip} should be inactive but was found in active entries list")
except requests.exceptions.RequestException as e:
context.test.fail(f"Failed to call API: {e}")

process_data = getattr(context, "primary_process_data", {})
if process_data:
reprocessed_list = process_data.get("entries_reprocessed_list", [])
hostname = process_data.get("scram_hostname", "UNKNOWN")
assert ip in reprocessed_list, (
f"Expected {ip} in reprocessed list, got {reprocessed_list}. Instance hostname: {hostname}"
)
29 changes: 16 additions & 13 deletions scram/route_manager/tests/test_process_updates.py
@@ -76,14 +76,15 @@ def test_finds_entry_from_other_instance(self, actiontype, other_instance):
assert len(result) == 1
assert result[0].id == entry.id

def test_excludes_current_instance_entries(self, actiontype, current_instance):
"""Does not return entries from the current SCRAM instance."""
create_entry(actiontype, "192.0.2.2/32", current_instance)
def test_includes_current_instance_entries(self, actiontype, current_instance):
"""Processes entries from the current SCRAM instance."""
entry = create_entry(actiontype, "192.0.2.2/32", current_instance)
cutoff = datetime.now(UTC) - timedelta(minutes=2)

result = get_entries_to_process(cutoff)

assert result == []
assert len(result) == 1
assert result[0].id == entry.id

def test_finds_modified_entries(self, actiontype, other_instance):
"""Finds entries modified after creation (uses history tracking)."""
@@ -175,6 +176,17 @@ def test_expired_entry_found_as_inactive(self, actiontype, other_instance):
assert len(result) == 1
assert result[0].id == entry.id

def test_processes_entries_from_current_instance(self, actiontype, current_instance):
"""Verifies that entries from the current instance are processed (PR #193 fix)."""
entry = create_entry(actiontype, "192.0.2.60/32", current_instance)
cutoff = datetime.now(UTC) - timedelta(minutes=2)

result = get_entries_to_process(cutoff)

assert len(result) == 1
assert result[0].id == entry.id
assert result[0].originating_scram_instance == current_instance


class TestCheckForOrphanedHistory:
"""Tests for check_for_orphaned_history()."""
@@ -199,12 +211,3 @@ def test_no_warning_when_entry_exists(self, caplog, actiontype, other_instance):
check_for_orphaned_history({entry.id}, [entry])

assert len(caplog.records) == 0

def test_accounts_for_local_entries(self, caplog, actiontype, current_instance):
"""Local entries excluded from processing are not flagged as orphans."""
local_entry = create_entry(actiontype, "10.1.0.3/32", current_instance)

# Entry exists but was excluded from entries_to_process because it's local
check_for_orphaned_history({local_entry.id}, [])

assert len(caplog.records) == 0
29 changes: 9 additions & 20 deletions scram/route_manager/views.py
@@ -148,18 +148,18 @@ def add_entry(request):


def get_entries_to_process(cutoff_time: datetime) -> list[Entry]:
"""Return entries that have been recently modified by another SCRAM instance.
"""Return entries that have been recently modified by any SCRAM instance.

Queries the Entry history (simple history) table to find any entries modified
since the cutoff time, then filters to only those originating from other SCRAM instances.
since the cutoff time.

Args:
cutoff_time (datetime): Only consider entries modified after this time.

Returns:
List of Entry objects that need to be reprocessed.
"""
logger.debug("Looking for entries modified by other SCRAM instances")
logger.debug("Looking for entries modified by any SCRAM instance")

# Grab only the Entry IDs (via values_list) that have had their history records touched since the cutoff time.
recently_touched_ids = set(Entry.history.filter(history_date__gt=cutoff_time).values_list("id", flat=True))
@@ -170,17 +170,12 @@ def get_entries_to_process(cutoff_time: timedelta) -> list[Entry]:

logger.debug("Found recently touched entry IDs: %s", recently_touched_ids)

# Using the ID's from above, fetch all matching entries and associated models excluding entries from this instance.
entries_to_process = list(
Entry.objects
.filter(id__in=recently_touched_ids)
.exclude(originating_scram_instance=settings.SCRAM_HOSTNAME)
.select_related("actiontype", "route")
)
# Using the IDs from above, fetch all matching entries and associated models.
entries_to_process = list(Entry.objects.filter(id__in=recently_touched_ids).select_related("actiontype", "route"))

check_for_orphaned_history(recently_touched_ids, entries_to_process)

logger.debug("Found %d entries to process from other instances", len(entries_to_process))
logger.debug("Found %d entries to process", len(entries_to_process))
return entries_to_process


@@ -192,18 +187,12 @@ def check_for_orphaned_history(recently_touched_ids: set[int], entries_to_proces

Args:
recently_touched_ids(set): Set of Entry IDs that have recent history records.
entries_to_process(list): Entry objects fetched from the database (excludes local instance).
entries_to_process(list): Entry objects fetched from the database.
"""
# IDs of entries that currently exist in the DB (excluding local instance entries)
# IDs of entries that currently exist in the DB
found_ids = {entry.id for entry in entries_to_process}
# Account for entries filtered out cuz they're from this instance
local_ids = set(
Entry.objects.filter(
id__in=recently_touched_ids, originating_scram_instance=settings.SCRAM_HOSTNAME
).values_list("id", flat=True)
)
# IDs with history but no corresponding Entry row = orphaned (hard-deleted outside of Entry.delete())
orphaned_ids = recently_touched_ids - found_ids - local_ids
orphaned_ids = recently_touched_ids - found_ids
if orphaned_ids:
logger.warning("Found history records with no corresponding Entry: %s", orphaned_ids)
