Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions lib/sentry/telemetry/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Sentry.Telemetry.Scheduler do

alias Sentry.Telemetry.{Buffer, Category}
alias Sentry.{CheckIn, ClientReport, Config, Envelope, Event, LogEvent, Transaction, Transport}
alias Sentry.Transport.RateLimiter

@default_capacity 1000

Expand Down Expand Up @@ -222,17 +223,23 @@ defmodule Sentry.Telemetry.Scheduler do
state
else
category = Enum.at(state.priority_cycle, state.cycle_position)
buffer = Map.fetch!(state.buffers, category)

case Buffer.poll_if_ready(buffer) do
{:ok, items} when items != [] ->
state = send_items(state, category, items)
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
if category_rate_limited?(state, category) do
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
else
buffer = Map.fetch!(state.buffers, category)

case Buffer.poll_if_ready(buffer) do
{:ok, items} when items != [] ->
state = send_items(state, category, items)
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)

_ ->
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
_ ->
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
end
end
end
end
Expand Down Expand Up @@ -496,6 +503,14 @@ defmodule Sentry.Telemetry.Scheduler do
end
end

# When an `on_envelope` callback is installed (unit-test mode) we bypass
# rate-limit checks entirely so buffered items always flush to the callback.
defp category_rate_limited?(%{on_envelope: callback}, _category) when is_function(callback, 1),
  do: false

# Otherwise translate the scheduler category to its wire-level data category
# and ask the transport-level rate limiter whether sends are blocked.
defp category_rate_limited?(_state, category) do
  category
  |> Category.data_category()
  |> RateLimiter.rate_limited?()
end

