Skip to content

Commit 3aa6003

Browse files
joostjagerclaude
andcommitted
Add non-atomic ordered batch write method to KVStore traits
Adds `write_batch` method to both `KVStoreSync` and `KVStore` traits with default implementations that delegate to the single `write` method. - `BatchWriteEntry`: struct containing namespace, key, and data - `BatchWriteResult`: returns successful write count and optional error - Writes execute sequentially in order; stops on first error - Existing implementations automatically inherit the default behavior Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent f43803d commit 3aa6003

1 file changed

Lines changed: 128 additions & 0 deletions

File tree

lightning/src/util/persist.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,54 @@ pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";
121121
/// updates applied to be current) with another implementation.
122122
pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
123123

124+
/// An entry in a batch write operation.
125+
pub struct BatchWriteEntry {
126+
/// The primary namespace for this write.
127+
pub primary_namespace: String,
128+
/// The secondary namespace for this write.
129+
pub secondary_namespace: String,
130+
/// The key to write to.
131+
pub key: String,
132+
/// The data to write.
133+
pub buf: Vec<u8>,
134+
}
135+
136+
impl BatchWriteEntry {
137+
/// Creates a new batch write entry.
138+
pub fn new(
139+
primary_namespace: impl Into<String>, secondary_namespace: impl Into<String>,
140+
key: impl Into<String>, buf: Vec<u8>,
141+
) -> Self {
142+
Self {
143+
primary_namespace: primary_namespace.into(),
144+
secondary_namespace: secondary_namespace.into(),
145+
key: key.into(),
146+
buf,
147+
}
148+
}
149+
}
150+
151+
/// The result of a batch write operation.
152+
#[derive(Debug)]
153+
pub struct BatchWriteResult {
154+
/// The number of writes that completed successfully.
155+
pub successful_writes: usize,
156+
/// The error that occurred, if any. If `None`, all writes succeeded.
157+
pub error: Option<io::Error>,
158+
}
159+
160+
impl BatchWriteResult {
161+
/// Returns `true` if all writes succeeded.
162+
pub fn is_ok(&self) -> bool {
163+
self.error.is_none()
164+
}
165+
166+
/// Returns the error if one occurred, consuming the result.
167+
pub fn err(self) -> Option<io::Error> {
168+
self.error
169+
}
170+
}
171+
124172
/// Provides an interface that allows storage and retrieval of persisted values that are associated
125173
/// with given keys.
126174
///
@@ -193,6 +241,31 @@ pub trait KVStoreSync {
193241
fn list(
194242
&self, primary_namespace: &str, secondary_namespace: &str,
195243
) -> Result<Vec<String>, io::Error>;
244+
/// Persists multiple key-value pairs in a single batch operation.
245+
///
246+
/// Processes writes in order. Non-atomic: if an error occurs, earlier writes may have already
247+
/// been persisted and will not be rolled back. However, writes after the failed one are never
248+
/// started.
249+
///
250+
/// The default implementation iterates through entries and calls [`Self::write`] for each one.
251+
/// Implementations may override for optimized batch operations.
252+
fn write_batch(&self, entries: Vec<BatchWriteEntry>) -> BatchWriteResult {
253+
let mut successful_writes = 0;
254+
for entry in entries {
255+
match self.write(
256+
&entry.primary_namespace,
257+
&entry.secondary_namespace,
258+
&entry.key,
259+
entry.buf,
260+
) {
261+
Ok(()) => successful_writes += 1,
262+
Err(e) => {
263+
return BatchWriteResult { successful_writes, error: Some(e) };
264+
},
265+
}
266+
}
267+
BatchWriteResult { successful_writes, error: None }
268+
}
196269
}
197270

198271
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type
@@ -238,6 +311,13 @@ where
238311

239312
async move { res }
240313
}
314+
315+
fn write_batch(
316+
&self, entries: Vec<BatchWriteEntry>,
317+
) -> impl Future<Output = BatchWriteResult> + 'static + MaybeSend {
318+
let res = self.0.write_batch(entries);
319+
async move { res }
320+
}
241321
}
242322

243323
/// Provides an interface that allows storage and retrieval of persisted values that are associated
@@ -335,6 +415,48 @@ pub trait KVStore {
335415
fn list(
336416
&self, primary_namespace: &str, secondary_namespace: &str,
337417
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend;
418+
/// Persists multiple key-value pairs in a single batch operation.
419+
///
420+
/// Processes writes in order, awaiting each write before starting the next. Non-atomic: if an
421+
/// error occurs, earlier writes may have already been persisted and will not be rolled back.
422+
/// However, writes after the failed one are never started.
423+
///
424+
/// Note that similar to [`Self::write`], ordering is maintained: all writes in the batch are
425+
/// ordered relative to each other and to concurrent writes.
426+
///
427+
/// The default implementation calls [`Self::write`] for each entry sequentially.
428+
fn write_batch(
429+
&self, entries: Vec<BatchWriteEntry>,
430+
) -> impl Future<Output = BatchWriteResult> + 'static + MaybeSend {
431+
// Capture all write futures synchronously to maintain ordering
432+
// (version numbers are assigned when write() is called, not when awaited)
433+
let write_futures: Vec<Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>> =
434+
entries
435+
.into_iter()
436+
.map(|entry| {
437+
let fut = self.write(
438+
&entry.primary_namespace,
439+
&entry.secondary_namespace,
440+
&entry.key,
441+
entry.buf,
442+
);
443+
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>
444+
})
445+
.collect();
446+
447+
async move {
448+
let mut successful_writes = 0;
449+
for write_future in write_futures {
450+
match write_future.await {
451+
Ok(()) => successful_writes += 1,
452+
Err(e) => {
453+
return BatchWriteResult { successful_writes, error: Some(e) };
454+
},
455+
}
456+
}
457+
BatchWriteResult { successful_writes, error: None }
458+
}
459+
}
338460
}
339461

340462
impl<K> KVStore for K
@@ -365,6 +487,12 @@ where
365487
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend {
366488
self.deref().list(primary_namespace, secondary_namespace)
367489
}
490+
491+
fn write_batch(
492+
&self, entries: Vec<BatchWriteEntry>,
493+
) -> impl Future<Output = BatchWriteResult> + 'static + MaybeSend {
494+
self.deref().write_batch(entries)
495+
}
368496
}
369497

370498
/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]

0 commit comments

Comments
 (0)