Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ fn create_factory(
ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned());
let replacements_concurrency =
ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned());
let late_arrivals_concurrency =
ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned());
let factory = ConsumerStrategyFactoryV2 {
storage_config: storage,
env_config: EnvConfig::default(),
Expand All @@ -84,6 +86,7 @@ fn create_factory(
clickhouse_concurrency,
commitlog_concurrency,
replacements_concurrency,
late_arrivals_concurrency,
async_inserts: false,
python_max_queue_depth: None,
use_rust_processor: true,
Expand All @@ -106,6 +109,7 @@ fn create_factory(
use_row_binary: false,
blq_producer_config: None,
blq_topic: None,
late_arrivals_config: None,
};
Box::new(factory)
}
Expand Down
6 changes: 6 additions & 0 deletions rust_snuba/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ pub struct ConsumerConfig {
pub replacements_topic: Option<TopicConfig>,
pub accepted_outcomes_topic: Option<TopicConfig>,
pub dlq_topic: Option<TopicConfig>,
/// Optional Kafka topic for messages diverted by an eap-items-style
/// partition-boundary killswitch. Distinct from `dlq_topic`: late
/// arrivals are known-good payloads safe for automated replay, while
/// the DLQ is reserved for genuinely invalid messages.
#[serde(default)]
pub late_arrivals_topic: Option<TopicConfig>,
pub accountant_topic: TopicConfig,
pub max_batch_size: usize,
pub max_batch_time_ms: u64,
Expand Down
31 changes: 31 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,17 @@ pub fn consumer_impl(
None
};

let late_arrivals_config = if let Some(topic_config) = consumer_config.late_arrivals_topic {
let producer_config =
KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config));
Some((
producer_config,
Topic::new(&topic_config.physical_topic_name),
))
} else {
None
};

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);

let mut rebalance_delay_secs = consumer_config
Expand All @@ -274,6 +285,7 @@ pub fn consumer_impl(
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
replacements_concurrency: ConcurrencyConfig::new(4),
late_arrivals_concurrency: ConcurrencyConfig::new(10),
async_inserts,
python_max_queue_depth,
use_rust_processor,
Expand All @@ -291,6 +303,7 @@ pub fn consumer_impl(
use_row_binary,
blq_producer_config: blq_producer_config.clone(),
blq_topic: dlq_topic,
late_arrivals_config,
};

let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);
Expand Down Expand Up @@ -381,5 +394,23 @@ pub fn process_message(
}
}
}
processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(f) => {
let res = f(payload, meta, &config::ProcessorConfig::default())
.map_err(|e| SnubaRustError::new_err(format!("invalid message: {e:?}")))?;

match res {
crate::types::InsertOrLateArrival::Insert(r) => {
let payload = PyBytes::new(py, &r.rows.into_encoded_rows()).into();
Ok((Some(payload), None))
}
crate::types::InsertOrLateArrival::LateArrival(_) => {
// This Python helper is for single-message decoding (e.g.,
// by the admin/replay tooling). A late-arrival outcome
// isn't insertable; return no row payload at all (`None`,
// rather than an empty batch) so the caller doesn't try to
// consume rows.
Ok((None, None))
}
}
}
}
}
90 changes: 83 additions & 7 deletions rust_snuba/src/factory_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep;
use crate::strategies::commit_log::ProduceCommitLog;
use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck;
use crate::strategies::join_timeout::SetJoinTimeout;
use crate::strategies::late_arrivals::ProduceLateArrivals;
use crate::strategies::processor::{
get_schema, make_rust_processor, make_rust_processor_row_binary,
make_rust_processor_with_replacements, validate_schema,
make_rust_processor_with_late_arrivals, make_rust_processor_with_replacements, validate_schema,
RowBinaryProcessorFn,
};
use crate::strategies::python::PythonTransformStep;
use crate::strategies::replacements::ProduceReplacements;
use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch};
use crate::types::{BytesInsertBatch, CogsData, InsertOrLateArrival, RowData};

