Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bd-client-stats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ itertools.workspace = true
log.workspace = true
parking_lot.workspace = true
protobuf.workspace = true
sha2.workspace = true
time.workspace = true
tokio.workspace = true

Expand Down
272 changes: 181 additions & 91 deletions bd-client-stats/src/file_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,16 @@ pub struct StatsUploadRequestHandle {
stats_upload_request: StatsUploadRequest,
}

pub struct PendingUpload {
pub request: StatsUploadRequest,
pub source_file_ids: Vec<String>,
}

impl StatsUploadRequestHandle {
pub fn snapshot(&mut self) -> Option<Snapshot> {
// TODO(mattklein123): Currently we only support a single snapshot per upload. We could now
// support multiple per upload, but we would need to handle merging into a single request at
// pending upload time. We can consider this as a follow up.
// Disk snapshots are still written as one-snapshot files. Pending upload batching may later
// combine multiple files into one request, but the writable handle always exposes the single
// snapshot owned by the file currently being updated.
if self.stats_upload_request.snapshot.is_empty() {
None
} else {
Expand All @@ -58,12 +63,12 @@ impl StatsUploadRequestHandle {
//

struct InitializedInner {
file_system: Box<dyn FileSystem>,
file_system: Arc<dyn FileSystem>,
index: VecDeque<PendingFile>,
in_flight_uploads: HashSet<String>,
}
enum Inner {
NotInitialized(Option<Box<dyn FileSystem>>),
NotInitialized(Option<Arc<dyn FileSystem>>),
Initialized(InitializedInner),
}
pub struct FileManager {
Expand Down Expand Up @@ -111,12 +116,56 @@ impl InitializedInner {
Ok(())
}

async fn delete_pending_upload(&mut self, index: usize) -> anyhow::Result<()> {
log::debug!("deleting pending upload: {}", self.index[index].name);
self.delete_snapshot(index).await?;
self.write_index().await?;
async fn delete_pending_uploads(&mut self, names: &[String]) -> anyhow::Result<()> {
let mut remaining_names: HashSet<String> = names.iter().cloned().collect();
let mut index = 0;

Ok(())
while index < self.index.len() {
let file_name = self.index[index].name.clone();
if remaining_names.remove(&file_name) {
log::debug!("deleting pending upload: {file_name}");
self.delete_snapshot(index).await?;
} else {
index += 1;
}
}

for name in remaining_names {
// A completion can race with max-files eviction removing an older in-flight entry before the
// upload ack arrives. We could teach eviction to preserve in-flight files, but that is more
// complicated than treating a missing entry here as an already-cleaned-up upload.
log::debug!("pending upload {name} not found in index");
}

self.write_index().await
}

fn eligible_pending_upload_ids(
&self,
only_if_file_is_old: bool,
now: time::OffsetDateTime,
max_aggregation_window_per_file: Duration,
) -> Vec<String> {
let mut eligible = Vec::new();

for file in &self.index {
if self.in_flight_uploads.contains(&file.name) {
continue;
}

let is_old = file.period_start.to_offset_date_time() + max_aggregation_window_per_file <= now;
if only_if_file_is_old {
if !is_old {
break;
}
eligible.push(file.name.clone());
} else {
eligible.push(file.name.clone());
break;
}
}

eligible
}
}

Expand Down Expand Up @@ -170,7 +219,7 @@ impl FileManager {
runtime_loader: &ConfigLoader,
) -> Self {
Self {
inner: Mutex::new(Inner::NotInitialized(Some(file_system))),
inner: Mutex::new(Inner::NotInitialized(Some(Arc::from(file_system)))),
time_provider,
max_aggregated_files: runtime_loader.register_int_watch(),
max_aggregation_window_per_file: runtime_loader.register_duration_watch(),
Expand Down Expand Up @@ -312,122 +361,163 @@ impl FileManager {
pub async fn get_or_create_pending_upload(
&self,
only_if_file_is_old: bool,
) -> anyhow::Result<Option<StatsUploadRequest>> {
let mut inner = self.inner.lock().await;
let initialized_inner = inner.get_initialized().await?;
) -> anyhow::Result<Option<PendingUpload>> {
let now = self.time_provider.now();
let max_aggregation_window_per_file = *self.max_aggregation_window_per_file.read();

loop {
let mut inner = self.inner.lock().await;
let initialized_inner = inner.get_initialized().await?;

if initialized_inner.index.is_empty() {
log::debug!("no pending upload: index is empty");
return Ok(None);
}

let found_index = InitializedInner::find_index(&initialized_inner.index, |file| {
!initialized_inner.in_flight_uploads.contains(&file.name)
});
let eligible_file_ids = initialized_inner.eligible_pending_upload_ids(
only_if_file_is_old,
now,
max_aggregation_window_per_file,
);

let Some(index) = found_index else {
log::debug!("no pending upload: all files are in flight");
let Some(first_file_id) = eligible_file_ids.first() else {
if only_if_file_is_old {
log::debug!("no pending upload: file is not old enough");
} else {
log::debug!("no pending upload: all files are in flight");
}
return Ok(None);
};

if only_if_file_is_old
&& initialized_inner.index[index]
.period_start
.to_offset_date_time()
+ *self.max_aggregation_window_per_file.read()
> self.time_provider.now()
{
log::debug!("no pending upload: file is not old enough");
return Ok(None);
debug_assert!(
initialized_inner
.index
.iter()
.any(|file| &file.name == first_file_id)
);

let mut should_write_index = false;
let file_system = initialized_inner.file_system.clone();
let mut pending_files = Vec::new();

for file_id in &eligible_file_ids {
let Some(index) =
InitializedInner::find_index(&initialized_inner.index, |file| file.name == *file_id)
else {
continue;
};

// Mark the selected files in flight under the lock before we release it for disk reads.
if initialized_inner.index[index].period_end.is_none() {
log::debug!(
"marking entry as ready to upload: {}",
initialized_inner.index[index].name
);
initialized_inner.index[index].period_end = now.into_proto();
should_write_index = true;
}

initialized_inner
.in_flight_uploads
.insert(initialized_inner.index[index].name.clone());
pending_files.push(initialized_inner.index[index].clone());
}

// If there is a pending upload, first attempt to re-upload. Otherwise, mark the first entry
// as ready to upload and return it.
if initialized_inner.index[index].period_end.is_none() {
log::debug!(
"marking entry as ready to upload: {}",
initialized_inner.index[index].name
);
initialized_inner.index[index].period_end = self.time_provider.now().into_proto();
if should_write_index {
initialized_inner.write_index().await?;
}
drop(inner);

initialized_inner
.in_flight_uploads
.insert(initialized_inner.index[index].name.clone());
let mut pending_request = StatsUploadRequest::default();
let mut source_file_ids = Vec::new();
let mut bad_file_ids = Vec::new();

let path = STATS_DIRECTORY.join(&initialized_inner.index[index].name);
for pending_file in pending_files {
let path = STATS_DIRECTORY.join(&pending_file.name);
match file_system
.read_file(&path)
.await
.and_then(|contents| read_compressed_protobuf::<StatsUploadRequest>(&contents))
{
Ok(mut request_from_disk) => {
// Each pending file still contains one snapshot on disk. Batch uploads preserve those
// per-file aggregation windows by carrying each snapshot forward separately.
debug_assert_eq!(1, request_from_disk.snapshot.len());
if let Some(snapshot) = request_from_disk.snapshot.first_mut() {
snapshot.occurred_at = Some(Occurred_at::Aggregated(Aggregated {
period_start: pending_file.period_start.clone(),
period_end: pending_file.period_end.clone(),
..Default::default()
}));
}

pending_request.snapshot.extend(request_from_disk.snapshot);
source_file_ids.push(pending_file.name);
},
Err(e) => {
// We failed to read the data, so the file must be bad. This could happen if we change
// the schema in an incompatible way or if the file is corrupt. Delete the file and
// accept the loss of this upload.
log::debug!("unable to read pending upload {}: {e}", path.display());
bad_file_ids.push(pending_file.name);
},
}
}

match initialized_inner
.file_system
.read_file(&path)
.await
.and_then(|contents| read_compressed_protobuf::<StatsUploadRequest>(&contents))
{
Ok(mut pending_request) => {
// At the time of creation period_end was not known so we set both start and end here.
// In the future if we support multiple snapshots per upload we would need to handle that
// here as well.
debug_assert_eq!(1, pending_request.snapshot.len());
if !pending_request.snapshot.is_empty() {
pending_request.snapshot[0].occurred_at = Some(Occurred_at::Aggregated(Aggregated {
period_start: initialized_inner.index[index].period_start.clone(),
period_end: initialized_inner.index[index].period_end.clone(),
..Default::default()
}));
}

return Ok(Some(pending_request));
},
Err(e) => {
// We failed to read the data, so the file must be bad. This could happen if we change
// the schema in an incompatible way or if the file is corrupt. Delete the file and
// accept the loss of this upload.
log::debug!("unable to read pending upload {}: {e}", path.display());
initialized_inner
.in_flight_uploads
.remove(&initialized_inner.index[index].name);
initialized_inner.delete_pending_upload(index).await?;
},
if !bad_file_ids.is_empty() {
let mut inner = self.inner.lock().await;
let initialized_inner = inner.get_initialized().await?;
for file_id in &bad_file_ids {
initialized_inner.in_flight_uploads.remove(file_id);
}
initialized_inner
.delete_pending_uploads(&bad_file_ids)
.await?;
}

if source_file_ids.is_empty() {
continue;
}

return Ok(Some(PendingUpload {
request: pending_request,
source_file_ids,
}));
}
}

// Called when a pending upload returned from `get_or_create_pending_upload` is successfully
// uploaded
pub async fn complete_pending_upload(&self, uuid: &str, success: bool) -> anyhow::Result<()> {
pub async fn complete_pending_upload(
&self,
source_file_ids: &[String],
success: bool,
) -> anyhow::Result<()> {
// We should always have an entry to complete if this code runs.
let mut inner = self.inner.lock().await;
let initialized_inner = inner.get_initialized().await?;

// We remove from in-flight regardless of whether the upload succeeded or failed. If it failed
// it will be retried on the next periodic upload attempt.
initialized_inner.in_flight_uploads.remove(uuid);
for uuid in source_file_ids {
initialized_inner.in_flight_uploads.remove(uuid);
}

if !success {
log::debug!("not completing pending upload {uuid} due to failure");
log::debug!("not completing pending upload batch {source_file_ids:?} due to failure");
return Ok(());
}

let found_index =
InitializedInner::find_index(&initialized_inner.index, |file| file.name == uuid);

if let Some(index) = found_index {
log::debug!(
"completing pending upload: {}",
initialized_inner.index[index].name
);
debug_assert!(initialized_inner.index[index].period_end.is_some());
initialized_inner.delete_pending_upload(index).await?;
} else {
// There is a race condition in which we could theoretically have reached max files, but
// there is an upload in flight that comes back after we already popped the first entry.
// We could handle this by having the max file code not pop inflight uploads, but that is
// more complicated than just ignoring the response here.
log::debug!("pending upload {uuid} not found in index");
for uuid in source_file_ids {
if let Some(index) =
InitializedInner::find_index(&initialized_inner.index, |file| file.name == *uuid)
{
debug_assert!(initialized_inner.index[index].period_end.is_some());
}
}

Ok(())
initialized_inner
.delete_pending_uploads(source_file_ids)
.await
}
}
Loading
Loading