Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 110 additions & 42 deletions snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@


UNREASONABLY_LARGE_NUMBER_OF_BYTES_SCANNED_PER_QUERY = int(1e12)
_RATE_LIMITER = RedisSlidingWindowRateLimiter(
get_redis_client(RedisClientKey.RATE_LIMITER)
)
_RATE_LIMITER = RedisSlidingWindowRateLimiter(get_redis_client(RedisClientKey.RATE_LIMITER))
DEFAULT_OVERRIDE_LIMIT = -1
PETABYTE = 10**12
DEFAULT_BYTES_SCANNED_LIMIT = int(1.28 * PETABYTE)
Expand All @@ -64,32 +62,61 @@ class BytesScannedRejectingPolicy(AllocationPolicy):
WINDOW_GRANULARITY_SECONDS = 60

def _additional_config_definitions(self) -> list[Configuration]:
# Overrides are prioritized in order of specificity.
# If two overrides applicable available to the request, the one with a smaller value takes precedence
# Overrides are checked in order of specificity; the first one set wins.
# For organization_id queries:
# (organization_id, referrer) > organization_id > (all orgs, referrer) > default
return [
Configuration(
"referrer_all_projects_scan_limit_override",
f"Specific referrer scan limit in the last {self.WINDOW_SECONDS/ 60} mins, APPLIES TO ALL PROJECTS",
f"Specific referrer scan limit in the last {self.WINDOW_SECONDS / 60} mins, APPLIES TO ALL PROJECTS",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"referrer": str},
),
Configuration(
"referrer_all_organizations_scan_limit_override",
f"Specific referrer scan limit in the last {self.WINDOW_SECONDS/ 60} mins, APPLIES TO ALL ORGANIZATIONS",
f"Specific referrer scan limit in the last {self.WINDOW_SECONDS / 60} mins, APPLIES TO ALL ORGANIZATIONS",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"referrer": str},
),
Configuration(
"organization_referrer_scan_limit_override",
f"Specific (organization_id, referrer) scan limit in the last {self.WINDOW_SECONDS / 60} mins",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"organization_id": int, "referrer": str},
),
Configuration(
"organization_scan_limit_override",
f"Scan limit for a specific organization_id across any referrer in the last {self.WINDOW_SECONDS / 60} mins",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"organization_id": int},
),
Configuration(
"organization_referrer_max_bytes_to_read",
"Per-(organization_id, referrer) hard cap forwarded to clickhouse as max_bytes_to_read. Queries that match are allowed to run with this cap and bypass the sliding-window scan limit",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"organization_id": int, "referrer": str},
),
Configuration(
"organization_max_bytes_to_read",
"Per-organization_id hard cap forwarded to clickhouse as max_bytes_to_read across any referrer. Queries that match are allowed to run with this cap and bypass the sliding-window scan limit",
int,
DEFAULT_OVERRIDE_LIMIT,
param_types={"organization_id": int},
),
Configuration(
"project_referrer_scan_limit",
f"DEFAULT: how many bytes can a project scan per referrer in the last {self.WINDOW_SECONDS/ 60} mins before queries start getting rejected",
f"DEFAULT: how many bytes can a project scan per referrer in the last {self.WINDOW_SECONDS / 60} mins before queries start getting rejected",
int,
DEFAULT_BYTES_SCANNED_LIMIT,
),
Configuration(
"organization_referrer_scan_limit",
f"DEFAULT: how many bytes can an organization scan per referrer in the last {self.WINDOW_SECONDS/ 60} mins before queries start getting rejected. Cross-project queries are limited by organization_id",
f"DEFAULT: how many bytes can an organization scan per referrer in the last {self.WINDOW_SECONDS / 60} mins before queries start getting rejected. Cross-project queries are limited by organization_id",
int,
DEFAULT_BYTES_SCANNED_LIMIT * 2,
),
Expand Down Expand Up @@ -125,9 +152,7 @@ def _additional_config_definitions(self) -> list[Configuration]:
),
]

