diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 600864454..6c8dd8ea9 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -57,14 +57,15 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// /// timely::example(|scope| { /// (0..10).to_stream(scope) - /// .unary_notify(Pipeline, "example", Vec::new(), |input, output, notificator| { + /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| { /// input.for_each(|cap, data| { /// output.session(&cap).give_vec(&mut data.replace(Vec::new())); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// }); - /// notificator.for_each(|cap,_,_| { - /// println!("done with time: {:?}", cap.time()); + /// notificator.for_each(|cap, count, _| { + /// println!("done with time: {:?}, requested {} times", cap.time(), count); + /// assert!(*cap.time() == 0 && count == 2 || count == 1); /// }); /// }); /// }); @@ -99,7 +100,7 @@ impl<'a, T: Timestamp> Iterator for Notificator<'a, T> { /// timestamp. #[inline] fn next(&mut self) -> Option<(Capability, u64)> { - self.inner.next(self.frontiers).map(|x| (x,1)) + self.inner.next_count(self.frontiers) } } @@ -297,7 +298,7 @@ impl FrontierNotificator { #[inline] pub fn notify_at_frontiered<'a>(&mut self, cap: Capability, frontiers: &'a [&'a MutableAntichain]) { if frontiers.iter().all(|f| !f.less_equal(cap.time())) { - self.available.push(OrderReversed::new(cap)); + self.available.push(OrderReversed::new(cap, 1)); } else { self.pending.push((cap,1)); @@ -325,7 +326,7 @@ impl FrontierNotificator { for i in 0 .. self.pending.len() { if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) { // TODO : This clones a capability, whereas we could move it instead. - self.available.push(OrderReversed::new(self.pending[i].0.clone())); + self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1)); self.pending[i].1 = 0; } } @@ -333,22 +334,36 @@ impl FrontierNotificator { } } - /// Returns the next available capability with respect to the supplied frontiers, if one exists. + /// Returns the next available capability with respect to the supplied frontiers, if one exists, + /// and the count of how many instances are found. /// /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i) - /// use `for_each` + /// use `for_each`, or (ii) call `make_available` first. #[inline] - pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain]) -> Option> { + pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain]) -> Option<(Capability, u64)> { if self.available.is_empty() { self.make_available(frontiers); } self.available.pop().map(|front| { - while self.available.peek() == Some(&front) { self.available.pop(); } - front.element + let mut count = front.value; + while self.available.peek() == Some(&front) { + count += self.available.pop().unwrap().value; + } + (front.element, count) }) } + /// Returns the next available capability with respect to the supplied frontiers, if one exists. + /// + /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain + /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i) + /// use `for_each`, or (ii) call `make_available` first. + #[inline] + pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain]) -> Option> { + self.next_count(frontiers).map(|(cap, _)| cap) + } + /// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting /// the frontiers. /// @@ -408,10 +423,11 @@ impl FrontierNotificator { #[derive(Debug, PartialEq, Eq)] struct OrderReversed { element: Capability, + value: u64, } impl OrderReversed { - fn new(element: Capability) -> Self { OrderReversed { element } } + fn new(element: Capability, value: u64) -> Self { OrderReversed { element, value} } } impl PartialOrd for OrderReversed {