Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 8 additions & 71 deletions engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
error::DatabaseError,
key_selector::KeySelector,
options::{ConflictRangeType, MutationType},
tx_ops::Operation,
tx_ops::{Operation, range_begin_contains, range_end_contains},
value::{KeyValue, Slice, Values},
versionstamp::{
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
Expand Down Expand Up @@ -438,17 +438,8 @@ impl TransactionTask {
let txn = self.create_transaction();
let read_opts = ReadOptions::default();

// Resolve the begin selector
let resolved_begin =
self.resolve_key_selector_for_range(&txn, &begin, begin_or_equal, begin_offset)?;

// Resolve the end selector
let resolved_end =
self.resolve_key_selector_for_range(&txn, &end, end_or_equal, end_offset)?;

// Now execute the range query with resolved keys
let iter = txn.iterator_opt(
rocksdb::IteratorMode::From(&resolved_begin, rocksdb::Direction::Forward),
rocksdb::IteratorMode::From(&begin, rocksdb::Direction::Forward),
read_opts,
);

Expand All @@ -457,8 +448,12 @@ impl TransactionTask {

for item in iter {
let (k, v) = item.context("failed to iterate rocksdb for get range")?;
// Check if we've reached the end key
if k.as_ref() >= resolved_end.as_slice() {

if !range_begin_contains(k.as_ref(), &begin, begin_or_equal, begin_offset) {
continue;
}

if !range_end_contains(k.as_ref(), &end, end_or_equal, end_offset) {
break;
}

Expand All @@ -477,64 +472,6 @@ impl TransactionTask {
Ok(Values::new(results))
}

fn resolve_key_selector_for_range(
&self,
txn: &RocksDbTransaction<OptimisticTransactionDB>,
key: &[u8],
or_equal: bool,
offset: i32,
) -> Result<Vec<u8>> {
// Based on PostgreSQL's interpretation:
// (false, 1) => first_greater_or_equal
// (true, 1) => first_greater_than
// (false, 0) => last_less_than
// (true, 0) => last_less_or_equal

let read_opts = ReadOptions::default();

match (or_equal, offset) {
(false, 1) => {
// first_greater_or_equal: find first key >= search_key
let iter = txn.iterator_opt(
rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
read_opts,
);
for item in iter {
let (k, _v) = item.context(
"failed to iterate rocksdb for range selector first_greater_or_equal",
)?;
return Ok(k.to_vec());
}
// If no key found, return a key that will make the range empty
Ok(vec![0xff; 255])
}
(true, 1) => {
// first_greater_than: find first key > search_key
let iter = txn.iterator_opt(
rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
read_opts,
);
for item in iter {
let (k, _v) = item.context(
"failed to iterate rocksdb for range selector first_greater_than",
)?;
// Skip if it's the exact key
if k.as_ref() == key {
continue;
}
return Ok(k.to_vec());
}
// If no key found, return a key that will make the range empty
Ok(vec![0xff; 255])
}
_ => {
// For other cases, just use the key as-is for now
// This is a simplification - full implementation would handle all cases
Ok(key.to_vec())
}
}
}

async fn handle_get_estimated_range_size(&mut self, begin: &[u8], end: &[u8]) -> Result<i64> {
let range = rocksdb::Range::new(begin, end);

Expand Down
36 changes: 31 additions & 5 deletions engine/packages/universaldb/src/tx_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ impl TransactionOperations {
return Ok(db_values);
}

let begin = opt.begin.key();
let end = opt.end.key();

// Start with database results in a map
let mut result_map = BTreeMap::new();
for kv in db_values.into_iter() {
Expand All @@ -357,7 +354,7 @@ impl TransactionOperations {
for op in &*self.operations() {
match op {
Operation::Set { key, value } => {
if key.as_slice() >= begin && key.as_slice() < end {
if range_contains(key.as_slice(), opt) {
result_map.insert(key.clone(), value.clone());
}
}
Expand All @@ -382,7 +379,7 @@ impl TransactionOperations {
param,
op_type,
} => {
if key.as_slice() >= begin && key.as_slice() < end {
if range_contains(key.as_slice(), opt) {
// Get current value for this key (from result_map or empty if not exists)
let current_value = result_map.get(key);
let current_slice = current_value.map(|v| &**v);
Expand Down Expand Up @@ -423,3 +420,32 @@ impl TransactionOperations {
.push((begin.to_vec(), end.to_vec(), conflict_type));
}
}

fn range_contains(key: &[u8], opt: &RangeOption<'_>) -> bool {
range_begin_contains(
key,
opt.begin.key(),
opt.begin.or_equal(),
opt.begin.offset(),
) && range_end_contains(key, opt.end.key(), opt.end.or_equal(), opt.end.offset())
}

pub(crate) fn range_begin_contains(key: &[u8], begin: &[u8], or_equal: bool, offset: i32) -> bool {
match (or_equal, offset) {
(false, 1) => key >= begin,
(true, 1) => key > begin,
(false, 0) => key > begin,
(true, 0) => key >= begin,
_ => key >= begin,
}
}

pub(crate) fn range_end_contains(key: &[u8], end: &[u8], or_equal: bool, offset: i32) -> bool {
match (or_equal, offset) {
(false, 1) => key < end,
(true, 1) => key <= end,
(false, 0) => key < end,
(true, 0) => key <= end,
_ => key < end,
}
}
35 changes: 35 additions & 0 deletions engine/packages/universaldb/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,41 @@ async fn test_range_options(db: &Database) {
assert_eq!(results[2].key(), test_subspace.pack(&("range_e",)));
assert_eq!(results[2].value(), b"val_e");

// Test 5: local writes outside the range should not be merged into the result
let results = db
.run(|tx| async move {
let test_subspace = Subspace::from("test");
let key_b = test_subspace.pack(&("range_b",));
let key_d = test_subspace.pack(&("range_d",));
let key_z = test_subspace.pack(&("range_z",));

tx.set(&key_z, b"val_z");

let range = RangeOption {
begin: KeySelector::first_greater_or_equal(Cow::Owned(key_b)),
end: KeySelector::first_greater_or_equal(Cow::Owned(key_d)),
limit: None,
reverse: false,
mode: StreamingMode::WantAll,
target_bytes: 0,
..RangeOption::default()
};

let vals = tx.get_range(&range, 1, Serializable).await?;
Ok(vals.into_vec())
})
.await
.unwrap();

assert_eq!(
results.len(),
2,
"Expected local key outside [b, d) to be excluded"
);
assert!(!results
.iter()
.any(|r| r.key() == test_subspace.pack(&("range_z",))));

// Clear test data
db.run(|tx| async move {
let test_subspace = Subspace::from("test");
Expand Down
Loading