Skip to content

Commit 2364e56

Browse files
committed
fix(share): requeue sync updates on flush failure
1 parent 7954d02 commit 2364e56

1 file changed

Lines changed: 65 additions & 2 deletions

File tree

src/cortex-share/src/sync.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,33 @@ impl ShareSync {
123123

124124
let count = updates.len();
125125

126-
for update in updates {
127-
self.send_update(update).await?;
126+
let mut sent = 0;
127+
let mut remaining = updates.into_iter();
128+
while let Some(update) = remaining.next() {
129+
if let Err(error) = self.send_update(update.clone()).await {
130+
let mut unsent = Vec::with_capacity(count - sent);
131+
unsent.push(update);
132+
unsent.extend(remaining);
133+
self.requeue_front(unsent).await;
134+
return Err(error);
135+
}
136+
sent += 1;
128137
}
129138

130139
debug!("Flushed {} sync updates", count);
131140
Ok(count)
132141
}
133142

143+
async fn requeue_front(&self, mut updates: Vec<SyncUpdate>) {
144+
if updates.is_empty() {
145+
return;
146+
}
147+
148+
let mut pending = self.pending.lock().await;
149+
updates.extend(std::mem::take(&mut *pending));
150+
*pending = updates;
151+
}
152+
134153
/// Send a single update.
135154
async fn send_update(&self, update: SyncUpdate) -> Result<()> {
136155
let response = self
@@ -192,6 +211,7 @@ impl Default for ShareSync {
192211
#[cfg(test)]
193212
mod tests {
194213
use super::*;
214+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
195215

196216
#[tokio::test]
197217
async fn test_queue_update() {
@@ -208,4 +228,47 @@ mod tests {
208228
let pending = sync.pending.lock().await;
209229
assert_eq!(pending.len(), 1);
210230
}
231+
232+
#[tokio::test]
233+
async fn test_flush_requeues_failed_and_unsent_updates() {
234+
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
235+
let addr = listener.local_addr().unwrap();
236+
237+
let server = tokio::spawn(async move {
238+
let (mut stream, _) = listener.accept().await.unwrap();
239+
let mut buf = [0; 2048];
240+
let _ = stream.read(&mut buf).await.unwrap();
241+
stream
242+
.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 2\r\nconnection: close\r\n\r\nOK")
243+
.await
244+
.unwrap();
245+
246+
let (mut stream, _) = listener.accept().await.unwrap();
247+
let mut buf = [0; 2048];
248+
let _ = stream.read(&mut buf).await.unwrap();
249+
});
250+
251+
let sync = ShareSync::new().with_api_url(format!("http://{}", addr));
252+
let share = SharedSession::new(
253+
"test-session".to_string(),
254+
"https://example.com/share/123".to_string(),
255+
"secret123".to_string(),
256+
);
257+
258+
sync.queue_update(&share, "first", serde_json::json!({"order": 1}))
259+
.await;
260+
sync.queue_update(&share, "second", serde_json::json!({"order": 2}))
261+
.await;
262+
sync.queue_update(&share, "third", serde_json::json!({"order": 3}))
263+
.await;
264+
265+
let result = sync.flush().await;
266+
assert!(result.is_err(), "flush should report the failed update");
267+
268+
let pending = sync.pending.lock().await;
269+
let keys: Vec<_> = pending.iter().map(|update| update.key.as_str()).collect();
270+
assert_eq!(keys, vec!["second", "third"]);
271+
272+
server.await.unwrap();
273+
}
211274
}

0 commit comments

Comments
 (0)