Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions datadog_sync/utils/resource_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,56 @@ def init_topological_sorter(
sorter = TopologicalSorter(graph)
sorter.prepare()
return sorter


def detect_circular_dependencies(graph: Dict[Tuple[str, str], Set[Tuple[str, str]]]) -> Optional[List[Tuple[str, str]]]:
    """Detect circular dependencies in a dependency graph.

    The graph is walked with an iterative depth-first search, so very long
    dependency chains cannot exhaust the interpreter's recursion limit, and
    membership of the current path is tracked in a set so each check is O(1).

    Args:
        graph: Mapping of node -> iterable of nodes it points at. Nodes that
            only appear as values (never as keys) are treated as having no
            outgoing edges.

    Returns:
        None if no cycles, or a list showing one cycle path if found. The
        path starts and ends with the same node.

    Example:
        >>> graph = {("monitors", "A"): {("monitors", "B")}, ("monitors", "B"): {("monitors", "A")}}
        >>> detect_circular_dependencies(graph)
        [('monitors', 'A'), ('monitors', 'B'), ('monitors', 'A')]
    """
    visited = set()

    for root in graph:
        if root in visited:
            continue

        # `path` keeps the order of the current DFS branch (needed to report
        # the cycle); `on_path` mirrors it for constant-time membership tests.
        path = [root]
        on_path = {root}
        visited.add(root)
        # One neighbor iterator per node on the current branch; this is the
        # explicit replacement for the call stack of a recursive DFS.
        pending = [iter(graph.get(root, ()))]

        while pending:
            for neighbor in pending[-1]:
                if neighbor in on_path:
                    # Back edge: the cycle is the branch suffix that starts at
                    # `neighbor`, closed by repeating `neighbor` at the end.
                    return path[path.index(neighbor):] + [neighbor]
                if neighbor in visited:
                    # Fully explored earlier without finding a cycle.
                    continue
                visited.add(neighbor)
                path.append(neighbor)
                on_path.add(neighbor)
                pending.append(iter(graph.get(neighbor, ())))
                break
            else:
                # Every neighbor of the branch tip is exhausted: backtrack.
                pending.pop()
                on_path.discard(path.pop())

    return None
246 changes: 240 additions & 6 deletions datadog_sync/utils/resources_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ResourcesHandler:
def __init__(self, config: Configuration) -> None:
self.config = config
self.sorter: Optional[TopologicalSorter] = None
self.cleanup_sorter: Optional[TopologicalSorter] = None
self.worker: Optional[Workers] = None
self._dependency_graph = Optional[Dict[Tuple[str, str], List[Tuple[str, str]]]]

Expand Down Expand Up @@ -90,12 +91,63 @@ async def apply_resources(self) -> Tuple[int, int]:
if cleanup_resources:
cleanup = _cleanup_prompt(self.config, cleanup_resources)
if cleanup:
self.config.logger.info("cleaning up resources...")
await self.worker.init_workers(self._cleanup_worker, None, None)
for i in cleanup_resources:
self.worker.work_queue.put_nowait(i)
await self.worker.schedule_workers()
self.config.logger.info("finished cleaning up resources")
self.config.logger.info("cleaning up resources with dependency ordering...")

# Build reverse dependency graph for ordered cleanup
try:
cleanup_graph = self.get_cleanup_dependency_graph(cleanup_resources)
self.config.logger.info(f"Built cleanup dependency graph with {len(cleanup_graph)} resources")

# Detect circular dependencies
from datadog_sync.utils.resource_utils import detect_circular_dependencies

cycle = detect_circular_dependencies(cleanup_graph)
if cycle:
# Circular dependency detected!
cycle_str = " -> ".join([f"{rt}:{rid}" for rt, rid in cycle])
self.config.logger.error(f"Circular dependency detected in cleanup graph: {cycle_str}")
self.config.logger.error(
"Cannot safely delete resources. Please manually break circular references first."
)
raise ValueError(
f"Circular dependency in cleanup graph: {cycle_str}. "
"Manual intervention required to break the cycle."
)

# Initialize cleanup sorter
self.cleanup_sorter = init_topological_sorter(cleanup_graph)

# Initialize workers with cleanup sorter stop condition
await self.worker.init_workers(
self._cleanup_worker, lambda: not self.cleanup_sorter.is_active(), None
)

# Run cleanup with ordering
if self.config.show_progress_bar:
await self.worker.schedule_workers_with_pbar(
total=len(cleanup_graph), additional_coros=[self.run_cleanup_sorter()]
)
else:
await self.worker.schedule_workers(additional_coros=[self.run_cleanup_sorter()])

self.config.logger.info("finished cleaning up resources")

except ValueError:
# Circular dependency - already logged, re-raise
raise
except Exception as e:
# Unexpected error building or running cleanup graph
self.config.logger.error(f"Error during ordered cleanup: {str(e)}")
self.config.logger.warning(
"Falling back to unordered cleanup (may fail due to dependency issues)"
)

# Fallback to old unordered behavior
await self.worker.init_workers(self._cleanup_worker, None, None)
for i in cleanup_resources:
self.worker.work_queue.put_nowait(i)
await self.worker.schedule_workers()
self.config.logger.info("finished cleaning up resources (unordered fallback)")

# Run pre-apply hooks
resource_types = set(i[0] for i in self._dependency_graph)
Expand Down Expand Up @@ -349,6 +401,10 @@ async def _cleanup_worker(self, q_item: List) -> None:
await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}")
finally:
# Mark as done in cleanup sorter if it exists
if hasattr(self, "cleanup_sorter") and self.cleanup_sorter:
self.cleanup_sorter.done(q_item)

