Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
# Used as a default when in SINGLE_ORGANIZATION mode.
SENTRY_ORGANIZATION: int | None = None

# Default organization ID for granting superuser privileges (typically organization 1 in SaaS mode)
SENTRY_DEFAULT_ORGANIZATION_ID = 1

# Project ID for recording frontend (javascript) exceptions
SENTRY_FRONTEND_PROJECT: int | None = None
# DSN for the frontend to use explicitly, which takes priority
Expand Down
34 changes: 34 additions & 0 deletions src/sentry/users/api/endpoints/user_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.api.serializers import serialize
from sentry.api.serializers.rest_framework import CamelSnakeModelSerializer
from sentry.auth.elevated_mode import has_elevated_mode
from sentry.conf.types.sentry_config import SentryMode
from sentry.constants import LANGUAGES
from sentry.core.endpoints.organization_details import post_org_pending_deletion
from sentry.models.organization import OrganizationStatus
Expand All @@ -44,6 +45,19 @@
TIMEZONE_CHOICES = get_timezone_choices()


def user_can_elevate(target_user: User) -> bool:
try:
org_member_exists = OrganizationMemberMapping.objects.filter(
organization_id=settings.SENTRY_DEFAULT_ORGANIZATION_ID,
user=target_user,
).exists()
except Exception:
# If anything goes wrong, default to not allowing elevation
return False

return org_member_exists


def record_user_deactivation(*, user: User, actor: Any, ip_address: str) -> None:
deactivation_datetime = django_timezone.now()
scheduled_deletion_datetime = deactivation_datetime + timedelta(days=30)
Expand Down Expand Up @@ -324,6 +338,26 @@ def put(self, request: Request, user: User) -> Response:
if not serializer.is_valid() or not serializer_options.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

# We want to do extra checks in SaaS mode for superuser/staff elevation.
# The users have to also be a member of the default organization to be able to elevate
# to superuser/staff.
if settings.SENTRY_MODE == SentryMode.SAAS:
validated_data = serializer.validated_data
requested_superuser = validated_data.get("is_superuser")
requested_staff = validated_data.get("is_staff")

is_updating_superuser = requested_superuser is not None
is_updating_staff = requested_staff is not None

if is_updating_superuser or is_updating_staff:
if not user_can_elevate(user):
return Response(
{
"detail": "User must be a member to the default organization to enable SuperUser mode."
},
status=status.HTTP_403_FORBIDDEN,
)

