Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/cubesqlplanner/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/cubesql/target
/.idea
upstream
.air
.cargo/config.toml
dist
node_modules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,258 @@ impl PreAggregationsCompiler {
Ok(())
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for `PreAggregationsCompiler`, driven by the shared mock
    //! schema fixture `common/pre_aggregations_test.yaml`. Each test builds a
    //! fresh compiler scoped to one or more cubes and inspects the compiled
    //! pre-aggregation descriptors (measures, dimensions, time dimensions,
    //! granularity, and source kind).

    use super::*;
    use crate::test_fixtures::cube_bridge::MockSchema;
    use crate::test_fixtures::test_utils::TestContext;

    /// A plain single-cube rollup resolves its measures, dimensions, and the
    /// granularity-suffixed time dimension.
    #[test]
    fn test_compile_simple_rollup() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitors".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name =
            PreAggregationFullName::new("visitors".to_string(), "daily_rollup".to_string());
        let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();

        assert_eq!(compiled.name, "daily_rollup");
        assert_eq!(compiled.cube_name, "visitors");
        assert_eq!(compiled.granularity, Some("day".to_string()));

        // Check measures
        assert_eq!(compiled.measures.len(), 2);
        let measure_names: Vec<String> = compiled.measures.iter().map(|m| m.full_name()).collect();
        assert!(measure_names.contains(&"visitors.count".to_string()));
        assert!(measure_names.contains(&"visitors.unique_source_count".to_string()));

        // Check dimensions
        assert_eq!(compiled.dimensions.len(), 1);
        assert_eq!(compiled.dimensions[0].full_name(), "visitors.source");

        // Check time dimensions (with granularity suffix)
        assert_eq!(compiled.time_dimensions.len(), 1);
        assert_eq!(
            compiled.time_dimensions[0].full_name(),
            "visitors.created_at_day"
        );
    }

    /// A rollup that pulls a dimension from a joined cube (`visitors.source`
    /// referenced from a `visitor_checkins` pre-aggregation) compiles members
    /// from both cubes.
    #[test]
    fn test_compile_joined_rollup() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitor_checkins".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name = PreAggregationFullName::new(
            "visitor_checkins".to_string(),
            "joined_rollup".to_string(),
        );
        let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();

        assert_eq!(compiled.name, "joined_rollup");
        assert_eq!(compiled.cube_name, "visitor_checkins");
        assert_eq!(compiled.granularity, Some("day".to_string()));

        // Check measures
        assert_eq!(compiled.measures.len(), 1);
        assert_eq!(compiled.measures[0].full_name(), "visitor_checkins.count");

        // Check dimensions: one local, one from the joined `visitors` cube
        assert_eq!(compiled.dimensions.len(), 2);
        let dimension_names: Vec<String> =
            compiled.dimensions.iter().map(|d| d.full_name()).collect();
        assert!(dimension_names.contains(&"visitor_checkins.visitor_id".to_string()));
        assert!(dimension_names.contains(&"visitors.source".to_string()));

        // Check time dimensions (with granularity suffix)
        assert_eq!(compiled.time_dimensions.len(), 1);
        assert_eq!(
            compiled.time_dimensions[0].full_name(),
            "visitor_checkins.created_at_day"
        );
    }

    /// A rollup whose dimensions span a to-many relationship (multiplied
    /// measures scenario) still compiles dimensions from both cubes.
    #[test]
    fn test_compile_multiplied_rollup() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitors".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name =
            PreAggregationFullName::new("visitors".to_string(), "multiplied_rollup".to_string());
        let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();

        assert_eq!(compiled.name, "multiplied_rollup");
        assert_eq!(compiled.cube_name, "visitors");
        assert_eq!(compiled.granularity, Some("day".to_string()));

        // Check measures
        assert_eq!(compiled.measures.len(), 1);
        assert_eq!(compiled.measures[0].full_name(), "visitors.count");

        // Check dimensions
        assert_eq!(compiled.dimensions.len(), 2);
        let dimension_names: Vec<String> =
            compiled.dimensions.iter().map(|d| d.full_name()).collect();
        assert!(dimension_names.contains(&"visitors.source".to_string()));
        assert!(dimension_names.contains(&"visitor_checkins.source".to_string()));

        // Check time dimensions (with granularity suffix)
        assert_eq!(compiled.time_dimensions.len(), 1);
        assert_eq!(
            compiled.time_dimensions[0].full_name(),
            "visitors.created_at_day"
        );
    }

    /// Compiling everything at once picks up the pre-aggregations of every
    /// cube in scope.
    #[test]
    fn test_compile_all_pre_aggregations() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitors".to_string(), "visitor_checkins".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let compiled = compiler.compile_all_pre_aggregations(false).unwrap();

        // The seven pre-aggregations checked by name below must all be
        // present. We assert a lower bound rather than an exact count so the
        // test stays valid if the fixture gains additional pre-aggregations.
        assert!(compiled.len() >= 7);

        let names: Vec<String> = compiled.iter().map(|pa| pa.name.clone()).collect();

        // visitors pre-aggregations
        assert!(names.contains(&"daily_rollup".to_string()));
        assert!(names.contains(&"multiplied_rollup".to_string()));
        assert!(names.contains(&"for_join".to_string()));

        // visitor_checkins pre-aggregations
        assert!(names.contains(&"joined_rollup".to_string()));
        assert!(names.contains(&"checkins_with_visitor_source".to_string()));
        assert!(names.contains(&"for_lambda".to_string()));
        assert!(names.contains(&"lambda_union".to_string()));
    }

    /// Requesting a pre-aggregation name absent from the schema is an error,
    /// not a panic.
    #[test]
    fn test_compile_nonexistent_pre_aggregation() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitors".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name =
            PreAggregationFullName::new("visitors".to_string(), "nonexistent".to_string());
        let result = compiler.compile_pre_aggregation(&pre_agg_name);

        assert!(result.is_err());
    }

    /// A `rollupJoin` pre-aggregation compiles with a
    /// `PreAggregationSource::Join` source and members from both cubes.
    #[test]
    fn test_compile_rollup_join() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        // Need both cubes for rollupJoin: visitor_checkins and visitors
        let cube_names = vec!["visitor_checkins".to_string(), "visitors".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name = PreAggregationFullName::new(
            "visitor_checkins".to_string(),
            "checkins_with_visitor_source".to_string(),
        );
        let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();

        assert_eq!(compiled.name, "checkins_with_visitor_source");
        assert_eq!(compiled.cube_name, "visitor_checkins");

        // Check measures
        assert_eq!(compiled.measures.len(), 1);
        assert_eq!(compiled.measures[0].full_name(), "visitor_checkins.count");

        // Check dimensions
        assert_eq!(compiled.dimensions.len(), 2);
        let dimension_names: Vec<String> =
            compiled.dimensions.iter().map(|d| d.full_name()).collect();
        assert!(dimension_names.contains(&"visitor_checkins.visitor_id".to_string()));
        assert!(dimension_names.contains(&"visitors.source".to_string()));

        // Check source is Join
        match compiled.source.as_ref() {
            PreAggregationSource::Join(_) => {} // Expected
            _ => panic!("Expected PreAggregationSource::Join"),
        }
    }

    /// A lambda (union) pre-aggregation compiles with a
    /// `PreAggregationSource::Union` source.
    #[test]
    fn test_compile_rollup_lambda() {
        let schema = MockSchema::from_yaml_file("common/pre_aggregations_test.yaml");
        let test_context = TestContext::new(schema).unwrap();
        let query_tools = test_context.query_tools().clone();

        let cube_names = vec!["visitor_checkins".to_string()];
        let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();

        let pre_agg_name =
            PreAggregationFullName::new("visitor_checkins".to_string(), "lambda_union".to_string());
        let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();

        assert_eq!(compiled.name, "lambda_union");
        assert_eq!(compiled.cube_name, "visitor_checkins");
        assert_eq!(compiled.granularity, Some("day".to_string()));

        // Check measures
        assert_eq!(compiled.measures.len(), 1);
        assert_eq!(compiled.measures[0].full_name(), "visitor_checkins.count");

        // Check dimensions
        assert_eq!(compiled.dimensions.len(), 1);
        assert_eq!(
            compiled.dimensions[0].full_name(),
            "visitor_checkins.visitor_id"
        );

        // Check time dimensions
        assert_eq!(compiled.time_dimensions.len(), 1);
        assert_eq!(
            compiled.time_dimensions[0].full_name(),
            "visitor_checkins.created_at_day"
        );

        // Check source is Union
        match compiled.source.as_ref() {
            PreAggregationSource::Union(_) => {} // Expected
            _ => panic!("Expected PreAggregationSource::Union"),
        }
    }

    /// `from_string` splits a dotted `cube.pre_aggregation` identifier.
    #[test]
    fn test_pre_aggregation_full_name_from_string() {
        let name = PreAggregationFullName::from_string("visitors.daily_rollup").unwrap();
        assert_eq!(name.cube_name, "visitors");
        assert_eq!(name.name, "daily_rollup");
    }

    /// `from_string` rejects identifiers that are not exactly two dotted parts.
    #[test]
    fn test_pre_aggregation_full_name_invalid() {
        let result = PreAggregationFullName::from_string("invalid_name");
        assert!(result.is_err());

        let result2 = PreAggregationFullName::from_string("too.many.parts");
        assert!(result2.is_err());
    }
}
72 changes: 7 additions & 65 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use super::planners::QueryPlanner;
use super::query_tools::QueryTools;
use super::top_level_planner::TopLevelPlanner;
use super::QueryProperties;
use crate::cube_bridge::base_query_options::BaseQueryOptions;
use crate::cube_bridge::pre_aggregation_obj::NativePreAggregationObj;
use crate::logical_plan::OriginalSqlCollector;
//use crate::logical_plan::optimizers::*;
use crate::logical_plan::PreAggregation;
use crate::logical_plan::PreAggregationOptimizer;
use crate::logical_plan::Query;
use crate::physical_plan_builder::PhysicalPlanBuilder;
use cubenativeutils::wrappers::inner_types::InnerTypes;
use cubenativeutils::wrappers::object::NativeArray;
use cubenativeutils::wrappers::serializer::NativeSerialize;
use cubenativeutils::wrappers::NativeType;
use cubenativeutils::wrappers::{NativeContextHolder, NativeObjectHandle};
use cubenativeutils::CubeError;
use std::collections::HashMap;
use std::rc::Rc;

