Skip to content

Commit 331eca0

Browse files
authored
refactor: Async should handle flush error in worker thread (#228)
Signed-off-by: tison <wander4096@gmail.com>
1 parent c95a3a3 commit 331eca0

4 files changed

Lines changed: 21 additions & 22 deletions

File tree

appenders/async/src/append.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,9 @@ impl Append for Async {
7777
let task = Task::Flush { done: done_tx };
7878
self.state.send_task(task)?;
7979

80-
match done_rx.recv() {
81-
Ok(None) => Ok(()),
82-
Ok(Some(err)) => Err(err),
83-
Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)),
84-
}
80+
done_rx
81+
.recv()
82+
.map_err(|err| Error::new("worker exited before completing flush").with_source(err))
8583
}
8684
}
8785

appenders/async/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#![cfg_attr(docsrs, feature(doc_cfg))]
1818
#![deny(missing_docs)]
1919

20-
use logforth_core::Error;
2120
use logforth_core::kv;
2221
use logforth_core::record::RecordOwned;
2322

@@ -35,7 +34,7 @@ enum Task {
3534
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
3635
},
3736
Flush {
38-
done: oneshot::Sender<Option<Error>>,
37+
done: oneshot::Sender<()>,
3938
},
4039
}
4140

appenders/async/src/worker.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,12 @@ impl Worker {
6767
});
6868
}
6969
Task::Flush { done } => {
70-
let mut error = None;
7170
for append in appends.iter() {
7271
if let Err(err) = append.flush() {
73-
error = Some(
74-
error
75-
.unwrap_or_else(|| Error::new("failed to flush appender"))
76-
.with_source(err),
77-
);
72+
trap.trap(&err);
7873
}
7974
}
80-
let _ = done.send(error);
75+
let _ = done.send(());
8176
}
8277
}
8378
}

appenders/async/tests/flushes.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616
use std::sync::Barrier;
1717
use std::sync::atomic::AtomicBool;
1818
use std::sync::atomic::Ordering;
19+
use std::sync::mpsc;
1920

2021
use logforth_append_async::AsyncBuilder;
2122
use logforth_core::Append;
@@ -56,10 +57,14 @@ impl Append for FailingFlush {
5657
}
5758

5859
#[derive(Debug)]
59-
struct NoopTrap;
60+
struct CapturedTrap {
61+
errors: mpsc::Sender<String>,
62+
}
6063

61-
impl Trap for NoopTrap {
62-
fn trap(&self, _: &Error) {}
64+
impl Trap for CapturedTrap {
65+
fn trap(&self, err: &Error) {
66+
self.errors.send(err.to_string()).unwrap();
67+
}
6368
}
6469

6570
#[test]
@@ -92,14 +97,16 @@ fn flush_waits_for_worker_completion() {
9297
}
9398

9499
#[test]
95-
fn flush_propagates_errors() {
100+
fn flush_handles_errors_in_worker_thread() {
101+
let (tx, rx) = mpsc::channel();
102+
96103
let async_append = AsyncBuilder::new("async-flush-error")
97-
.trap(NoopTrap)
104+
.trap(CapturedTrap { errors: tx })
98105
.append(FailingFlush)
99106
.build();
100107

101-
let err = async_append.flush().unwrap_err();
102-
let err = err.to_string();
103-
assert!(err.contains("failed to flush"));
108+
async_append.flush().unwrap();
109+
110+
let err = rx.recv().unwrap();
104111
assert!(err.contains("flush failed"));
105112
}

0 commit comments

Comments
 (0)