Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions src/sentry/dynamic_sampling/tasks/boost_low_volume_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,30 @@ def query_project_counts_by_org(
"id", flat=True
)
)
transaction_string_id = indexer.resolve_shared_org("decision")
transaction_tag = f"tags_raw[{transaction_string_id}]"
decision_string_id = indexer.resolve_shared_org("decision")
decision_tag = f"tags_raw[{decision_string_id}]"
if measure == SamplingMeasure.SPANS:
is_segment_string_id = indexer.resolve_shared_org("is_segment")
is_segment_tag = f"tags_raw[{is_segment_string_id}]"
metric_id = indexer.resolve_shared_org(str(SpanMRI.COUNT_PER_ROOT_PROJECT.value))
use_case_id = UseCaseID.SPANS
elif measure == SamplingMeasure.TRANSACTIONS:
is_segment_tag = None
metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
use_case_id = UseCaseID.TRANSACTIONS
else:
raise ValueError(f"Unsupported measure: {measure}")

where_conditions = [
Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, metric_id),
Condition(Column("org_id"), Op.IN, org_ids),
Condition(Column("project_id"), Op.IN, project_ids),
]
if is_segment_tag:
where_conditions.append(Condition(Column(is_segment_tag), Op.EQ, "true"))

query = Query(
match=Entity(EntityKey.GenericOrgMetricsCounters.value),
select=[
Expand All @@ -326,27 +341,21 @@ def query_project_counts_by_org(
"sumIf",
[
Column("value"),
Function("equals", [Column(transaction_tag), "keep"]),
Function("equals", [Column(decision_tag), "keep"]),
],
alias="keep_count",
),
Function(
"sumIf",
[
Column("value"),
Function("equals", [Column(transaction_tag), "drop"]),
Function("equals", [Column(decision_tag), "drop"]),
],
alias="drop_count",
),
],
groupby=[Column("org_id"), Column("project_id")],
where=[
Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, metric_id),
Condition(Column("org_id"), Op.IN, org_ids),
Condition(Column("project_id"), Op.IN, project_ids),
],
where=where_conditions,
granularity=granularity,
orderby=[
OrderBy(Column("org_id"), Direction.ASC),
Expand All @@ -371,7 +380,7 @@ def query_project_counts_by_org(
dataset=Dataset.PerformanceMetrics.value,
app_id="dynamic_sampling",
query=query.set_offset(offset),
tenant_ids={"use_case_id": UseCaseID.TRANSACTIONS.value, "cross_org_query": 1},
tenant_ids={"use_case_id": use_case_id.value, "cross_org_query": 1},
)
data = raw_snql_query(
request,
Expand Down
66 changes: 48 additions & 18 deletions src/sentry/dynamic_sampling/tasks/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.metrics.naming_layer.mri import SpanMRI, TransactionMRI
from sentry.snuba.referrer import Referrer
from sentry.utils.snuba import raw_snql_query

Expand All @@ -50,10 +50,22 @@ def __init__(
max_projects: int | None = None,
time_interval: timedelta = ACTIVE_ORGS_DEFAULT_TIME_INTERVAL,
granularity: Granularity = ACTIVE_ORGS_DEFAULT_GRANULARITY,
use_span_metric: bool = False,
) -> None:
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)
self.use_span_metric = use_span_metric
self.is_segment_tag: str | None
if self.use_span_metric:
is_segment_string_id = indexer.resolve_shared_org("is_segment")
self.is_segment_tag = f"tags_raw[{is_segment_string_id}]"
self.metric_id = indexer.resolve_shared_org(str(SpanMRI.COUNT_PER_ROOT_PROJECT.value))
self.use_case_id = UseCaseID.SPANS
else:
self.is_segment_tag = None
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)
self.use_case_id = UseCaseID.TRANSACTIONS

self.offset = 0
self.last_result: list[tuple[int, int]] = []
self.has_more_results = True
Expand All @@ -72,6 +84,18 @@ def __next__(self) -> list[int]:

if self.has_more_results:
# not enough for the current iteration and data still in the db top it up from db
where_conditions = [
Condition(
Column("timestamp"),
Op.GTE,
datetime.utcnow() - self.time_interval,
),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, self.metric_id),
]
if self.use_span_metric and self.is_segment_tag:
where_conditions.append(Condition(Column(self.is_segment_tag), Op.EQ, "true"))

