Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,31 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
}

/// Creates an operator implementation from supplied logic constructor.
///
/// Boxes the closure to avoid per-closure monomorphization based on `L`:
/// the only code instantiated per `L` is the `Box::new` call and the
/// hand-off to [`Self::build_boxed`].
/// For the fully generic (non-boxing) path, see [`Self::build_typed`].
pub fn build<L>(self, logic: L)
where
    L: FnMut(&mut SharedProgress<T>) -> bool + 'static,
{
    // `build_boxed` takes `Box<dyn FnMut(..) -> bool>`, so the `Box<L>` built
    // here is coerced to a trait object at the call site.
    self.build_boxed(Box::new(logic));
}

/// Creates an operator implementation from pre-boxed logic.
///
/// This method exists primarily to force the `Box<dyn ...>` coercion, which
/// can otherwise easily be `Box<L>` for specialized `L` instead.
pub fn build_boxed(self, logic: Box<dyn FnMut(&mut SharedProgress<T>)->bool>) {
self.build_typed(logic);
}

/// Like `build`, but specialized to the closure type `L`.
///
/// This method is instantiated once per distinct `L`, and one should be
/// mindful of monomorphization bloat. Callers with many distinct closures
/// should consider erasing their variation, for example via `Box<dyn ...>`,
/// as demonstrated in [`build`].
pub fn build_typed<L>(self, logic: L)
where
L: FnMut(&mut SharedProgress<T>)->bool+'static
{
Expand Down
142 changes: 100 additions & 42 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,61 +136,75 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
/// Creates an operator implementation from supplied logic constructor.
///
/// Unlike `build`, the supplied closure can indicate if the operator
/// should be considered incomplete. The `build` method indicates that
/// the operator is never incomplete and can be shut down at the system's
/// discretion.
/// should be considered incomplete. A not-incomplete operator will be
/// shut down if it has empty input frontiers and holds no capabilities.
/// Flagging oneself as incomplete is most commonly used by operators
/// that manage external resources like file writes or transactions that
/// must complete before the operator should be shut down.
///
/// This method boxes `B` and `L` and delegates to [`build_reschedule_boxed`].
/// For the fully generic (non-boxing) path, see [`build_reschedule_typed`].
pub fn build_reschedule<B, L>(self, constructor: B)
where
B: FnOnce(Vec<Capability<T>>) -> L,
L: FnMut(&[MutableAntichain<T>])->bool+'static
{
// create capabilities, discard references to their creation.
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
for batch in self.internal.borrow().iter() {
capabilities.push(Capability::new(T::minimum(), Rc::clone(batch)));
// Discard evidence of creation, as we are assumed to start with one.
batch.borrow_mut().clear();
}
self.build_reschedule_boxed(Box::new(|caps| -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> { Box::new(constructor(caps)) }));
}

/// Like `build_reschedule`, but with a pre-boxed constructor.
///
/// This method exists primarily to force the `Box<dyn ...>` coercions, which
/// can otherwise easily be `Box<B>` or `Box<L>` for specialized `B` and `L` instead.
pub fn build_reschedule_boxed<'a>(self, constructor: Box<dyn FnOnce(Vec<Capability<T>>) -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> + 'a>) {
self.build_reschedule_typed(constructor);
}

let mut logic = constructor(capabilities);
/// Like `build_reschedule`, but specialized to the closure types `B` and `L`.
///
/// This method is instantiated once per distinct `(B, L)` pair, and one
/// should be mindful of monomorphization bloat. Callers with many closures
/// should consider erasing their variation, for example via `Box<dyn ...>`.
///
/// This method calls `build_typed` directly using a new closure, mirroring
/// the variation in `L`, rather than forcing it to be reboxed via `build`.
pub fn build_reschedule_typed<B, L>(self, constructor: B)
where
B: FnOnce(Vec<Capability<T>>) -> L,
L: FnMut(&[MutableAntichain<T>])->bool+'static
{
let mut logic = constructor(self.mint_capabilities());

let mut self_frontier = self.frontier;
let self_consumed = self.consumed;
let self_internal = self.internal;
let self_produced = self.produced;
let mut bookkeeping = ProgressBookkeeping {
frontier: self.frontier,
consumed: self.consumed,
internal: self.internal,
produced: self.produced,
};

let raw_logic =
move |progress: &mut SharedProgress<T>| {

// drain frontier changes
for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
frontier.update_iter(progress.drain());
}

// invoke supplied logic
let result = logic(&self_frontier[..]);

// move batches of consumed changes.
for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
consumed.borrow_mut().drain_into(progress);
}

