Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions src/dispatch/auth/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,22 @@ def has_required_permissions(
pk = PrimaryKeyModel(id=request.path_params["incident_id"])
current_incident = incident_service.get(db_session=request.state.db, incident_id=pk.id)

if not current_incident:
return False

# Check if incident is restricted - only admins can join restricted incidents
if current_incident.visibility == Visibility.restricted:
return OrganizationAdminPermission(request=request)
return any_permission(
permissions=[OrganizationAdminPermission],
request=request,
)

# Check project's allow_self_join setting - only admins can override
if not current_incident.project.allow_self_join:
return any_permission(
permissions=[OrganizationAdminPermission],
request=request,
)

return True

Expand Down Expand Up @@ -431,6 +445,44 @@ def has_required_permissions(
return True


class CaseReporterPermission(BasePermission):
def has_required_permissions(
self,
request: Request,
) -> bool:
current_user = get_current_user(request=request)
pk = PrimaryKeyModel(id=request.path_params["case_id"])
current_case = case_service.get(db_session=request.state.db, case_id=pk.id)

if not current_case:
return False

if current_case.reporter:
if current_case.reporter.individual.email == current_user.email:
return True

return False


class CaseAssigneePermission(BasePermission):
def has_required_permissions(
self,
request: Request,
) -> bool:
current_user = get_current_user(request=request)
pk = PrimaryKeyModel(id=request.path_params["case_id"])
current_case = case_service.get(db_session=request.state.db, case_id=pk.id)

if not current_case:
return False

if current_case.assignee:
if current_case.assignee.individual.email == current_user.email:
return True

return False


class CaseEditPermission(BasePermission):
def has_required_permissions(
self,
Expand All @@ -439,7 +491,8 @@ def has_required_permissions(
return any_permission(
permissions=[
OrganizationAdminPermission,
CaseParticipantPermission,
CaseReporterPermission,
CaseAssigneePermission,
],
request=request,
)
Expand Down Expand Up @@ -467,8 +520,22 @@ def has_required_permissions(
pk = PrimaryKeyModel(id=request.path_params["case_id"])
current_case = case_service.get(db_session=request.state.db, case_id=pk.id)

if not current_case:
return False

# Check if case is restricted - only admins can join restricted cases
if current_case.visibility == Visibility.restricted:
return OrganizationAdminPermission(request=request)
return any_permission(
permissions=[OrganizationAdminPermission],
request=request,
)

# Check project's allow_self_join setting - only admins can override
if not current_case.project.allow_self_join:
return any_permission(
permissions=[OrganizationAdminPermission],
request=request,
)

return True

Expand Down
Loading