# map API keys to keys in model
key_map = {
"theme": "theme",
Expand Down
124 changes: 122 additions & 2 deletions tests/sentry/users/api/endpoints/test_user_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.test import override_settings
from pytest import fixture

from sentry.conf.types.sentry_config import SentryMode
from sentry.deletions.tasks.hybrid_cloud import schedule_hybrid_cloud_foreign_key_jobs
from sentry.interfaces.stacktrace import StacktraceOrder
from sentry.models.deletedorganization import DeletedOrganization
Expand Down Expand Up @@ -300,11 +301,17 @@ def test_superuser_cannot_add_staff(self) -> None:
user = User.objects.get(id=self.user.id)
assert not user.is_staff

@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_superuser_with_permission_can_add_superuser(self) -> None:
self.user.update(is_superuser=False)
UserPermission.objects.create(user=self.superuser, permission="users.admin")
self.login_as(user=self.superuser, superuser=True)

# Create org 1 and add user as member
with assume_test_silo_mode(SiloMode.REGION):
org = self.create_organization(id=1, name="Default Org")
self.create_member(user=self.user, organization=org)

resp = self.get_success_response(
self.user.id,
isSuperuser="true",
Expand All @@ -314,7 +321,31 @@ def test_superuser_with_permission_can_add_superuser(self) -> None:
user = User.objects.get(id=self.user.id)
assert user.is_superuser

@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_superuser_with_permission_cannot_add_superuser_without_org_1_membership(self) -> None:
self.user.update(is_superuser=False)
UserPermission.objects.create(user=self.superuser, permission="users.admin")
self.login_as(user=self.superuser, superuser=True)

# User is not a member of org 1
resp = self.get_error_response(
self.user.id,
isSuperuser="true",
status_code=403,
)
assert (
resp.data["detail"]
== "User must be a member to the default organization to enable SuperUser mode."
)

user = User.objects.get(id=self.user.id)
assert not user.is_superuser

def test_superuser_with_permission_can_add_staff(self) -> None:
with assume_test_silo_mode(SiloMode.REGION):
org = self.create_organization(id=1, name="Default Org")
self.create_member(user=self.user, organization=org)

self.user.update(is_staff=False)
UserPermission.objects.create(user=self.superuser, permission="users.admin")
self.login_as(user=self.superuser, superuser=True)
Expand Down Expand Up @@ -449,6 +480,68 @@ def test_audit_log_emitted_when_multiple_privileged_fields_changed(
},
)

def test_superuser_can_update_non_privileged_fields_without_permission(self) -> None:
"""Test that superuser can update non-privileged fields even without users.admin permission"""
self.login_as(user=self.superuser, superuser=True)

# Create a verified email for the user before updating username
self.create_useremail(self.user, "newemail@example.com", is_verified=True)

resp = self.get_success_response(
self.user.id,
name="New Name",
username="newemail@example.com",
)
assert resp.data["id"] == str(self.user.id)
assert resp.data["name"] == "New Name"
assert resp.data["username"] == "newemail@example.com"

user = User.objects.get(id=self.user.id)
assert user.name == "New Name"
assert user.username == "newemail@example.com"

@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_superuser_with_permission_can_remove_superuser(self) -> None:
"""Test that superuser with permission can remove superuser status"""
# Add user to org 1 so they pass the user_can_elevate check
with assume_test_silo_mode(SiloMode.REGION):
org = self.create_organization(id=1, name="Default Org")
self.create_member(user=self.user, organization=org)

self.user.update(is_superuser=True)
UserPermission.objects.create(user=self.superuser, permission="users.admin")
self.login_as(user=self.superuser, superuser=True)

resp = self.get_success_response(
self.user.id,
isSuperuser="false",
)
assert resp.data["id"] == str(self.user.id)

user = User.objects.get(id=self.user.id)
assert not user.is_superuser

@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_superuser_with_permission_can_remove_staff(self) -> None:
"""Test that superuser with permission can remove staff status"""
# Add user to org 1 so they pass the user_can_elevate check
with assume_test_silo_mode(SiloMode.REGION):
org = self.create_organization(id=1, name="Default Org")
self.create_member(user=self.user, organization=org)

self.user.update(is_staff=True)
UserPermission.objects.create(user=self.superuser, permission="users.admin")
self.login_as(user=self.superuser, superuser=True)

resp = self.get_success_response(
self.user.id,
isStaff="false",
)
assert resp.data["id"] == str(self.user.id)

user = User.objects.get(id=self.user.id)
assert not user.is_staff


@control_silo_test
class UserDetailsStaffUpdateTest(UserDetailsTest):
Expand Down Expand Up @@ -538,12 +631,18 @@ def test_superuser_cannot_add_superuser_or_staff_with_feature_flag(self) -> None
assert not user.is_staff
assert not user.is_superuser

@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_staff_with_permission_can_add_superuser(self) -> None:
self.user.update(is_superuser=False)

UserPermission.objects.create(user=self.staff_user, permission="users.admin")
self.login_as(user=self.staff_user, staff=True)

# Create org 1 and add user as member
with assume_test_silo_mode(SiloMode.REGION):
org = self.create_organization(id=1, name="Default Org")
self.create_member(user=self.user, organization=org)

resp = self.get_success_response(
self.user.id,
isSuperuser="true",
Expand All @@ -553,9 +652,30 @@ def test_staff_with_permission_can_add_superuser(self) -> None:
user = User.objects.get(id=self.user.id)
assert user.is_superuser

def test_staff_with_permission_can_add_staff(self) -> None:
self.user.update(is_staff=False)
@override_settings(SENTRY_MODE=SentryMode.SAAS)
def test_staff_with_permission_cannot_add_superuser_without_default_organization_membership(
self,
) -> None:
self.user.update(is_superuser=False)

UserPermission.objects.create(user=self.staff_user, permission="users.admin")
self.login_as(user=self.staff_user, staff=True)

# User is not a member of org 1
resp = self.get_error_response(
self.user.id,
isSuperuser="true",
status_code=403,
)
assert (
resp.data["detail"]
== "User must be a member to the default organization to enable SuperUser mode."
)

user = User.objects.get(id=self.user.id)
assert not user.is_superuser

def test_staff_with_permission_can_add_staff(self) -> None:
UserPermission.objects.create(user=self.staff_user, permission="users.admin")
self.login_as(user=self.staff_user, staff=True)

Expand Down
Loading