defp default_weights do
%{
critical: Category.weight(:critical),
Expand Down
85 changes: 61 additions & 24 deletions lib/sentry/telemetry_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ defmodule Sentry.TelemetryProcessor do

use Supervisor

alias Sentry.ClientReport
alias Sentry.Telemetry.{Buffer, Category, Scheduler}
alias Sentry.Transport.RateLimiter
alias Sentry.{CheckIn, Event, LogEvent, Transaction}

@default_name __MODULE__
Expand Down Expand Up @@ -122,58 +124,82 @@ defmodule Sentry.TelemetryProcessor do
@spec add(Supervisor.supervisor(), Event.t() | CheckIn.t() | Transaction.t() | LogEvent.t()) ::
        :ok
def add(processor, %Event{} = item) when is_atom(processor), do: add_by_name(processor, :error, item)
def add(processor, %Event{} = item), do: add_by_lookup(processor, :error, item)

def add(processor, %CheckIn{} = item) when is_atom(processor), do: add_by_name(processor, :check_in, item)
def add(processor, %CheckIn{} = item), do: add_by_lookup(processor, :check_in, item)

def add(processor, %Transaction{} = item) when is_atom(processor), do: add_by_name(processor, :transaction, item)
def add(processor, %Transaction{} = item), do: add_by_lookup(processor, :transaction, item)

def add(processor, %LogEvent{} = item) when is_atom(processor), do: add_by_name(processor, :log, item)
def add(processor, %LogEvent{} = item), do: add_by_lookup(processor, :log, item)

# Buffers `item` under `category` for a processor addressed by registered
# atom name, then signals the scheduler. The item is dropped before it ever
# enters the buffer when the category is currently rate limited
# (rate_limited?/1 records the discard in a client report). Always :ok.
defp add_by_name(processor, category, item) do
  unless rate_limited?(category) do
    Buffer.add(buffer_name(processor, category), item)
    Scheduler.signal(scheduler_name(processor))
  end

  :ok
end

# Same contract as add_by_name/3, but the processor is a pid/via tuple, so
# the buffer and scheduler are resolved through the supervisor's children.
defp add_by_lookup(processor, category, item) do
  unless rate_limited?(category) do
    buffer = get_buffer(processor, category)
    Buffer.add(buffer, item)
    scheduler = get_scheduler(processor)
    Scheduler.signal(scheduler)
  end

  :ok
end

Expand Down Expand Up @@ -257,6 +283,17 @@ defmodule Sentry.TelemetryProcessor do
end
end

# Returns true when the transport rate limiter currently blocks `category`'s
# data category; a blocked item is counted as a :ratelimit_backoff discard
# in the client report before we report it as limited.
defp rate_limited?(category) do
  data_category = Category.data_category(category)
  limited? = RateLimiter.rate_limited?(data_category)

  if limited? do
    ClientReport.Sender.record_discarded_events(:ratelimit_backoff, data_category)
  end

  limited?
end

defp safe_get_buffer(processor, category) when is_atom(processor) do
try do
{:ok,
Expand Down
132 changes: 131 additions & 1 deletion test/sentry/telemetry_processor_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ defmodule Sentry.TelemetryProcessorIntegrationTest do
TelemetryProcessor.add(ctx.processor, make_log_event("log-3"))

log_buffer = TelemetryProcessor.get_buffer(ctx.processor, :log)

_ = Buffer.size(log_buffer)
_ = :sys.get_state(Sentry.ClientReport.Sender)

Expand All @@ -324,6 +323,137 @@ defmodule Sentry.TelemetryProcessorIntegrationTest do
end
end

# Verifies the scheduler side of rate limiting: items already buffered stay
# in the buffer (not sent, not dropped) while their category is rate limited.
describe "scheduler rate limit checks" do
setup ctx do
# Stub the envelope endpoint so any accidental send succeeds quietly.
Bypass.stub(ctx.bypass, "POST", "/api/1/envelope/", fn conn ->
Plug.Conn.resp(conn, 200, ~s<{"id": "340"}>)
end)

flush_ref_messages(ctx.ref)

# Clean up the rate-limiter entries these tests insert; :badarg is
# swallowed in case the ETS table is already gone at exit time.
on_exit(fn ->
try do
:ets.delete(Sentry.Transport.RateLimiter, "log_item")
:ets.delete(Sentry.Transport.RateLimiter, "error")
catch
:error, :badarg -> :ok
end
end)

:ok
end

test "keeps rate-limited log events in the buffer", ctx do
# Suspend the scheduler so the item cannot be flushed before the
# rate limit is installed.
scheduler = TelemetryProcessor.get_scheduler(ctx.processor)
:sys.suspend(scheduler)

TelemetryProcessor.add(ctx.processor, make_log_event("rate-limited-log"))

log_buffer = TelemetryProcessor.get_buffer(ctx.processor, :log)
assert Buffer.size(log_buffer) == 1

# Rate-limit the "log_item" data category for the next 60 seconds.
:ets.insert(Sentry.Transport.RateLimiter, {"log_item", System.system_time(:second) + 60})

# Resume and synchronize on the scheduler (get_state round-trips its
# mailbox) so any processing cycle has completed before asserting.
:sys.resume(scheduler)
_ = :sys.get_state(scheduler)

# The item must still be buffered: rate-limited categories are skipped,
# not drained.
assert Buffer.size(log_buffer) == 1
end

test "keeps rate-limited error events in the buffer", ctx do
put_test_config(telemetry_processor_categories: [:error, :log])

scheduler = TelemetryProcessor.get_scheduler(ctx.processor)
:sys.suspend(scheduler)

Sentry.capture_message("rate-limited-error", result: :none)

error_buffer = TelemetryProcessor.get_buffer(ctx.processor, :error)
assert Buffer.size(error_buffer) == 1

# Rate-limit the "error" data category, then let the scheduler run.
:ets.insert(Sentry.Transport.RateLimiter, {"error", System.system_time(:second) + 60})

:sys.resume(scheduler)
_ = :sys.get_state(scheduler)

assert Buffer.size(error_buffer) == 1
end
end

# Verifies the processor side of rate limiting: items for a rate-limited
# category are dropped before entering the buffer, and the drop is recorded
# as a :ratelimit_backoff discard in the client report.
describe "pre-buffer rate limit checks" do
setup ctx do
# Flush any pending client report so earlier discards don't leak into
# the assertions below.
send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

_ = :sys.get_state(Sentry.ClientReport.Sender)
flush_ref_messages(ctx.ref)

# Test-local rate-limiter ETS table name placed in the process
# dictionary by the test setup elsewhere in this file.
rate_limiter_table = Process.get(:rate_limiter_table_name)

on_exit(fn ->
try do
:ets.delete(rate_limiter_table, "log_item")
:ets.delete(rate_limiter_table, "error")
catch
_, _ -> :ok
end
end)

%{rate_limiter_table: rate_limiter_table}
end

test "drops rate-limited log events before they enter the buffer", ctx do
log_buffer = TelemetryProcessor.get_buffer(ctx.processor, :log)

# Rate-limit the "log_item" data category before adding anything.
:ets.insert(ctx.rate_limiter_table, {"log_item", System.system_time(:second) + 60})

TelemetryProcessor.add(ctx.processor, make_log_event("pre-buffer-drop"))

# The event was rejected up front — never buffered.
assert Buffer.size(log_buffer) == 0

# Trigger a client report send and inspect the resulting envelope.
send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

ref = ctx.ref
assert_receive {^ref, body}, 2000

items = decode_envelope!(body)
assert [{%{"type" => "client_report"}, client_report}] = items

ratelimit_event =
Enum.find(client_report["discarded_events"], &(&1["reason"] == "ratelimit_backoff"))

assert ratelimit_event != nil
assert ratelimit_event["category"] == "log_item"
assert ratelimit_event["quantity"] == 1
end

test "drops rate-limited error events before they enter the buffer", ctx do
put_test_config(telemetry_processor_categories: [:error, :log])

error_buffer = TelemetryProcessor.get_buffer(ctx.processor, :error)

# Rate-limit the "error" data category before capturing.
:ets.insert(ctx.rate_limiter_table, {"error", System.system_time(:second) + 60})

Sentry.capture_message("pre-buffer-drop", result: :none)

assert Buffer.size(error_buffer) == 0

send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

ref = ctx.ref
assert_receive {^ref, body}, 2000

items = decode_envelope!(body)
assert [{%{"type" => "client_report"}, client_report}] = items

ratelimit_event =
Enum.find(client_report["discarded_events"], &(&1["reason"] == "ratelimit_backoff"))

assert ratelimit_event != nil
assert ratelimit_event["category"] == "error"
assert ratelimit_event["quantity"] == 1
end
end

defp make_transaction do
now = System.system_time(:microsecond)

Expand Down