pub struct BaseQuery<IT: InnerTypes> {
Expand Down Expand Up @@ -54,15 +47,13 @@ impl<IT: InnerTypes> BaseQuery<IT> {
}

pub fn build_sql_and_params(&self) -> Result<NativeObjectHandle<IT>, CubeError> {
self.build_sql_and_params_impl()
}

fn build_sql_and_params_impl(&self) -> Result<NativeObjectHandle<IT>, CubeError> {
let query_planner = QueryPlanner::new(self.request.clone(), self.query_tools.clone());
let logical_plan = query_planner.plan()?;
let planner = TopLevelPlanner::new(
self.request.clone(),
self.query_tools.clone(),
self.cubestore_support_multistage,
);

let (optimized_plan, used_pre_aggregations) =
self.try_pre_aggregations(logical_plan.clone())?;
let (sql, used_pre_aggregations) = planner.plan()?;

let is_external = if !used_pre_aggregations.is_empty() {
used_pre_aggregations
Expand All @@ -73,22 +64,6 @@ impl<IT: InnerTypes> BaseQuery<IT> {
};

let templates = self.query_tools.plan_sql_templates(is_external)?;

let physical_plan_builder =
PhysicalPlanBuilder::new(self.query_tools.clone(), templates.clone());
let original_sql_pre_aggregations = if !self.request.is_pre_aggregation_query() {
OriginalSqlCollector::new(self.query_tools.clone()).collect(&optimized_plan)?
} else {
HashMap::new()
};

let physical_plan = physical_plan_builder.build(
optimized_plan,
original_sql_pre_aggregations,
self.request.is_total_query(),
)?;

let sql = physical_plan.to_sql(&templates)?;
let (result_sql, params) = self
.query_tools
.build_sql_and_params(&sql, true, &templates)?;
Expand All @@ -97,7 +72,6 @@ impl<IT: InnerTypes> BaseQuery<IT> {
res.set(0, result_sql.to_native(self.context.clone())?)?;
res.set(1, params.to_native(self.context.clone())?)?;
if let Some(used_pre_aggregation) = used_pre_aggregations.first() {
//FIXME We should build this object in Rust
let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name(
used_pre_aggregation.cube_name().clone(),
used_pre_aggregation.name().clone(),
Expand All @@ -115,36 +89,4 @@ impl<IT: InnerTypes> BaseQuery<IT> {

Ok(result)
}

fn try_pre_aggregations(
&self,
plan: Rc<Query>,
) -> Result<(Rc<Query>, Vec<Rc<PreAggregation>>), CubeError> {
let result = if !self.request.is_pre_aggregation_query() {
let mut pre_aggregation_optimizer = PreAggregationOptimizer::new(
self.query_tools.clone(),
self.cubestore_support_multistage,
);
let disable_external_pre_aggregations =
self.request.disable_external_pre_aggregations();
if let Some(result) = pre_aggregation_optimizer
.try_optimize(plan.clone(), disable_external_pre_aggregations)?
{
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
(
result,
pre_aggregation_optimizer.get_used_pre_aggregations(),
)
} else {
//TODO multiple pre-aggregations sources required changes in BaseQuery
(plan.clone(), Vec::new())
}
} else {
(plan.clone(), Vec::new())
}
} else {
(plan.clone(), Vec::new())
};
Ok(result)
}
}
2 changes: 2 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod query_properties;
pub mod query_tools;
pub mod sql_evaluator;
pub mod sql_templates;
pub mod top_level_planner;
pub mod utils;
pub mod visitor_context;

Expand All @@ -19,6 +20,7 @@ pub use base_query::BaseQuery;
pub use params_allocator::ParamsAllocator;
pub use query_properties::{FullKeyAggregateMeasures, OrderByItem, QueryProperties};
pub use time_dimension::*;
pub use top_level_planner::TopLevelPlanner;
pub use visitor_context::{
evaluate_sql_call_with_context, evaluate_with_context, FiltersContext, VisitorContext,
};
Loading
Loading