query = (
Query(
match=Entity(EntityKey.GenericOrgMetricsCounters.value),
Expand All @@ -82,15 +106,7 @@ def __next__(self) -> list[int]:
groupby=[
Column("org_id"),
],
where=[
Condition(
Column("timestamp"),
Op.GTE,
datetime.utcnow() - self.time_interval,
),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, self.metric_id),
],
where=where_conditions,
orderby=[
OrderBy(Column("org_id"), Direction.ASC),
],
Expand All @@ -104,7 +120,7 @@ def __next__(self) -> list[int]:
app_id="dynamic_sampling",
query=query,
tenant_ids={
"use_case_id": UseCaseID.TRANSACTIONS.value,
"use_case_id": self.use_case_id.value,
"cross_org_query": 1,
},
)
Expand Down Expand Up @@ -200,12 +216,23 @@ def __init__(
granularity: Granularity = ACTIVE_ORGS_VOLUMES_DEFAULT_GRANULARITY,
include_keep: bool = True,
orgs: list[int] | None = None,
use_span_metric: bool = False,
) -> None:
self.include_keep = include_keep
self.orgs = orgs
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)
self.use_span_metric = use_span_metric
self.is_segment_tag: str | None
if self.use_span_metric:
is_segment_string_id = indexer.resolve_shared_org("is_segment")
self.is_segment_tag = f"tags_raw[{is_segment_string_id}]"
self.metric_id = indexer.resolve_shared_org(str(SpanMRI.COUNT_PER_ROOT_PROJECT.value))
self.use_case_id = UseCaseID.SPANS
else:
self.is_segment_tag = None
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)
self.use_case_id = UseCaseID.TRANSACTIONS

if self.include_keep:
decision_string_id = indexer.resolve_shared_org("decision")
Expand Down Expand Up @@ -254,6 +281,9 @@ def __next__(self) -> list[OrganizationDataVolume]:
if self.orgs:
where.append(Condition(Column("org_id"), Op.IN, self.orgs))

if self.use_span_metric and self.is_segment_tag:
where.append(Condition(Column(self.is_segment_tag), Op.EQ, "true"))

if self.include_keep:
select.append(self.keep_count_column)

Expand All @@ -277,7 +307,7 @@ def __next__(self) -> list[OrganizationDataVolume]:
app_id="dynamic_sampling",
query=query,
tenant_ids={
"use_case_id": UseCaseID.TRANSACTIONS.value,
"use_case_id": self.use_case_id.value,
"cross_org_query": 1,
},
)
Expand Down
45 changes: 42 additions & 3 deletions src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

import sentry_sdk

