Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions src/cortex-share/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,33 @@ impl ShareSync {

let count = updates.len();

for update in updates {
self.send_update(update).await?;
let mut sent = 0;
let mut remaining = updates.into_iter();
while let Some(update) = remaining.next() {
if let Err(error) = self.send_update(update.clone()).await {
let mut unsent = Vec::with_capacity(count - sent);
unsent.push(update);
unsent.extend(remaining);
self.requeue_front(unsent).await;
return Err(error);
}
sent += 1;
}

debug!("Flushed {} sync updates", count);
Ok(count)
}

async fn requeue_front(&self, mut updates: Vec<SyncUpdate>) {
if updates.is_empty() {
return;
}

let mut pending = self.pending.lock().await;
updates.extend(std::mem::take(&mut *pending));
*pending = updates;
}

/// Send a single update.
async fn send_update(&self, update: SyncUpdate) -> Result<()> {
let response = self
Expand Down Expand Up @@ -192,6 +211,7 @@ impl Default for ShareSync {
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn test_queue_update() {
Expand All @@ -208,4 +228,47 @@ mod tests {
let pending = sync.pending.lock().await;
assert_eq!(pending.len(), 1);
}

#[tokio::test]
async fn test_flush_requeues_failed_and_unsent_updates() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let server = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0; 2048];
let _ = stream.read(&mut buf).await.unwrap();
stream
.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 2\r\nconnection: close\r\n\r\nOK")
.await
.unwrap();

let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0; 2048];
let _ = stream.read(&mut buf).await.unwrap();
});

let sync = ShareSync::new().with_api_url(format!("http://{}", addr));
let share = SharedSession::new(
"test-session".to_string(),
"https://example.com/share/123".to_string(),
"secret123".to_string(),
);

sync.queue_update(&share, "first", serde_json::json!({"order": 1}))
.await;
sync.queue_update(&share, "second", serde_json::json!({"order": 2}))
.await;
sync.queue_update(&share, "third", serde_json::json!({"order": 3}))
.await;

let result = sync.flush().await;
assert!(result.is_err(), "flush should report the failed update");

let pending = sync.pending.lock().await;
let keys: Vec<_> = pending.iter().map(|update| update.key.as_str()).collect();
assert_eq!(keys, vec!["second", "third"]);

server.await.unwrap();
}
}