if not r_class.resource_config.concurrent:
r_class.resource_config.async_lock.release()

Expand All @@ -370,6 +426,30 @@ async def run_sorter(self):
await self.worker.work_queue.put(node)
await asyncio.sleep(0)

async def run_cleanup_sorter(self):
"""Mirror of run_sorter() but for cleanup operations.

Continuously feeds deletion-ready resources to workers in proper order.
Resources are only deleted after all their dependents are deleted.
"""
loop = asyncio.get_event_loop()

while await loop.run_in_executor(None, self.cleanup_sorter.is_active):
for node in self.cleanup_sorter.get_ready():
resource_type, _id = node

# Verify resource still exists in destination
if _id not in self.config.state.destination[resource_type]:
# Already deleted or doesn't exist
self.config.logger.debug(f"Resource {resource_type}:{_id} already deleted, marking as done")
self.cleanup_sorter.done(node)
continue

# Add to work queue for deletion
await self.worker.work_queue.put(node)

await asyncio.sleep(0)

def get_dependency_graph(self) -> Tuple[Dict[Tuple[str, str], List[Tuple[str, str]]], Set[Tuple[str, str]]]:
"""Build the dependency graph for all resources.

Expand Down Expand Up @@ -428,6 +508,160 @@ def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tuple

return failed_connections, missing_resources

def get_cleanup_dependency_graph(
self, cleanup_resources: Dict[Tuple[str, str], str | None]
) -> Dict[Tuple[str, str], Set[Tuple[str, str]]]:
"""Build REVERSE dependency graph for cleanup.

For deletion, we need to delete dependents BEFORE dependencies.
This inverts the normal creation graph.

Args:
cleanup_resources: Resources to be deleted from destination

Returns:
Dict mapping (resource_type, _id) to set of resources that depend on it

Example:
If Dashboard depends on Monitor:
Creation graph: {("dashboards", "dash-1"): [("monitors", "mon-1")]}
Cleanup graph: {("monitors", "mon-1"): {("dashboards", "dash-1")}}