from sentry import quotas
from sentry import options, quotas
from sentry.constants import SAMPLING_MODE_DEFAULT, TARGET_SAMPLE_RATE_DEFAULT
from sentry.dynamic_sampling.rules.utils import DecisionKeepCount, OrganizationId, ProjectId
from sentry.dynamic_sampling.tasks.boost_low_volume_projects import (
fetch_projects_with_total_root_transaction_count_and_rates,
)
from sentry.dynamic_sampling.tasks.common import GetActiveOrgsVolumes
from sentry.dynamic_sampling.tasks.common import GetActiveOrgs, GetActiveOrgsVolumes
from sentry.dynamic_sampling.tasks.constants import MAX_REBALANCE_FACTOR, MIN_REBALANCE_FACTOR
from sentry.dynamic_sampling.tasks.helpers.recalibrate_orgs import (
compute_adjusted_factor,
Expand All @@ -35,6 +35,29 @@
from sentry.taskworker.retry import Retry


def _partition_orgs_by_span_metric_option(
    org_ids: list[int],
    option_name: str = "dynamic-sampling.recalibrate_orgs.span-metric-orgs",
) -> tuple[list[int], list[int]]:
    """
    Partition organizations by membership in the span-metric rollout option.

    Args:
        org_ids: Organization ids to partition; relative order is preserved
            within each returned bucket.
        option_name: Name of the runtime option holding the list of org ids
            that should use the span metric. Defaults to the recalibrate-orgs
            rollout option, so existing callers are unaffected.

    Returns:
        A ``(span_metric_orgs, transaction_metric_orgs)`` tuple.
    """
    # A set makes the per-org membership check O(1); the option may be unset,
    # in which case it is treated as an empty list.
    span_metric_org_ids = set(options.get(option_name) or [])

    span_metric_orgs: list[int] = []
    transaction_metric_orgs: list[int] = []
    for org_id in org_ids:
        if org_id in span_metric_org_ids:
            span_metric_orgs.append(org_id)
        else:
            transaction_metric_orgs.append(org_id)

    return span_metric_orgs, transaction_metric_orgs


@instrumented_task(
name="sentry.dynamic_sampling.tasks.recalibrate_orgs",
namespace=telemetry_experience_tasks,
Expand All @@ -44,7 +67,23 @@
)
@dynamic_sampling_task
def recalibrate_orgs() -> None:
    """Recalibrate sample rates for all active orgs, batch by batch."""
    # Pull active org ids in batches, then route each batch's members to the
    # metric they are configured for (span vs. transaction) via the rollout
    # option before recalibrating.
    for org_batch in GetActiveOrgs():
        span_orgs, transaction_orgs = _partition_orgs_by_span_metric_option(org_batch)

        _process_orgs_for_recalibration(org_ids=span_orgs, use_span_metric=True)
        _process_orgs_for_recalibration(org_ids=transaction_orgs, use_span_metric=False)


def _process_orgs_for_recalibration(org_ids: list[int], use_span_metric: bool) -> None:
"""Process organizations for recalibration using the appropriate metric."""
if not org_ids:
return

for org_volumes in GetActiveOrgsVolumes(orgs=org_ids, use_span_metric=use_span_metric):
modes = OrganizationOption.objects.get_value_bulk_id(
[v.org_id for v in org_volumes], "sentry:sampling_mode", SAMPLING_MODE_DEFAULT
)
Expand Down
79 changes: 66 additions & 13 deletions src/sentry/dynamic_sampling/tasks/sliding_window_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from datetime import timedelta

from sentry import options
from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds
from sentry.dynamic_sampling.tasks.common import (
GetActiveOrgs,
GetActiveOrgsVolumes,
compute_guarded_sliding_window_sample_rate,
)
Expand All @@ -20,6 +22,29 @@
from sentry.taskworker.retry import Retry


def _partition_orgs_by_span_metric_option(
    org_ids: list[int],
) -> tuple[list[int], list[int]]:
    """
    Split *org_ids* into ``(span_metric_orgs, transaction_metric_orgs)``.

    Membership in the "dynamic-sampling.sliding_window_org.span-metric-orgs"
    option decides the bucket; relative order of ids is preserved in both.
    """
    # The option may be unset; treat that as "no orgs on the span metric".
    configured = set(
        options.get("dynamic-sampling.sliding_window_org.span-metric-orgs") or []
    )

    span_metric_orgs = [org_id for org_id in org_ids if org_id in configured]
    transaction_metric_orgs = [org_id for org_id in org_ids if org_id not in configured]

    return span_metric_orgs, transaction_metric_orgs


@instrumented_task(
name="sentry.dynamic_sampling.tasks.sliding_window_org",
namespace=telemetry_experience_tasks,
def sliding_window_org() -> None:
    """Adjust per-org base sample rates using the sliding window volumes."""
    window_size = get_sliding_window_size()
    # A None window size means the sliding window mechanism is disabled
    # entirely; in that case we do nothing and do not mark an execution.
    if window_size is not None:
        # Fetch active org ids in batches and route each batch's members to
        # the metric they are configured for (span vs. transaction).
        for org_batch in GetActiveOrgs(max_orgs=CHUNK_SIZE):
            span_orgs, transaction_orgs = _partition_orgs_by_span_metric_option(org_batch)

            _process_orgs_volumes(
                org_ids=span_orgs,
                window_size=window_size,
                use_span_metric=True,
            )
            _process_orgs_volumes(
                org_ids=transaction_orgs,
                window_size=window_size,
                use_span_metric=False,
            )

        # Due to the synchronous nature of the sliding window org, reaching
        # this point means the run completed successfully; record that state
        # (it is kept for 1 hour).
        mark_sliding_window_org_executed()


def _process_orgs_volumes(
    org_ids: list[int],
    window_size: int,
    use_span_metric: bool,
) -> None:
    """
    Fetch root-count volumes for *org_ids* and adjust each org's base sample
    rate accordingly, using the span or transaction metric as requested.
    """
    # Nothing to do for an empty partition.
    if not org_ids:
        return

    volume_iterator = GetActiveOrgsVolumes(
        max_orgs=CHUNK_SIZE,
        time_interval=timedelta(hours=window_size),
        include_keep=False,
        orgs=org_ids,
        use_span_metric=use_span_metric,
    )
    for volume_batch in volume_iterator:
        for volume in volume_batch:
            adjust_base_sample_rate_of_org(
                org_id=volume.org_id,
                total_root_count=volume.total,
                window_size=window_size,
            )


def adjust_base_sample_rate_of_org(org_id: int, total_root_count: int, window_size: int) -> None:
"""
Adjusts the base sample rate per org by considering its volume and how it fits w.r.t. to the sampling tiers.
Expand Down
Loading
Loading