-
-
Notifications
You must be signed in to change notification settings - Fork 62
feat(replacer): expose --max-poll-interval-ms and --health-check-file into the CLI #7961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,17 @@ | |
| help="Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.", | ||
| ) | ||
| @click.option("--log-level", help="Logging level to use.") | ||
| @click.option( | ||
| "--health-check-file", | ||
| default=None, | ||
| type=str, | ||
| help="Arroyo will touch this file at intervals to indicate health. If not provided, no health check is performed.", | ||
| ) | ||
| @click.option( | ||
| "--max-poll-interval-ms", | ||
| type=int, | ||
| default=30000, | ||
| ) | ||
| def replacer( | ||
| *, | ||
| replacements_topic: Optional[str], | ||
|
|
@@ -68,6 +79,8 @@ def replacer( | |
| queued_max_messages_kbytes: int, | ||
| queued_min_messages: int, | ||
| log_level: Optional[str] = None, | ||
| health_check_file: Optional[str] = None, | ||
| max_poll_interval_ms: int = 30000, | ||
| ) -> None: | ||
| from arroyo import Topic, configure_metrics | ||
| from arroyo.backends.kafka import KafkaConsumer | ||
|
|
@@ -97,6 +110,12 @@ def replacer( | |
|
|
||
| configure_metrics(StreamMetricsAdapter(metrics)) | ||
|
|
||
| consumer_config = { | ||
| "max.poll.interval.ms": max_poll_interval_ms, | ||
| } | ||
| if max_poll_interval_ms < 45000: | ||
| consumer_config["session.timeout.ms"] = max_poll_interval_ms | ||
|
Comment on lines
+116
to
+117
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Setting […] Suggested Fix: Decouple […] Prompt for AI Agent. Also affects: […]
|
||
|
|
||
| replacer = StreamProcessor( | ||
| KafkaConsumer( | ||
| build_kafka_consumer_configuration( | ||
|
|
@@ -107,11 +126,13 @@ def replacer( | |
| strict_offset_reset=not no_strict_offset_reset, | ||
| queued_max_messages_kbytes=queued_max_messages_kbytes, | ||
| queued_min_messages=queued_min_messages, | ||
| override_params=consumer_config, | ||
| ), | ||
| ), | ||
| Topic(replacements_topic), | ||
| ReplacerStrategyFactory( | ||
| worker=ReplacerWorker(storage, consumer_group, metrics=metrics), | ||
| health_check_file=health_check_file, | ||
| ), | ||
| ONCE_PER_SECOND, | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| from click.testing import CliRunner | ||
|
|
||
| import snuba.cli.replacer | ||
| import snuba.replacer | ||
| import snuba.utils.streams.configuration_builder | ||
| from snuba.cli.replacer import replacer | ||
|
|
||
|
|
||
| @patch("arroyo.processing.StreamProcessor") | ||
| @patch("arroyo.backends.kafka.KafkaConsumer") | ||
| @patch( | ||
| "snuba.utils.streams.configuration_builder.build_kafka_consumer_configuration", | ||
| return_value={"bootstrap.servers": "localhost"}, | ||
| ) | ||
| @patch.object(snuba.replacer, "ReplacerStrategyFactory") | ||
| @patch.object(snuba.replacer, "ReplacerWorker") | ||
| @patch.object(snuba.cli.replacer, "get_writable_storage") | ||
| @patch.object(snuba.cli.replacer, "setup_logging") | ||
| @patch.object(snuba.cli.replacer, "setup_sentry") | ||
| @patch("arroyo.configure_metrics") | ||
| @patch.object(snuba.cli.replacer, "signal") | ||
| def test_replacer_cli( | ||
| _signal: Mock, | ||
| _configure_metrics: Mock, | ||
| _setup_sentry: Mock, | ||
| _setup_logging: Mock, | ||
| get_writable_storage: Mock, | ||
| replacer_worker: Mock, | ||
| replacer_strategy_factory: Mock, | ||
| build_kafka_consumer_configuration: Mock, | ||
| _kafka_consumer: Mock, | ||
| _stream_processor: Mock, | ||
| ) -> None: | ||
| storage = Mock() | ||
| topic_spec = Mock() | ||
| topic_spec.topic_name = "replacements-topic" | ||
| topic_spec.topic = Mock() | ||
| storage.get_table_writer.return_value.get_stream_loader.return_value.get_replacement_topic_spec.return_value = topic_spec | ||
| get_writable_storage.return_value = storage | ||
|
|
||
| worker = Mock() | ||
| replacer_worker.return_value = worker | ||
|
|
||
| runner = CliRunner() | ||
| result = runner.invoke( | ||
| replacer, | ||
| [ | ||
| "--storage", | ||
| "errors", | ||
| "--health-check-file", | ||
| "/tmp/health.txt", | ||
| "--max-poll-interval-ms", | ||
| "12345", | ||
| ], | ||
| ) | ||
|
|
||
| assert result.exit_code == 0, result.output | ||
| assert replacer_strategy_factory.call_args.kwargs == { | ||
| "worker": worker, | ||
| "health_check_file": "/tmp/health.txt", | ||
| } | ||
| assert build_kafka_consumer_configuration.call_args.kwargs["override_params"] == { | ||
| "max.poll.interval.ms": 12345, | ||
| "session.timeout.ms": 12345, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The default value for `--max-poll-interval-ms` unconditionally overrides the Kafka consumer's `max.poll.interval.ms`, reducing it from 5 minutes to 30 seconds for all deployments.
Severity: HIGH
Suggested Fix
Change the default value of the `max_poll_interval_ms` argument to `None`. Only apply the `max.poll.interval.ms` setting to the Kafka consumer configuration if the value is not `None`, following the existing pattern in `snuba/consumers/consumer_builder.py`.
Prompt for AI Agent
Did we get this right? 👍 / 👎 to inform future reviews.