This means: Monitor "mon-1" can only be deleted AFTER Dashboard "dash-1"
"""
reverse_graph = defaultdict(set)

# Initialize all cleanup resources with empty dependencies
for resource_key in cleanup_resources.keys():
reverse_graph[resource_key] = set()

# Build reverse dependencies by scanning destination state
for resource_type, _id in cleanup_resources.keys():
if resource_type not in self.config.resources:
continue

r_config = self.config.resources[resource_type].resource_config
if not r_config.resource_connections:
continue

# Get the actual resource from destination state (already in memory)
if _id not in self.config.state.destination[resource_type]:
self.config.logger.debug(
f"Resource {resource_type}:{_id} not in destination state, skipping dependency analysis"
)
continue

resource = self.config.state.destination[resource_type][_id]

# For each dependency this resource has
for dep_type, attr_paths in r_config.resource_connections.items():
for attr_path in attr_paths:
# Find dependency IDs in the resource
dep_ids = self._extract_dependency_ids(resource, attr_path, dep_type)

# For each dependency, add THIS resource as a dependent
for dep_id in dep_ids:
dep_key = (dep_type, dep_id)
# Only include if the dependency is also being cleaned up
if dep_key in cleanup_resources:
# This says: "dep_key must be deleted AFTER (resource_type, _id)"
reverse_graph[dep_key].add((resource_type, _id))
self.config.logger.debug(f"Dependency edge: {dep_type}:{dep_id} <- {resource_type}:{_id}")

return dict(reverse_graph)

def _extract_dependency_ids(self, resource: Dict, attr_path: str, dep_type: str) -> Set[str]:
"""Extract dependency IDs from a resource at the given attribute path.

Args:
resource: The resource dict
attr_path: Dot-separated path like "widgets.definition.alert_id"
dep_type: Type of dependency (for context)

Returns:
Set of dependency IDs found
"""
ids = set()

def extract_from_obj(obj, path_parts):
if not obj or not path_parts:
return

current_key = path_parts[0]
remaining = path_parts[1:]

if isinstance(obj, list):
for item in obj:
extract_from_obj(item, path_parts)
elif isinstance(obj, dict):
if current_key in obj:
if not remaining:
# We're at the target attribute
value = obj[current_key]
if value:
# Handle both lists and single values
values = [value] if not isinstance(value, list) else value
for v in values:
if v:
# Parse prefixed IDs (e.g., "dashboard:abc-123" → "abc-123")
parsed_id = self._parse_prefixed_id(str(v), dep_type)
ids.add(parsed_id)
else:
# Keep traversing
extract_from_obj(obj[current_key], remaining)

path_parts = attr_path.split(".")
extract_from_obj(resource, path_parts)
return ids

def _parse_prefixed_id(self, value: str, expected_resource_type: str) -> str:
"""Parse IDs that may have resource type prefixes.

Restriction policies and some other resources use prefixed IDs like
"dashboard:abc-123" or "slo:xyz-789". This method strips the prefix
to get the actual resource ID that matches the state keys.

Examples:
"dashboard:abc-123" with expected_resource_type="dashboards" → "abc-123"
"slo:xyz-789" with expected_resource_type="service_level_objectives" → "xyz-789"
"abc-123" with any type → "abc-123" (passthrough)

Args:
value: The ID value, possibly prefixed
expected_resource_type: The resource type we expect (e.g., "dashboards")

Returns:
The parsed ID without prefix
"""
# Check if value contains a colon (indicates prefix)
if ":" not in value:
return value

# Split on first colon only
prefix, actual_id = value.split(":", 1)

# Map prefix to resource type
prefix_to_resource_type = {
"dashboard": "dashboards",
"slo": "service_level_objectives",
"notebook": "notebooks",
"monitor": "monitors",
"user": "users",
"role": "roles",
"team": "teams",
"security-rule": "security_monitoring_rules",
}

# If prefix matches expected resource type, return just the ID
if prefix_to_resource_type.get(prefix) == expected_resource_type:
return actual_id

# If prefix doesn't match, return original value (might be a false positive)
self.config.logger.debug(
f"ID '{value}' has prefix '{prefix}' but expected resource type '{expected_resource_type}'"
)
return value


def _cleanup_prompt(
config: Configuration, resources_to_cleanup: Dict[Tuple[str, str], str | None], prompt: bool = True
Expand Down
Loading