Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions src/dispatch/case/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
send_case_rating_feedback_message,
send_case_update_notifications,
send_event_paging_message,
send_event_update_prompt_reminder
send_event_update_prompt_reminder,
)
from .models import Case
from .service import get
Expand Down Expand Up @@ -768,6 +768,43 @@ def map_case_roles_to_incident_roles(
return list(incident_roles) or None


def copy_case_events_to_incident(
case: Case,
incident: Incident,
db_session: Session,
):
"""Copies all timeline events from a case to an incident."""
# Get all events from the case
case_events = event_service.get_by_case_id(db_session=db_session, case_id=case.id).all()

if not case_events:
log.info(f"No events to copy from case {case.id} to incident {incident.id}")
return

log.info(f"Copying {len(case_events)} events from case {case.id} to incident {incident.id}")

for case_event in case_events:
# Create a new event for the incident with the same data
copied_source = f"{case_event.source} (copied from {case.name})"
event_service.log_incident_event(
db_session=db_session,
source=copied_source,
description=case_event.description,
incident_id=incident.id,
individual_id=case_event.individual_id,
started_at=case_event.started_at,
ended_at=case_event.ended_at,
details=case_event.details,
type=case_event.type,
owner=case_event.owner,
pinned=case_event.pinned,
)

log.info(
f"Successfully copied {len(case_events)} events from case {case.id} to incident {incident.id}"
)


def common_escalate_flow(
case: Case,
incident: Incident,
Expand All @@ -793,6 +830,9 @@ def common_escalate_flow(
db_session.add(case)
db_session.commit()

# Copy timeline events from case to incident
copy_case_events_to_incident(case=case, incident=incident, db_session=db_session)

event_service.log_case_event(
db_session=db_session,
source="Dispatch Core App",
Expand Down Expand Up @@ -862,7 +902,10 @@ def common_escalate_flow(
event_service.log_case_event(
db_session=db_session,
source="Dispatch Core App",
description=f"The members of the incident's tactical group {incident.tactical_group.email} have been given permission to access the case's storage folder",
description=(
f"The members of the incident's tactical group {incident.tactical_group.email} "
f"have been given permission to access the case's storage folder"
),
case_id=case.id,
)

Expand Down
51 changes: 51 additions & 0 deletions tests/case/test_case_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,54 @@ def test_delete(session, case: Case):

t_case = case_get(db_session=session, case_id=case_id)
assert not t_case


def test_copy_case_events_to_incident(session, case: Case, incident):
"""Test that case events are properly copied to incident when escalating."""
from dispatch.case.flows import copy_case_events_to_incident
from dispatch.event.service import get_by_case_id, get_by_incident_id, log_case_event

# Create some test events for the case
log_case_event(
db_session=session,
source="Test Source",
description="Test case event 1",
case_id=case.id,
)

log_case_event(
db_session=session,
source="Test Source 2",
description="Test case event 2",
case_id=case.id,
)

# Verify case has events
case_events = get_by_case_id(db_session=session, case_id=case.id).all()
initial_case_event_count = len(case_events)
assert initial_case_event_count == 2

# Verify incident has no events initially
incident_events = get_by_incident_id(db_session=session, incident_id=incident.id).all()
initial_incident_event_count = len(incident_events)
assert initial_incident_event_count == 0

# Copy events from case to incident
copy_case_events_to_incident(case=case, incident=incident, db_session=session)

# Verify incident now has the copied events
incident_events_after = get_by_incident_id(db_session=session, incident_id=incident.id).all()
final_incident_event_count = len(incident_events_after)
assert final_incident_event_count == initial_case_event_count

# Verify case events are unchanged
case_events_after = get_by_case_id(db_session=session, case_id=case.id).all()
final_case_event_count = len(case_events_after)
assert final_case_event_count == initial_case_event_count

# Verify the copied events have the correct source
for event in incident_events_after:
assert (
event.source == f"Test Source (copied from {case.name})"
or event.source == f"Test Source 2 (copied from {case.name})"
)
Loading