// move batches of internal changes.
let self_internal_borrow = self_internal.borrow_mut();
for index in 0 .. self_internal_borrow.len() {
let mut borrow = self_internal_borrow[index].borrow_mut();
progress.internals[index].extend(borrow.drain());
}

// move batches of produced changes.
for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
produced.borrow_mut().drain_into(progress);
}

bookkeeping.drain_frontiers(progress);
let result = logic(bookkeeping.frontiers());
bookkeeping.publish_progress(progress);
result
};

self.builder.build(raw_logic);
self.builder.build_typed(raw_logic);
}

/// Create initial capabilities, one per output, and clear their creation evidence.
///
/// This method is specifically outlined from `Self::build_reschedule_typed` to avoid
/// monomorphization bloat, as it depends only on `T`, not on the closures.
fn mint_capabilities(&self) -> Vec<Capability<T>> {
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
for batch in self.internal.borrow().iter() {
capabilities.push(Capability::new(T::minimum(), Rc::clone(batch)));
// Discard evidence of creation, as we are assumed to start with one.
batch.borrow_mut().clear();
}
capabilities
}

/// Get the identifier assigned to the operator being constructed
Expand All @@ -207,6 +221,50 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
}


/// Progress-tracking state that is independent of operator logic.
///
/// Extracted so that `drain_frontiers` and `publish_progress` are monomorphized
/// once per timestamp type `T`, rather than once per closure type passed to
/// `build_reschedule`.
struct ProgressBookkeeping<T: Timestamp> {
/// Local input frontiers: updated by `drain_frontiers` from
/// `SharedProgress::frontiers` and exposed to operator logic via `frontiers`.
frontier: Vec<MutableAntichain<T>>,
/// Shared batches of consumed changes, drained into `SharedProgress::consumeds`
/// by `publish_progress`.
consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
/// Shared list of internal change batches; entry `i` is drained into
/// `SharedProgress::internals[i]` by `publish_progress`.
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
/// Shared batches of produced changes, drained into `SharedProgress::produceds`
/// by `publish_progress`.
produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}

impl<T: Timestamp> ProgressBookkeeping<T> {
    /// Borrows the locally tracked input frontiers so they can be handed to
    /// operator logic.
    #[inline(always)]
    fn frontiers(&self) -> &[MutableAntichain<T>] {
        self.frontier.as_slice()
    }

    /// Applies the pending frontier changes in `progress.frontiers` to the
    /// local antichains, pairing them up positionally and emptying each
    /// pending batch as it is applied.
    fn drain_frontiers(&mut self, progress: &mut SharedProgress<T>) {
        let pairs = self.frontier.iter_mut().zip(progress.frontiers.iter_mut());
        for (antichain, pending) in pairs {
            antichain.update_iter(pending.drain());
        }
    }

    /// Moves the accumulated consumed, internal, and produced changes out of
    /// the shared batches and into `progress`, in that order.
    fn publish_progress(&self, progress: &mut SharedProgress<T>) {
        // Consumed changes, paired positionally with `progress.consumeds`.
        for (batch, target) in self.consumed.iter().zip(progress.consumeds.iter_mut()) {
            batch.borrow_mut().drain_into(target);
        }

        // Internal changes. The shared list is held under a mutable borrow for
        // the duration, and the target is reached by index, so a missing
        // `progress.internals` entry panics rather than being skipped.
        let internals = self.internal.borrow_mut();
        for (index, batch) in internals.iter().enumerate() {
            let mut changes = batch.borrow_mut();
            progress.internals[index].extend(changes.drain());
        }

        // Produced changes, paired positionally with `progress.produceds`.
        for (batch, target) in self.produced.iter().zip(progress.produceds.iter_mut()) {
            batch.borrow_mut().drain_into(target);
        }
    }
}

#[cfg(test)]
mod tests {
use crate::dataflow::operators::generic::OutputBuilder;
Expand Down
Loading