Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from typing import List, Sequence

from snuba.clickhouse.columns import Column, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String

num_attr_buckets = 40
storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
sampling_weights = [8, 8**2, 8**3]
old_version = 4
new_version = old_version + 1

columns: List[Column[Modifiers]] = [
Column("organization_id", UInt(64)),
Column("project_id", UInt(64)),
Column("item_type", UInt(8)),
Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
Column("trace_id", UUID()),
Column("item_id", UInt(128)),
Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column(
"retention_days",
UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"])),
),
Column(
"attributes_bool",
Map(
String(),
Bool(),
),
),
Column(
"attributes_int",
Map(
String(),
Int(64),
),
),
]


columns.extend(
[
Column(
f"attributes_string_{i}",
Map(
String(),
String(),
modifiers=Modifiers(
codecs=["ZSTD(1)"],
),
),
)
for i in range(num_attr_buckets)
]
)

columns.extend(
[
Column(
f"attributes_float_{i}",
Map(
String(),
Float(64),
modifiers=Modifiers(
codecs=["ZSTD(1)"],
),
),
)
for i in range(num_attr_buckets)
]
)


def should_include_column(name: str) -> bool:
return name not in {
"sampling_weight",
"sampling_factor",
"retention_days",
"client_sample_rate",
"server_sample_rate",
}


# Per-item sampling on `item_id`. Each tier picks independently so the tiers
# are not subsets of each other, and items belonging to the same trace can
# end up in different tiers.
def generate_old_materialized_view_expression(sampling_weight: int) -> str:
column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)])
return " ".join(
[
"SELECT",
f"{column_names_str},",
"downsampled_retention_days AS retention_days,",
f"sampling_weight * {sampling_weight} AS sampling_weight,",
f"sampling_factor / {sampling_weight} AS sampling_factor,",
f"client_sample_rate / {sampling_weight} AS client_sample_rate,",
f"server_sample_rate / {sampling_weight} AS server_sample_rate",
"FROM eap_items_1_local",
f"WHERE (cityHash64(item_id + {sampling_weight}) % {sampling_weight}) = 0",
]
)


# Trace-based sampling that keeps each tier a strict subset of the tier
# above. Hashing `trace_id` (and not perturbing the hash per tier) makes
# every item in a trace land in the same set of tiers, and because the
# sampling weights are 8 / 64 / 512 (each divides the next), an item that
# satisfies `H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0`.
def generate_new_materialized_view_expression(sampling_weight: int) -> str:
column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)])
return " ".join(
[
"SELECT",
f"{column_names_str},",
"downsampled_retention_days AS retention_days,",
f"sampling_weight * {sampling_weight} AS sampling_weight,",
f"sampling_factor / {sampling_weight} AS sampling_factor,",
f"client_sample_rate / {sampling_weight} AS client_sample_rate,",
f"server_sample_rate / {sampling_weight} AS server_sample_rate",
"FROM eap_items_1_local",
f"WHERE (cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}) = 0",
]
)


class Migration(migration.ClickhouseNodeMigration):
blocking = False

def forwards_ops(self) -> Sequence[SqlOperation]:
ops: List[SqlOperation] = []

for sampling_weight in sampling_weights:
local_table_name = f"eap_items_1_downsample_{sampling_weight}_local"

ops.extend(
[
operations.CreateMaterializedView(
storage_set=storage_set_key,
view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}",
columns=columns,
destination_table_name=local_table_name,
target=OperationTarget.LOCAL,
query=generate_new_materialized_view_expression(sampling_weight),
),
operations.DropTable(
storage_set=storage_set_key,
table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}",
target=OperationTarget.LOCAL,
),
]
)

return ops

def backwards_ops(self) -> Sequence[SqlOperation]:
ops: List[SqlOperation] = []

for sampling_weight in sampling_weights:
local_table_name = f"eap_items_1_downsample_{sampling_weight}_local"

ops.extend(
[
operations.CreateMaterializedView(
storage_set=storage_set_key,
view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}",
columns=columns,
destination_table_name=local_table_name,
target=OperationTarget.LOCAL,
query=generate_old_materialized_view_expression(sampling_weight),
),
operations.DropTable(
storage_set=storage_set_key,
table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}",
target=OperationTarget.LOCAL,
),
]
)

return ops
50 changes: 50 additions & 0 deletions tests/migrations/test_eap_subset_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Unit tests for the EAP downsample materialized view introduced in
migration 0055_sample_downsample_tiers_by_trace.

These check:
- the new MV samples on `trace_id` (so every item in a trace lands in the
same set of tiers), and
- because the sampling weights are 8 / 64 / 512 (each divides the next),
any hash value that satisfies `H % 512 == 0` also satisfies
`H % 64 == 0` and `H % 8 == 0` — i.e. tier 512 ⊆ tier 64 ⊆ tier 8.
"""

from importlib import import_module

_migration = import_module(
"snuba.snuba_migrations.events_analytics_platform.0055_sample_downsample_tiers_by_trace"
)


def test_new_mv_samples_on_trace_id() -> None:
for sampling_weight in _migration.sampling_weights:
sql = _migration.generate_new_materialized_view_expression(sampling_weight)
assert f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}" in sql, (
f"sampling weight {sampling_weight} should hash trace_id, got: {sql}"
)
# The previous per-item-id form must be gone — otherwise items in
# the same trace can land in different tiers.
assert "cityHash64(item_id" not in sql


def test_subset_property_via_modular_arithmetic() -> None:
# The MV uses `WHERE cityHash64(reinterpretAsUInt128(trace_id)) % w = 0`.
# If H % 512 == 0 then H % 64 == 0 and H % 8 == 0, since 8 | 64 | 512.
# Verify the divisibility chain that the SQL relies on for the subset
# guarantee.
weights = sorted(_migration.sampling_weights)
for smaller, larger in zip(weights, weights[1:]):
assert larger % smaller == 0, (
f"sampling_weight {larger} must be a multiple of {smaller} "
"for tier subset property to hold"
)

# Spot-check with concrete hash values. Any H that lands in the tightest
# tier (largest weight) must also land in every looser tier.
largest = max(_migration.sampling_weights)
for k in range(10):
h = k * largest # H % largest == 0 by construction
for w in _migration.sampling_weights:
assert h % w == 0, (
f"hash {h} passes tier {largest} but not tier {w}; subset property violated"
)
Loading