Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions appenders/async/src/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,9 @@ impl Append for Async {
let task = Task::Flush { done: done_tx };
self.state.send_task(task)?;

match done_rx.recv() {
Ok(None) => Ok(()),
Ok(Some(err)) => Err(err),
Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)),
}
done_rx
.recv()
.map_err(|err| Error::new("worker exited before completing flush").with_source(err))
}
}

Expand Down
3 changes: 1 addition & 2 deletions appenders/async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]

use logforth_core::Error;
use logforth_core::kv;
use logforth_core::record::RecordOwned;

Expand All @@ -35,7 +34,7 @@ enum Task {
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
},
Flush {
done: oneshot::Sender<Option<Error>>,
done: oneshot::Sender<()>,
},
}

Expand Down
9 changes: 2 additions & 7 deletions appenders/async/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,12 @@ impl Worker {
});
}
Task::Flush { done } => {
let mut error = None;
for append in appends.iter() {
if let Err(err) = append.flush() {
error = Some(
error
.unwrap_or_else(|| Error::new("failed to flush appender"))
.with_source(err),
);
trap.trap(&err);
}
}
let _ = done.send(error);
let _ = done.send(());
}
}
}
Expand Down
23 changes: 15 additions & 8 deletions appenders/async/tests/flushes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;
use std::sync::Barrier;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;

use logforth_append_async::AsyncBuilder;
use logforth_core::Append;
Expand Down Expand Up @@ -56,10 +57,14 @@ impl Append for FailingFlush {
}

#[derive(Debug)]
struct NoopTrap;
struct CapturedTrap {
errors: mpsc::Sender<String>,
}

impl Trap for NoopTrap {
fn trap(&self, _: &Error) {}
impl Trap for CapturedTrap {
fn trap(&self, err: &Error) {
self.errors.send(err.to_string()).unwrap();
}
}

#[test]
Expand Down Expand Up @@ -92,14 +97,16 @@ fn flush_waits_for_worker_completion() {
}

#[test]
fn flush_propagates_errors() {
fn flush_handles_errors_in_worker_thread() {
let (tx, rx) = mpsc::channel();

let async_append = AsyncBuilder::new("async-flush-error")
.trap(NoopTrap)
.trap(CapturedTrap { errors: tx })
.append(FailingFlush)
.build();

let err = async_append.flush().unwrap_err();
let err = err.to_string();
assert!(err.contains("failed to flush"));
async_append.flush().unwrap();

let err = rx.recv().unwrap();
assert!(err.contains("flush failed"));
}