Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 41 additions & 38 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,49 +191,52 @@ pub struct ConditionConfig {
#[serde(rename_all = "camelCase")]
pub struct Conditions {
pub operator: Option<LogicalOperator>,
#[serde(default)]
pub condition_config: Vec<ConditionConfig>,
/// Nested condition groups for complex logic like (A OR B) AND (C OR D)
pub groups: Option<Vec<Conditions>>,
}

impl Conditions {
fn format_condition(cond: &ConditionConfig) -> String {
match cond.value.as_ref().filter(|v| !v.is_empty()) {
Some(val) => format!("{} {} {}", cond.column, cond.operator, val),
None => format!("{} {}", cond.column, cond.operator),
}
}

pub fn generate_filter_message(&self) -> String {
match &self.operator {
Some(op) => match op {
LogicalOperator::And | LogicalOperator::Or => {
let expr1 = &self.condition_config[0];
let expr2 = &self.condition_config[1];
let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) {
format!(
"{} {} {}",
expr1.column,
expr1.operator,
expr1.value.as_ref().unwrap()
)
} else {
format!("{} {}", expr1.column, expr1.operator)
};

let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) {
format!(
"{} {} {}",
expr2.column,
expr2.operator,
expr2.value.as_ref().unwrap()
)
} else {
format!("{} {}", expr2.column, expr2.operator)
};

format!("[{expr1_msg} {op} {expr2_msg}]")
}
},
None => {
let expr = &self.condition_config[0];
if let Some(val) = &expr.value {
format!("{} {} {}", expr.column, expr.operator, val)
} else {
format!("{} {}", expr.column, expr.operator)
}
}
let op = self.operator.as_ref().unwrap_or(&LogicalOperator::And);
let separator = format!(" {op} ");

// Format inline condition_config entries
let condition_parts: Vec<String> = self
.condition_config
.iter()
.map(Self::format_condition)
.collect();

// Format nested groups recursively, skipping empty ones
let group_parts: Vec<String> = self
.groups
.as_deref()
.unwrap_or_default()
.iter()
.map(|g| g.generate_filter_message())
.filter(|msg| !msg.is_empty())
.map(|msg| format!("({msg})"))
.collect();

let all_parts: Vec<&str> = condition_parts
.iter()
.chain(group_parts.iter())
.map(|s| s.as_str())
.collect();

match all_parts.len() {
0 => String::default(),
1 => all_parts[0].to_string(),
_ => format!("[{}]", all_parts.join(&separator)),
}
}
}
Expand Down
46 changes: 32 additions & 14 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,21 +388,39 @@ fn extract_group_results(records: Vec<RecordBatch>, plan: LogicalPlan) -> AlertQ
}

pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
match &where_clause.operator {
Some(op) => match op {
&LogicalOperator::And => {
let mut exprs = vec![];
for condition in &where_clause.condition_config {
exprs.push(condition_to_expr(condition)?);
}
Ok(exprs.join(" AND "))
}
_ => Err(String::from("Invalid option 'or', only 'and' is supported")),
},
_ => Err(String::from(
"Invalid option 'null', only 'and' is supported",
)),
let op = where_clause
.operator
.as_ref()
.unwrap_or(&LogicalOperator::And);

let joiner = match op {
LogicalOperator::And => " AND ",
LogicalOperator::Or => " OR ",
};

let mut exprs = vec![];

// Process flat condition_config entries
for condition in &where_clause.condition_config {
exprs.push(condition_to_expr(condition)?);
}

// Process nested groups recursively
if let Some(groups) = &where_clause.groups {
for group in groups {
let group_expr = get_filter_string(group)?;
// Wrap each group in parentheses to preserve precedence
exprs.push(format!("({group_expr})"));
}
}

if exprs.is_empty() {
return Err(String::from(
"conditions must have at least one condition or group",
));
}

Ok(exprs.join(joiner))
}

fn condition_to_expr(condition: &ConditionConfig) -> Result<String, String> {
Expand Down
3 changes: 2 additions & 1 deletion src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
None,
vec![],
vec![],
)
.await?;