pub struct ConsumerStrategyFactoryV2 {
pub storage_config: config::StorageConfig,
Expand All @@ -50,6 +52,7 @@ pub struct ConsumerStrategyFactoryV2 {
pub clickhouse_concurrency: ConcurrencyConfig,
pub commitlog_concurrency: ConcurrencyConfig,
pub replacements_concurrency: ConcurrencyConfig,
pub late_arrivals_concurrency: ConcurrencyConfig,
pub async_inserts: bool,
pub python_max_queue_depth: Option<usize>,
pub use_rust_processor: bool,
Expand All @@ -67,6 +70,11 @@ pub struct ConsumerStrategyFactoryV2 {
pub use_row_binary: bool,
pub blq_producer_config: Option<KafkaConfig>,
pub blq_topic: Option<Topic>,
/// Optional producer config and destination topic for messages diverted
/// by the eap-items partition-boundary killswitch. When set, late-arriving
/// messages are produced to that topic instead of going through the
/// regular DLQ. When unset, the killswitch silently drops them.
pub late_arrivals_config: Option<(KafkaConfig, Topic)>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
Expand Down Expand Up @@ -236,12 +244,60 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
self.stop_at_timestamp,
)
}
(
true,
Some(processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(func)),
) => {
let late_arrivals_step: Box<
dyn ProcessingStrategy<InsertOrLateArrival<BytesInsertBatch<RowData>>>,
> = match self.late_arrivals_config.clone() {
Some((producer_config, destination)) => {
tracing::info!(
"Routing late arrivals to dedicated topic {:?}",
destination,
);
let producer = KafkaProducer::new(producer_config);
Box::new(ProduceLateArrivals::new(
next_step,
producer,
destination,
&self.late_arrivals_concurrency,
false,
))
}
None => {
tracing::info!(
"No late_arrivals_topic configured; late arrivals will be dropped",
);
Box::new(ProduceLateArrivals::disabled(next_step))
}
};

make_rust_processor_with_late_arrivals(
late_arrivals_step,
func,
&self.logical_topic_name,
self.enforce_schema,
&self.processing_concurrency,
config::ProcessorConfig {
env_config: self.env_config.clone(),
storage_name: self.storage_config.name.clone(),
},
self.stop_at_timestamp,
)
}
(
false,
Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)),
) => {
panic!("Consumer with replacements cannot be run in hybrid-mode");
}
(
false,
Some(processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(_)),
) => {
panic!("Consumer with late arrivals cannot be run in hybrid-mode");
}
_ => {
let schema = get_schema(&self.logical_topic_name, self.enforce_schema);

Expand Down Expand Up @@ -319,11 +375,7 @@ impl ConsumerStrategyFactoryV2 {
+ 'static,
>(
&self,
func: fn(
KafkaPayload,
crate::types::KafkaMessageMetadata,
&config::ProcessorConfig,
) -> anyhow::Result<TypedInsertBatch<T>>,
func: RowBinaryProcessorFn<T>,
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
// Commit offsets
let next_step = CommitOffsets::new(Duration::from_secs(1));
Expand Down Expand Up @@ -400,6 +452,30 @@ impl ConsumerStrategyFactoryV2 {
)
.flush_empty_batches(true);

let next_step: Box<dyn ProcessingStrategy<InsertOrLateArrival<BytesInsertBatch<Vec<T>>>>> =
match self.late_arrivals_config.clone() {
Some((producer_config, destination)) => {
tracing::info!(
"Routing late arrivals (row-binary) to dedicated topic {:?}",
destination,
);
let producer = KafkaProducer::new(producer_config);
Box::new(ProduceLateArrivals::new(
next_step,
producer,
destination,
&self.late_arrivals_concurrency,
false,
))
}
None => {
tracing::info!(
"No late_arrivals_topic configured (row-binary); late arrivals will be dropped",
);
Box::new(ProduceLateArrivals::disabled(next_step))
}
};

let next_step = make_rust_processor_row_binary(
next_step,
func,
Expand Down
Loading
Loading