Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions snuba/cli/replacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
help="Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.",
)
@click.option("--log-level", help="Logging level to use.")
@click.option(
"--health-check-file",
default=None,
type=str,
help="Arroyo will touch this file at intervals to indicate health. If not provided, no health check is performed.",
)
@click.option(
"--max-poll-interval-ms",
type=int,
default=30000,
)
def replacer(
*,
replacements_topic: Optional[str],
Expand All @@ -68,6 +79,8 @@ def replacer(
queued_max_messages_kbytes: int,
queued_min_messages: int,
log_level: Optional[str] = None,
health_check_file: Optional[str] = None,
max_poll_interval_ms: int = 30000,
) -> None:
from arroyo import Topic, configure_metrics
from arroyo.backends.kafka import KafkaConsumer
Expand Down Expand Up @@ -97,6 +110,12 @@ def replacer(

configure_metrics(StreamMetricsAdapter(metrics))

consumer_config = {
"max.poll.interval.ms": max_poll_interval_ms,
}
if max_poll_interval_ms < 45000:
consumer_config["session.timeout.ms"] = max_poll_interval_ms
Comment on lines +113 to +117
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The default value for --max-poll-interval-ms unconditionally overrides the Kafka consumer's max.poll.interval.ms, reducing it from 5 minutes to 30 seconds for all deployments.
Severity: HIGH

Suggested Fix

Change the default value of the max_poll_interval_ms argument to None. Only apply the max.poll.interval.ms setting to the Kafka consumer configuration if the value is not None, following the existing pattern in snuba/consumers/consumer_builder.py.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: snuba/cli/replacer.py#L113-L117

Potential issue: The `replacer` CLI unconditionally applies a default
`max.poll.interval.ms` of 30000ms (30 seconds) to the Kafka consumer configuration. This
overrides `librdkafka`'s default of 300000ms (5 minutes) for all existing deployments
that do not specify the new `--max-poll-interval-ms` flag. Because `session.timeout.ms`
must be less than or equal to this value, it is also reduced. If a replacement operation
takes longer than 30 seconds, which is common for large replacements, the consumer will
be removed from its consumer group, leading to rebalancing loops in production.

Did we get this right? 👍 / 👎 to inform future reviews.

Comment on lines +116 to +117
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Setting session.timeout.ms equal to max_poll_interval_ms is risky, as it leaves no buffer for processing delays and can cause frequent, unnecessary consumer rebalances.
Severity: MEDIUM

Suggested Fix

Decouple session.timeout.ms from max_poll_interval_ms. Set session.timeout.ms to a static, reasonable value (e.g., the default 45000ms) that is independent of max_poll_interval_ms, while still ensuring the constraint session.timeout.ms <= max_poll_interval_ms is met. This provides a buffer for processing and prevents instability from transient delays.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: snuba/cli/replacer.py#L116-L117

Potential issue: The code sets the Kafka consumer's `session.timeout.ms` to be equal to
`max_poll_interval_ms` when the latter is below 45 seconds. This conflates two distinct
timeouts: session liveness and message processing time. By making them equal (e.g., at
the default of 30 seconds), there is no tolerance for transient delays like GC pauses,
network jitter, or I/O latency. This can cause the Kafka broker to prematurely consider
the consumer dead, triggering frequent and unnecessary group rebalances, which degrades
performance and stability. This pattern is explicitly marked as a "HACK" in another part
of the codebase.

Also affects:

  • snuba/consumers/consumer_builder.py:175-176


replacer = StreamProcessor(
KafkaConsumer(
build_kafka_consumer_configuration(
Expand All @@ -107,11 +126,13 @@ def replacer(
strict_offset_reset=not no_strict_offset_reset,
queued_max_messages_kbytes=queued_max_messages_kbytes,
queued_min_messages=queued_min_messages,
override_params=consumer_config,
),
),
Topic(replacements_topic),
ReplacerStrategyFactory(
worker=ReplacerWorker(storage, consumer_group, metrics=metrics),
health_check_file=health_check_file,
),
ONCE_PER_SECOND,
)
Expand Down
10 changes: 9 additions & 1 deletion snuba/replacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.processing.strategies.healthcheck import Healthcheck
from arroyo.types import BrokerValue, Commit, Message, Partition

from snuba import settings
Expand Down Expand Up @@ -288,8 +289,10 @@ class ReplacerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(
self,
worker: ReplacerWorker,
health_check_file: Optional[str] = None,
) -> None:
self.__worker = worker
self.__health_check_file = health_check_file

def create_with_partitions(
self,
Expand All @@ -303,7 +306,12 @@ def processing_func(message: Message[KafkaPayload]) -> None:

commit_offsets: ProcessingStrategy[Any] = CommitOffsets(commit)

return RunTask(processing_func, commit_offsets)
strategy: ProcessingStrategy[KafkaPayload] = RunTask(processing_func, commit_offsets)

if self.__health_check_file is not None:
strategy = Healthcheck(self.__health_check_file, strategy)

return strategy


class ReplacerWorker:
Expand Down
67 changes: 67 additions & 0 deletions tests/cli/test_replacer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from unittest.mock import Mock, patch

from click.testing import CliRunner

import snuba.cli.replacer
import snuba.replacer
import snuba.utils.streams.configuration_builder
from snuba.cli.replacer import replacer


@patch("arroyo.processing.StreamProcessor")
@patch("arroyo.backends.kafka.KafkaConsumer")
@patch(
"snuba.utils.streams.configuration_builder.build_kafka_consumer_configuration",
return_value={"bootstrap.servers": "localhost"},
)
@patch.object(snuba.replacer, "ReplacerStrategyFactory")
@patch.object(snuba.replacer, "ReplacerWorker")
@patch.object(snuba.cli.replacer, "get_writable_storage")
@patch.object(snuba.cli.replacer, "setup_logging")
@patch.object(snuba.cli.replacer, "setup_sentry")
@patch("arroyo.configure_metrics")
@patch.object(snuba.cli.replacer, "signal")
def test_replacer_cli(
_signal: Mock,
_configure_metrics: Mock,
_setup_sentry: Mock,
_setup_logging: Mock,
get_writable_storage: Mock,
replacer_worker: Mock,
replacer_strategy_factory: Mock,
build_kafka_consumer_configuration: Mock,
_kafka_consumer: Mock,
_stream_processor: Mock,
) -> None:
storage = Mock()
topic_spec = Mock()
topic_spec.topic_name = "replacements-topic"
topic_spec.topic = Mock()
storage.get_table_writer.return_value.get_stream_loader.return_value.get_replacement_topic_spec.return_value = topic_spec
get_writable_storage.return_value = storage

worker = Mock()
replacer_worker.return_value = worker

runner = CliRunner()
result = runner.invoke(
replacer,
[
"--storage",
"errors",
"--health-check-file",
"/tmp/health.txt",
"--max-poll-interval-ms",
"12345",
],
)

assert result.exit_code == 0, result.output
assert replacer_strategy_factory.call_args.kwargs == {
"worker": worker,
"health_check_file": "/tmp/health.txt",
}
assert build_kafka_consumer_configuration.call_args.kwargs["override_params"] == {
"max.poll.interval.ms": 12345,
"session.timeout.ms": 12345,
}
12 changes: 12 additions & 0 deletions tests/test_replacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest
import simplejson as json
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies.healthcheck import Healthcheck
from arroyo.types import BrokerValue, Message, Partition, Topic

from snuba import replacer, settings
Expand Down Expand Up @@ -423,3 +424,14 @@ def test_query_time_flags_project_and_groups(self) -> None:
# exclude_groups from project setter, start_merge from group setter
{ReplacementType.EXCLUDE_GROUPS, ReplacementType.START_MERGE},
)

def test_replacer_strategy_factory_wraps_healthcheck(self) -> None:
worker = mock.Mock()
factory = replacer.ReplacerStrategyFactory(
worker=worker,
health_check_file="/tmp/health.txt",
)

strategy = factory.create_with_partitions(mock.Mock(), {})

assert isinstance(strategy, Healthcheck)
Loading