diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 1114da0d0..32a19f824 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -147,7 +147,31 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { } /// Creates an operator implementation from supplied logic constructor. + /// + /// Boxes the closure to avoid per-closure monomorphization based on `L`. + /// For the fully generic (non-boxing) path, see [`build_typed`]. pub fn build(self, logic: L) + where + L: FnMut(&mut SharedProgress)->bool+'static + { + self.build_boxed(Box::new(logic)); + } + + /// Creates an operator implementation from pre-boxed logic. + /// + /// This method exists primarily to force the `Box` coercion, which + /// can otherwise easily be `Box` for specialized `L` instead. + pub fn build_boxed(self, logic: Box)->bool>) { + self.build_typed(logic); + } + + /// Like `build_reschedule`, but specialized to the closure type `L`. + /// + /// This method is instantiated once per distinct `L`, and one should be + /// mindful of monomorphization bloat. Callers with many distinct closures + /// should consider erasing their variation, for example via `Box`, + /// as demonstrated in [`build`]. + pub fn build_typed(self, logic: L) where L: FnMut(&mut SharedProgress)->bool+'static { diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index b76bac71a..bdb4742e7 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -136,61 +136,75 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { /// Creates an operator implementation from supplied logic constructor. /// /// Unlike `build`, the supplied closure can indicate if the operator - /// should be considered incomplete. The `build` method indicates that - /// the operator is never incomplete and can be shut down at the system's - /// discretion. + /// should be considered incomplete. A not-incomplete operator will be + /// shut down if it has empty input frontiers and holds no capabilities. + /// Flagging oneself as incomplete is most commonly used by operators + /// that manage external resources like file writes or transactions that + /// must complete before the operator should be shut down. + /// + /// This method boxes `B` and `L` and delegates to [`build_reschedule_boxed`]. + /// For the fully generic (non-boxing) path, see [`build_reschedule_typed`]. pub fn build_reschedule(self, constructor: B) where B: FnOnce(Vec>) -> L, L: FnMut(&[MutableAntichain])->bool+'static { - // create capabilities, discard references to their creation. - let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); - for batch in self.internal.borrow().iter() { - capabilities.push(Capability::new(T::minimum(), Rc::clone(batch))); - // Discard evidence of creation, as we are assumed to start with one. - batch.borrow_mut().clear(); - } + self.build_reschedule_boxed(Box::new(|caps| -> Box])->bool> { Box::new(constructor(caps)) })); + } + + /// Like `build_reschedule`, but with a pre-boxed constructor. + /// + /// This method exists primarily to force the `Box` coercions, which + /// can otherwise easily be `Box` or `Box` for specialized `B` and `L` instead. + pub fn build_reschedule_boxed<'a>(self, constructor: Box>) -> Box])->bool> + 'a>) { + self.build_reschedule_typed(constructor); + } - let mut logic = constructor(capabilities); + /// Like `build_reschedule`, but specialized to the closure types `B` and `L`. + /// + /// This method is instantiated once per distinct `(B, L)` pair, and one + /// should be mindful of monomorphization bloat. Callers with many closures + /// should consider erasing their variation, for example via `Box`. + /// + /// This method calls `build_typed` directly using a new closure, mirroring + /// the variation in `L`, rather than forcing it to be reboxed via `build`. + pub fn build_reschedule_typed(self, constructor: B) + where + B: FnOnce(Vec>) -> L, + L: FnMut(&[MutableAntichain])->bool+'static + { + let mut logic = constructor(self.mint_capabilities()); - let mut self_frontier = self.frontier; - let self_consumed = self.consumed; - let self_internal = self.internal; - let self_produced = self.produced; + let mut bookkeeping = ProgressBookkeeping { + frontier: self.frontier, + consumed: self.consumed, + internal: self.internal, + produced: self.produced, + }; let raw_logic = move |progress: &mut SharedProgress| { - - // drain frontier changes - for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) { - frontier.update_iter(progress.drain()); - } - - // invoke supplied logic - let result = logic(&self_frontier[..]); - - // move batches of consumed changes. - for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) { - consumed.borrow_mut().drain_into(progress); - } - - // move batches of internal changes. - let self_internal_borrow = self_internal.borrow_mut(); - for index in 0 .. self_internal_borrow.len() { - let mut borrow = self_internal_borrow[index].borrow_mut(); - progress.internals[index].extend(borrow.drain()); - } - - // move batches of produced changes. - for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) { - produced.borrow_mut().drain_into(progress); - } - + bookkeeping.drain_frontiers(progress); + let result = logic(bookkeeping.frontiers()); + bookkeeping.publish_progress(progress); result }; - self.builder.build(raw_logic); + self.builder.build_typed(raw_logic); + } + + /// Create initial capabilities, one per output, and clear their creation evidence. + /// + /// This method is specifically outlined from `Self::build_reschedule_typed` to avoid + /// monomorphization bloat, as it depends only on `T`, not on the closures. + fn mint_capabilities(&self) -> Vec> { + let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); + for batch in self.internal.borrow().iter() { + capabilities.push(Capability::new(T::minimum(), Rc::clone(batch))); + // Discard evidence of creation, as we are assumed to start with one. + batch.borrow_mut().clear(); + } + capabilities } /// Get the identifier assigned to the operator being constructed @@ -207,6 +221,50 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { } +/// Progress-tracking state that is independent of operator logic. +/// +/// Extracted so that `drain_frontiers` and `publish_progress` are monomorphized +/// once per timestamp type `T`, rather than once per closure type passed to +/// `build_reschedule`. +struct ProgressBookkeeping { + frontier: Vec>, + consumed: Vec>>>, + internal: Rc>>>>>, + produced: Vec>>>, +} + +impl ProgressBookkeeping { + /// The current input frontiers, for passing to operator logic. + #[inline(always)] fn frontiers(&self) -> &[MutableAntichain] { &self.frontier[..] } + + /// Drain incoming frontier changes from `SharedProgress` into our local antichains. + fn drain_frontiers(&mut self, progress: &mut SharedProgress) { + for (progress, frontier) in progress.frontiers.iter_mut().zip(self.frontier.iter_mut()) { + frontier.update_iter(progress.drain()); + } + } + + /// Publish consumed, internal, and produced changes back to `SharedProgress`. + fn publish_progress(&self, progress: &mut SharedProgress) { + // move batches of consumed changes. + for (progress, consumed) in progress.consumeds.iter_mut().zip(self.consumed.iter()) { + consumed.borrow_mut().drain_into(progress); + } + + // move batches of internal changes. + let self_internal_borrow = self.internal.borrow_mut(); + for index in 0 .. self_internal_borrow.len() { + let mut borrow = self_internal_borrow[index].borrow_mut(); + progress.internals[index].extend(borrow.drain()); + } + + // move batches of produced changes. + for (progress, produced) in progress.produceds.iter_mut().zip(self.produced.iter()) { + produced.borrow_mut().drain_into(progress); + } + } +} + #[cfg(test)] mod tests { use crate::dataflow::operators::generic::OutputBuilder;