PR Review: Cold Run Optimization

Claude finished @davidbuniat's task in 1m 0s.

I've reviewed the changes and identified 5 critical issues that need to be addressed. See the inline comments below for details and suggested fixes.
```cpp
 * It fetches and caches the first batch so that subsequent next_batch()
 * calls return immediately without blocking.
 */
void wait_for_first_batch(int64_t timeout_ms = 30000);
```
Missing implementation: wait_for_first_batch() is declared but never defined in any .cpp file, so any call site will fail at link time.
Fix: Implement this method in async_prefetcher.cpp or mark it as = delete if it's not meant to be used yet.
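One way to supply the missing definition is a condition-variable wait, sketched here against hypothetical internals (a `mutex_`, a `cv_`, and a cached `first_batch_` member — the real AsyncPrefetcher fields may differ):

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <vector>

// Minimal sketch of the missing definition. The member names
// (mutex_, cv_, first_batch_) are hypothetical stand-ins for
// whatever the real prefetcher uses.
class AsyncPrefetcher {
public:
    // Blocks until the producer thread has cached the first batch,
    // or throws after timeout_ms milliseconds.
    void wait_for_first_batch(int64_t timeout_ms = 30000) {
        std::unique_lock<std::mutex> lock(mutex_);
        bool ready = cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                                  [this] { return first_batch_.has_value(); });
        if (!ready) {
            throw std::runtime_error("timed out waiting for first batch");
        }
    }

    // Called by the producer thread once the first batch is available.
    void deliver_first_batch(std::vector<int> batch) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            first_batch_ = std::move(batch);
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::optional<std::vector<int>> first_batch_;
};
```

The predicate overload of `wait_for` also handles spurious wakeups, so the caller never returns before the batch is actually cached.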
cpp/deeplake_pg/duckdb_executor.cpp (outdated)
```cpp
// Configure temp directory for disk spilling (if specified)
if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) {
    auto temp_result = conns->con_cpp->Query(
        fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory));
```
SQL injection vulnerability: User-controlled `duckdb_temp_directory` is directly interpolated into the SQL query without escaping. A malicious path like `'; DROP TABLE users; --` could execute arbitrary SQL.
Fix: Use DuckDB's parameterized query API or properly escape the string:
Suggested change:

```cpp
auto temp_result = conns->con_cpp->Query(
    "SET temp_directory=?", pg::duckdb_temp_directory);
```
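If DuckDB turns out not to support parameter binding for `SET` statements, a defensive fallback is to escape the value before interpolation. A minimal sketch (`escape_sql_literal` is a hypothetical helper, not an existing DuckDB API):

```cpp
#include <string>

// Fallback when parameter binding is unavailable for SET statements:
// double every single quote so the value is safe inside a '...' literal.
// (escape_sql_literal is a hypothetical helper, not an existing API.)
std::string escape_sql_literal(const std::string& value) {
    std::string escaped;
    escaped.reserve(value.size());
    for (char c : value) {
        escaped += c;
        if (c == '\'') {
            escaped += '\'';  // '' is the SQL escape for a single quote
        }
    }
    return escaped;
}
```

The call site would then become `fmt::format("SET temp_directory='{}'", escape_sql_literal(pg::duckdb_temp_directory))`, which neutralizes a quote-breakout payload.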
```cpp
if (batch_index == 0 && !first_batch_cache_.empty() &&
    static_cast<size_t>(column_number) < first_batch_cache_.size() &&
    first_batch_cache_[column_number].has_value()) {
    std::lock_guard lock(col_data.mutex_);
```
Race condition: The double-checked locking pattern is broken. Thread A could pass the unlocked has_value() check, then thread B could consume the cache entry before thread A acquires the lock and moves from it. Thread A would then move from an empty optional, causing undefined behavior.
Fix: Check has_value() again after acquiring the lock, before moving.
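The recheck-under-lock pattern can be sketched with simplified stand-ins for the cache slot and destination (the real code would operate on `first_batch_cache_[column_number]` and `batch.owner_`; note the unlocked pre-check is only a cheap hint and the check under the lock is the authoritative one):

```cpp
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Sketch of the corrected pattern: re-validate the optional after the
// lock is held, so a concurrent consumer cannot leave us moving from
// an empty optional. `cache` and `out` are simplified stand-ins.
bool take_cached(std::optional<std::string>& cache, std::mutex& m,
                 std::string& out) {
    if (!cache.has_value()) {   // cheap unlocked pre-check (may be stale)
        return false;
    }
    std::lock_guard<std::mutex> lock(m);
    if (!cache.has_value()) {   // authoritative re-check under the lock
        return false;           // another thread consumed it first
    }
    out = std::move(*cache);
    cache.reset();
    return true;
}
```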
cpp/deeplake_pg/table_data_impl.hpp (outdated)
```cpp
// Check first_batch_cache_ for batch 0 (from warm_all_streamers)
if (batch_index == 0 && !first_batch_cache_.empty() &&
    static_cast<size_t>(column_number) < first_batch_cache_.size() &&
    first_batch_cache_[column_number].has_value()) {
    std::lock_guard lock(col_data.mutex_);
    if (!batch.initialized_.load(std::memory_order_relaxed)) {
        batch.owner_ = std::move(*first_batch_cache_[column_number]);
        first_batch_cache_[column_number].reset();
        batch.initialized_.store(true, std::memory_order_release);
    }
}
```
Memory ordering issue: has_value() is checked before the lock is acquired, but the initialized_ flag is loaded with std::memory_order_relaxed inside the lock, which is inconsistent. If another thread sets initialized_ with release semantics, this thread may not observe the cache update, because the unlocked has_value() check carries no acquire semantics.
Fix: Move the entire cache check inside the lock, or use atomic operations to guard cache validity.
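The release/acquire pairing the comment asks for can be illustrated in isolation; `Slot`, `publish`, and `try_read` are illustrative stand-ins, not the project's types:

```cpp
#include <atomic>
#include <string>

// Minimal illustration of release/acquire publication: the reader must
// load the flag with memory_order_acquire so that the writer's store to
// payload (sequenced before the release store) is guaranteed visible.
struct Slot {
    std::string payload;
    std::atomic<bool> initialized_{false};
};

void publish(Slot& s, std::string value) {
    s.payload = std::move(value);                          // 1: write data
    s.initialized_.store(true, std::memory_order_release); // 2: publish
}

bool try_read(const Slot& s, std::string& out) {
    if (!s.initialized_.load(std::memory_order_acquire)) { // pairs with (2)
        return false;  // not published yet; relaxed here could miss (1)
    }
    out = s.payload;   // safe: acquire orders this after the release store
    return true;
}
```

A relaxed load in `try_read` would let the compiler or CPU reorder the payload read before the flag check, which is exactly the hazard the review describes.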
```cpp
} catch (const std::exception& e) {
    base::log_warning(base::log_channel::async, "warm_all_streamers failed: {}", e.what());
    // Non-fatal - subsequent batch fetches will retry via normal path
    first_batch_cache_.clear();
```
Loss of failure information: When warm_all_streamers() fails and clears the cache, subsequent code has no way to know warming failed. This could lead to silently degraded performance or repeated failures.
Fix: Consider setting an error flag or logging at ERROR level instead of WARNING, so operators can detect cold-run optimization failures.
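A sticky error flag is one way to preserve the failure for later inspection, assuming warming runs on a single thread; `WarmState` and `record_warm_failure` are hypothetical names, not existing code:

```cpp
#include <atomic>
#include <exception>
#include <stdexcept>
#include <string>

// Sketch of surfacing the warm-up failure instead of discarding it:
// a sticky flag plus the first error message, which next_batch() or a
// health check can later inspect. Assumes a single warming thread
// writes; readers only load the atomic flag.
struct WarmState {
    std::atomic<bool> warm_failed{false};
    std::string first_error;  // written once, before warm_failed is set
};

void record_warm_failure(WarmState& state, const std::exception& e) {
    if (!state.warm_failed.load(std::memory_order_acquire)) {
        state.first_error = e.what();  // keep the first cause for operators
        state.warm_failed.store(true, std::memory_order_release);
    }
}
```

The catch block would then call `record_warm_failure(...)` alongside (or instead of) clearing the cache, so operators can distinguish "never warmed" from "warming failed".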