def _are_tenant_ids_valid(
self, tenant_ids: dict[str, str | int]
) -> tuple[bool, str]:
def _are_tenant_ids_valid(self, tenant_ids: dict[str, str | int]) -> tuple[bool, str]:
if self.is_cross_org_query(tenant_ids):
return True, "cross org query"
if tenant_ids.get("referrer") is None:
Expand Down Expand Up @@ -162,18 +187,55 @@ def __get_scan_limit(
return int(self.get_config_value("project_referrer_scan_limit"))
return int(override)
elif customer_tenant_key == "organization_id":
override = self.get_config_value(
org_referrer_override = self.get_config_value(
"organization_referrer_scan_limit_override",
{"organization_id": customer_tenant_value, "referrer": referrer},
)
if org_referrer_override != DEFAULT_OVERRIDE_LIMIT:
return int(org_referrer_override)
org_override = self.get_config_value(
"organization_scan_limit_override",
{"organization_id": customer_tenant_value},
)
if org_override != DEFAULT_OVERRIDE_LIMIT:
return int(org_override)
all_orgs_referrer_override = self.get_config_value(
"referrer_all_organizations_scan_limit_override", {"referrer": referrer}
)
if override == DEFAULT_OVERRIDE_LIMIT:
return int(self.get_config_value("organization_referrer_scan_limit"))
return int(override)
if all_orgs_referrer_override != DEFAULT_OVERRIDE_LIMIT:
return int(all_orgs_referrer_override)
return int(self.get_config_value("organization_referrer_scan_limit"))
raise InvalidTenantsForAllocationPolicy.from_args(
{customer_tenant_key: customer_tenant_value, "referrer": referrer},
self.__class__.__name__,
"customer tenant key is neither project_id or organization_id, this should never happen",
)

def __get_organization_max_bytes_to_read(
self, tenant_ids: dict[str, str | int], referrer: str | int
) -> int | None:
"""Return a per-org max_bytes_to_read cap if one is configured.

Precedence: (organization_id, referrer) > organization_id.
Returns None when no cap applies.
"""
org_id = tenant_ids.get("organization_id")
if org_id is None:
return None
org_referrer_cap = self.get_config_value(
"organization_referrer_max_bytes_to_read",
{"organization_id": org_id, "referrer": referrer},
)
if org_referrer_cap != DEFAULT_OVERRIDE_LIMIT:
return int(org_referrer_cap)
org_cap = self.get_config_value(
"organization_max_bytes_to_read",
{"organization_id": org_id},
)
if org_cap != DEFAULT_OVERRIDE_LIMIT:
return int(org_cap)
return None

def _get_quota_allowance(
self, tenant_ids: dict[str, str | int], query_id: str
) -> QuotaAllowance:
Expand Down Expand Up @@ -212,9 +274,24 @@ def _get_quota_allowance(
suggestion=PASS_THROUGH_REFERRERS_SUGGESTION,
)

scan_limit = self.__get_scan_limit(
customer_tenant_key, customer_tenant_value, referrer
)
org_cap = self.__get_organization_max_bytes_to_read(tenant_ids, referrer)
if org_cap is not None:
return QuotaAllowance(
can_run=True,
max_threads=self.max_threads,
max_bytes_to_read=org_cap,
explanation={
"reason": f"organization_id {tenant_ids.get('organization_id')} runs with a per-org max_bytes_to_read cap of {org_cap}"
},
is_throttled=False,
throttle_threshold=MAX_THRESHOLD,
rejection_threshold=MAX_THRESHOLD,
quota_used=0,
quota_unit=QUOTA_UNIT,
suggestion=NO_SUGGESTION,
)
Comment on lines +282 to +292
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The new organization-level max_bytes_to_read check runs before determining the query type, causing project-based queries to bypass their specific rate limits if an org-level cap is set.
Severity: HIGH

Suggested Fix

The organization cap check should be moved to after the call to _get_customer_tenant_key_and_value(). The check should only be applied when the customer_tenant_key is confirmed to be "organization_id", ensuring it only affects organization-based queries.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py#L277-L292

Potential issue: The `__get_organization_max_bytes_to_read()` check is applied to any
query with an `organization_id` in `tenant_ids`. This check occurs before the logic that
determines if a query is project-based or organization-based. Consequently, for a
project-based query that also includes an `organization_id`, if an organization-level
`max_bytes_to_read` cap is configured, the function will return early with the
organization's cap. This bypasses the intended project-level sliding-window rate limits,
potentially allowing the project to scan an unlimited number of bytes beyond its
configured limit.

Did we get this right? 👍 / 👎 to inform future reviews.


scan_limit = self.__get_scan_limit(customer_tenant_key, customer_tenant_value, referrer)
throttle_threshold = max(
1, int(scan_limit // self.get_config_value("bytes_throttle_divider"))
)
Expand Down Expand Up @@ -244,8 +321,7 @@ def _get_quota_allowance(
if granted_quota.granted <= 0:
if self.get_config_value("limit_bytes_instead_of_rejecting"):
max_bytes_to_read = int(
scan_limit
/ self.get_config_value("max_bytes_to_read_scan_limit_divider")
scan_limit / self.get_config_value("max_bytes_to_read_scan_limit_divider")
)
explanation[
"reason"
Expand All @@ -259,16 +335,13 @@ def _get_quota_allowance(

self.metrics.increment(
"bytes_scanned_limited",
tags={
"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
},
tags={"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"},
)
return QuotaAllowance(
can_run=True,
max_threads=max(
1,
self.max_threads
// self.get_config_value("threads_throttle_divider"),
self.max_threads // self.get_config_value("threads_throttle_divider"),
),
max_bytes_to_read=max_bytes_to_read,
explanation=explanation,
Expand All @@ -293,9 +366,7 @@ def _get_quota_allowance(

self.metrics.increment(
"bytes_scanned_rejection",
tags={
"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
},
tags={"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"},
)
return QuotaAllowance(
can_run=False,
Expand All @@ -319,8 +390,7 @@ def _get_quota_allowance(
can_run=True,
max_threads=max(
1,
self.max_threads
// self.get_config_value("threads_throttle_divider"),
self.max_threads // self.get_config_value("threads_throttle_divider"),
),
explanation={"reason": "within_limit but throttled"},
is_throttled=True,
Expand Down Expand Up @@ -349,17 +419,17 @@ def _get_bytes_scanned_in_query(
if result_or_error.error:
if (
isinstance(result_or_error.error.__cause__, ClickhouseError)
and result_or_error.error.__cause__.code
== errors.ErrorCodes.TIMEOUT_EXCEEDED
and result_or_error.error.__cause__.code == errors.ErrorCodes.TIMEOUT_EXCEEDED
):
return int(
self.get_config_value(
"clickhouse_timeout_bytes_scanned_penalization"
)
)
return int(self.get_config_value("clickhouse_timeout_bytes_scanned_penalization"))
else:
return 0
progress_bytes_scanned = cast(int, result_or_error.query_result.result.get("profile", {}).get("progress_bytes", None)) # type: ignore
progress_bytes_scanned = cast(
int,
result_or_error.query_result.result.get("profile", {}).get( # type: ignore[union-attr]
"progress_bytes", None
),
)
if isinstance(progress_bytes_scanned, (int, float)):
self.metrics.increment(
"progress_bytes_scanned",
Expand Down Expand Up @@ -388,9 +458,7 @@ def _update_quota_balance(
customer_tenant_key,
customer_tenant_value,
) = self._get_customer_tenant_key_and_value(tenant_ids)
scan_limit = self.__get_scan_limit(
customer_tenant_key, customer_tenant_value, referrer
)
scan_limit = self.__get_scan_limit(customer_tenant_key, customer_tenant_value, referrer)
# we can assume that the requested quota was granted (because it was)
# we just need to update the quota with however many bytes were consumed
_RATE_LIMITER.use_quotas(
Expand Down
Loading
Loading