Expand Down
16 changes: 13 additions & 3 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
use crate::handlers::{
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
Expand Down Expand Up @@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -206,6 +207,8 @@ pub async fn setup_otel_stream(
expected_log_source: LogSource,
known_fields: &[&str],
telemetry_type: TelemetryType,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -239,7 +242,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
dataset_tags,
dataset_labels,
)
.await?;
let mut time_partition = None;
Expand Down Expand Up @@ -362,6 +366,8 @@ pub async fn handle_otel_logs_ingestion(
LogSource::OtelLogs,
&OTEL_LOG_KNOWN_FIELD_LIST,
TelemetryType::Logs,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand All @@ -386,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion(
LogSource::OtelMetrics,
&OTEL_METRICS_KNOWN_FIELD_LIST,
TelemetryType::Metrics,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -417,6 +425,8 @@ pub async fn handle_otel_traces_ingestion(
LogSource::OtelTraces,
&OTEL_TRACES_KNOWN_FIELD_LIST,
TelemetryType::Traces,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down
33 changes: 18 additions & 15 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
*
*/

use actix_web::http::header::HeaderMap;

use crate::{
event::format::LogSource,
handlers::{
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
TelemetryType, UPDATE_STREAM_KEY,
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
use actix_web::http::header::HeaderMap;
use tracing::warn;

#[derive(Debug, Default)]
pub struct PutStreamHeaders {
Expand All @@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
pub dataset_tag: Option<DatasetTag>,
pub dataset_tags: Vec<DatasetTag>,
pub dataset_labels: Vec<String>,
}

impl From<&HeaderMap> for PutStreamHeaders {
Expand Down Expand Up @@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
dataset_tag: headers
.get(DATASET_TAG_KEY)
dataset_tags: headers
.get(DATASET_TAGS_KEY)
.or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
.and_then(|v| match DatasetTag::try_from(v) {
Ok(tag) => Some(tag),
Err(err) => {
warn!("Invalid dataset tag '{v}': {err}");
None
}
}),
.map(parse_dataset_tags)
.unwrap_or_default(),
dataset_labels: headers
.get(DATASET_LABELS_KEY)
.and_then(|v| v.to_str().ok())
.map(parse_dataset_labels)
.unwrap_or_default(),
}
}
}
3 changes: 2 additions & 1 deletion src/handlers/http/prism_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ pub async fn post_datasets(
req: HttpRequest,
) -> Result<impl Responder, PrismLogstreamError> {
let session_key = extract_session_key_from_req(&req)?;
let tenant_id = get_tenant_id_from_request(&req);
let dataset = dataset_req
.map(|Json(r)| r)
.unwrap_or_default()
.get_datasets(session_key)
.get_datasets(session_key, tenant_id)
.await?;

Ok(web::Json(dataset))
Expand Down
46 changes: 44 additions & 2 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*
*/

use std::collections::HashSet;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use tracing::warn;

pub mod airplane;
pub mod http;
Expand All @@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
pub const TENANT_ID: &str = "x-p-tenant";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
Expand Down Expand Up @@ -85,12 +89,14 @@ impl Display for TelemetryType {
}

/// Tag for categorizing datasets/streams by observability domain
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
// Variants correspond to the kebab-case strings accepted by `TryFrom<&str>`
// and emitted by `Display` (e.g. "agent-observability", "apm").
// NOTE(review): serde's `rename_all = "kebab-case"` may render `APM` as
// "a-p-m" rather than the "apm" used by `TryFrom`/`Display` — confirm the
// serialized form matches the header parsing before relying on round-trips.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum DatasetTag {
    AgentObservability,
    K8sObservability,
    DatabaseObservability,
    APM,
    ServiceMap,
}

impl TryFrom<&str> for DatasetTag {
Expand All @@ -101,8 +107,10 @@ impl TryFrom<&str> for DatasetTag {
"agent-observability" => Ok(DatasetTag::AgentObservability),
"k8s-observability" => Ok(DatasetTag::K8sObservability),
"database-observability" => Ok(DatasetTag::DatabaseObservability),
"apm" => Ok(DatasetTag::APM),
"service-map" => Ok(DatasetTag::ServiceMap),
_ => Err(
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability, apm, service-map",
),
}
}
Expand All @@ -114,6 +122,40 @@ impl Display for DatasetTag {
DatasetTag::AgentObservability => "agent-observability",
DatasetTag::K8sObservability => "k8s-observability",
DatasetTag::DatabaseObservability => "database-observability",
DatasetTag::APM => "apm",
DatasetTag::ServiceMap => "service-map",
})
}
}

/// Parses a comma-separated header value into a deduplicated list of
/// [`DatasetTag`]s.
///
/// Each entry is trimmed; empty entries are skipped and unrecognized tags
/// are logged via `warn!` and dropped rather than failing the whole header.
/// Duplicates are removed while preserving first-occurrence order, so the
/// result is deterministic for a given input (the previous
/// `collect::<HashSet<_>>().into_iter().collect()` produced an arbitrary,
/// run-to-run varying order).
pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
    // Tracks tags already emitted so duplicates are skipped in O(1).
    let mut seen = HashSet::new();
    header_value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .filter_map(|s| match DatasetTag::try_from(s) {
            Ok(tag) => Some(tag),
            Err(err) => {
                warn!("Invalid dataset tag '{s}': {err}");
                None
            }
        })
        // `insert` returns false for an already-seen tag, dropping the dup
        // while keeping the first occurrence's position.
        .filter(|tag| seen.insert(*tag))
        .collect()
}

/// Parses a comma-separated header value into a deduplicated list of labels.
///
/// Each entry is trimmed and empty entries are skipped. Duplicates are
/// removed while preserving first-occurrence order, so the result is
/// deterministic for a given input (the previous
/// `collect::<HashSet<_>>().into_iter().collect()` produced an arbitrary,
/// run-to-run varying order).
pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
    // Tracks labels already emitted so duplicates are skipped in O(1).
    let mut seen = HashSet::new();
    header_value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        // `insert` returns false for an already-seen label, dropping the dup
        // while keeping the first occurrence's position.
        .filter(|s| seen.insert(s.to_string()))
        .map(str::to_string)
        .collect()
}
Loading
Loading