At the moment, the relationship between pushing records into containers and its effect on progress tracking is hand-wavy. Specifically, we need to ensure that the record count summed over a set of containers equals the record count summed over the containers an exchange channel produces from them. Progress tracking counts records at the output of an operator, before a tee, not after.
Vectors uphold the property that their length equals the number of records pushed into them. As a result, they implicitly guarantee that the record count after an exchange, summed across all produced containers, equals the count before it.
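A minimal std-only sketch of that invariant, without timely itself. The Counted trait is a hypothetical stand-in for Accountable::record_count, and exchange is a hypothetical helper that routes each (key, value) record to one of several output Vecs by hashing its key, roughly what an exchange channel does. The count taken before the exchange (where progress tracking observes it) matches the sum over the outputs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a container's record count
/// (modeled on `Accountable::record_count`; not timely's actual trait).
trait Counted {
    fn record_count(&self) -> i64;
}

// A Vec counts one record per push, because its length equals the pushes.
impl<T> Counted for Vec<T> {
    fn record_count(&self) -> i64 {
        i64::try_from(self.len()).unwrap()
    }
}

/// Hypothetical exchange: route each (key, value) record to one of
/// `peers` output vectors by hashing the key.
fn exchange<K: Hash, V>(input: Vec<(K, V)>, peers: usize) -> Vec<Vec<(K, V)>> {
    let mut outputs: Vec<Vec<(K, V)>> = (0..peers).map(|_| Vec::new()).collect();
    for (key, value) in input {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let target = (hasher.finish() % peers as u64) as usize;
        outputs[target].push((key, value));
    }
    outputs
}

fn main() {
    let input = vec![("a", 1), ("b", 2), ("a", 3), ("c", 4)];
    // Progress tracking counts records at the operator output, before the exchange.
    let before = input.record_count();
    let outputs = exchange(input, 2);
    // Summed across all produced containers, the count is unchanged.
    let after: i64 = outputs.iter().map(Counted::record_count).sum();
    assert_eq!(before, after);
    println!("before = {before}, after = {after}");
}
```

Duplicate keys are harmless here: both ("a", 1) and ("a", 3) land in the same output Vec, and each still counts as one record.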
However, it is easy to implement a container that does not have this property. Consider implementing the container traits for HashMap:
diff --git i/container/src/lib.rs w/container/src/lib.rs
index 1b12c488..576a35b0 100644
--- i/container/src/lib.rs
+++ w/container/src/lib.rs
@@ -224,6 +224,49 @@ impl<T: Clone> PushInto<&&T> for Vec<T> {
     }
 }
 
+mod hash_map {
+    use std::collections::HashMap;
+    use std::hash::Hash;
+
+    use super::*;
+
+    impl<K, V> Accountable for HashMap<K, V> {
+        #[inline] fn record_count(&self) -> i64 { i64::try_from(HashMap::len(self)).unwrap() }
+        #[inline] fn is_empty(&self) -> bool { HashMap::is_empty(self) }
+    }
+
+    impl<K, V> DrainContainer for HashMap<K, V> {
+        type Item<'a> = (K, V) where Self: 'a;
+        type DrainIter<'a> = std::collections::hash_map::Drain<'a, K, V> where Self: 'a;
+        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
+            HashMap::drain(self)
+        }
+    }
+
+    impl<K: Eq+Hash, V> SizableContainer for HashMap<K, V> {
+        fn at_capacity(&self) -> bool {
+            self.len() == self.capacity()
+        }
+        fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
+            if self.capacity() == 0 {
+                *self = stash.take().unwrap_or_default();
+                self.clear();
+            }
+            let preferred = buffer::default_capacity::<(K, V)>();
+            if self.capacity() < preferred {
+                self.reserve(preferred - self.len());
+            }
+        }
+    }
+
+    impl<K: Eq+Hash, V> PushInto<(K, V)> for HashMap<K, V> {
+        #[inline]
+        fn push_into(&mut self, (key, value): (K, V)) {
+            self.insert(key, value);
+        }
+    }
+}
+
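 mod rc {
     impl<T: crate::Accountable> crate::Accountable for std::rc::Rc<T> {
         #[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }

The implementation compiles, but HashMap::insert does not uphold the property that the length of the map equals the number of insert calls. Inserting a key that is already present replaces the existing value, so pushing n records can leave a map with fewer than n entries. Summed across the containers an exchange produces, record_count can then be smaller than the count progress tracking recorded at the operator's output.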