Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use vortex_bench::generate_duckdb_registration_sql;
use vortex_duckdb::duckdb::Config;
use vortex_duckdb::duckdb::Connection;
use vortex_duckdb::duckdb::Database;
use vortex_duckdb::register_extension_options;

/// DuckDB context for benchmarks.
pub struct DuckClient {
Expand Down Expand Up @@ -67,10 +66,12 @@ impl DuckClient {
path: Option<PathBuf>,
threads: Option<usize>,
) -> Result<(Database, Connection)> {
let config = Config::new().vortex_expect("failed to create duckdb config");
let mut config = Config::new().vortex_expect("failed to create duckdb config");

// Register Vortex extension options before creating connection
register_extension_options(&config);
// Set DuckDB thread count if specified
if let Some(thread_count) = threads {
config.set("threads", &format!("{}", thread_count))?;
}

let db = match path {
Some(path) => Database::open_with_config(path, config),
Expand All @@ -91,11 +92,6 @@ impl DuckClient {
// parquet_metadata_cache" when running DuckDB in debug mode.
connection.query("SET parquet_metadata_cache = true")?;

// Set vortex_max_threads if specified
if let Some(thread_count) = threads {
connection.query(&format!("SET vortex_max_threads = {}", thread_count))?;
}

Ok((db, connection))
}

Expand Down
73 changes: 0 additions & 73 deletions vortex-duckdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use vortex::io::session::RuntimeSessionExt;
use vortex::session::VortexSession;

use crate::copy::VortexCopyFunction;
use crate::duckdb::Config;
pub use crate::duckdb::Connection;
pub use crate::duckdb::Database;
pub use crate::duckdb::LogicalType;
Expand Down Expand Up @@ -44,36 +43,6 @@ static RUNTIME: LazyLock<CurrentThreadRuntime> = LazyLock::new(CurrentThreadRunt
static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle()));

/// Register Vortex extension configuration options with DuckDB.
/// This must be called before `register_table_functions` to take effect.
pub fn register_extension_options(config: &Config) {
let logical_type = LogicalType::uint64();

let default_threads = std::thread::available_parallelism()
.map(|n| n.get() as u64)
.unwrap_or(1);
let default_value = Value::from(default_threads);

// Register the vortex_max_threads extension option
// SAFETY: We're passing valid pointers for database, logical_type, and default_value
// The C++ code will copy the LogicalType and Value, so we can safely drop them after this call
let result = unsafe {
cpp::duckdb_vx_add_extension_option(
config.as_ptr(),
c"vortex_max_threads".as_ptr(),
c"Maximum number of threads for Vortex table scans".as_ptr(),
logical_type.as_ptr(),
default_value.as_ptr(),
)
};

assert_eq!(
result,
cpp::duckdb_state::DuckDBSuccess,
"Failed to register vortex_max_threads extension option"
);
}

/// Initialize the Vortex extension by registering the extension functions.
/// Note: This also registers extension options. If you want to register options
/// separately (e.g., before creating connections), call `register_extension_options` first.
Expand Down Expand Up @@ -124,45 +93,3 @@ pub extern "C" fn vortex_extension_version_rust() -> *const c_char {
}
.as_ptr()
}

#[cfg(test)]
mod tests {
use std::ffi::CString;

use super::*;
use crate::duckdb::Config;
use crate::duckdb::Database;

#[test]
fn test_vortex_max_threads_option_registration() {
let config = Config::new().expect("Failed to create config");
register_extension_options(&config);
let db = Database::open_in_memory_with_config(config).expect("Failed to open database");

let conn = db.connect().expect("Failed to connect");

let _result1 = conn
.query("SET vortex_max_threads = 4")
.expect("Failed to set vortex_max_threads - option may not be registered");

let max_threads_cstr = CString::new("vortex_max_threads").unwrap();
let ctx = conn.client_context().vortex_expect("ctx exists");
assert_eq!(
ctx.try_get_current_setting(&max_threads_cstr)
.unwrap()
.to_string(),
"4"
);

let _result2 = conn
.query("SET vortex_max_threads = 8")
.expect("Failed to set vortex_max_threads to 8");

assert_eq!(
ctx.try_get_current_setting(&max_threads_cstr)
.unwrap()
.to_string(),
"8"
);
}
}
31 changes: 4 additions & 27 deletions vortex-duckdb/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp::max;
use std::ffi::CString;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
Expand Down Expand Up @@ -62,7 +61,6 @@ use crate::duckdb::BindResult;
use crate::duckdb::Cardinality;
use crate::duckdb::ClientContext;
use crate::duckdb::DataChunk;
use crate::duckdb::ExtractedValue;
use crate::duckdb::LogicalType;
use crate::duckdb::TableFunction;
use crate::duckdb::TableInitInput;
Expand All @@ -79,7 +77,6 @@ pub struct VortexBindData {
file_urls: Vec<Url>,
column_names: Vec<String>,
column_types: Vec<LogicalType>,
max_threads: u64,
}

impl Clone for VortexBindData {
Expand All @@ -92,7 +89,6 @@ impl Clone for VortexBindData {
file_urls: self.file_urls.clone(),
column_names: self.column_names.clone(),
column_types: self.column_types.clone(),
max_threads: self.max_threads,
}
}
}
Expand Down Expand Up @@ -260,24 +256,6 @@ impl TableFunction for VortexTableFunction {
.get_parameter(0)
.ok_or_else(|| vortex_err!("Missing file glob parameter"))?;

// Read the vortex_max_threads setting from DuckDB configuration
let max_threads_cstr = CString::new("vortex_max_threads")
.map_err(|e| vortex_err!("Invalid setting name: {}", e))?;
let max_threads = ctx
.try_get_current_setting(&max_threads_cstr)
.and_then(|v| match v.as_ref().extract() {
ExtractedValue::UBigInt(val) => usize::try_from(val).ok(),
ExtractedValue::BigInt(val) if val > 0 => usize::try_from(val as u64).ok(),
_ => None,
})
.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
});

tracing::trace!("running scan with max_threads {max_threads}");

let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob(
file_glob_string.as_ref().as_string(),
)))?;
Expand Down Expand Up @@ -309,7 +287,6 @@ impl TableFunction for VortexTableFunction {
filter_exprs: vec![],
column_names,
column_types,
max_threads: max_threads as u64,
})
}

Expand Down Expand Up @@ -389,13 +366,13 @@ impl TableFunction for VortexTableFunction {
.map_or_else(|| "true".to_string(), |f| f.to_string())
);

// Use the max_threads from bind_data (read from vortex_max_threads setting)
#[expect(clippy::cast_possible_truncation, reason = "max_threads fits in usize")]
let num_workers = bind_data.max_threads as usize;

let client_context = init_input.client_context()?;
let object_cache = client_context.object_cache();

let num_workers = std::thread::available_parallelism()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the extension, max_threads means up to how many threads are suppported by the extension, not how many should be used. That's why extensions use u64::MAX to signal that any thread count is supported.

So should DuckDB decide to use more threads than logical cores, they would mismatch.

.map(|n| n.get())
.unwrap_or(1);

let handle = RUNTIME.handle();
let first_file = bind_data.first_file.clone();
let scan_streams = stream::iter(bind_data.file_urls.clone())
Expand Down
Loading