From c6e3403db5efd766e761720c0ddb26afc584b573 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 9 Apr 2026 01:10:23 -0400 Subject: [PATCH 1/4] Make Scope a type --- timely/examples/event_driven.rs | 1 - timely/examples/unionfind.rs | 5 +- .../operators/core/capture/capture.rs | 6 +- .../dataflow/operators/core/capture/replay.rs | 13 +- timely/src/dataflow/operators/core/concat.rs | 21 +- .../src/dataflow/operators/core/enterleave.rs | 34 +-- .../src/dataflow/operators/core/exchange.rs | 7 +- .../src/dataflow/operators/core/feedback.rs | 28 +-- timely/src/dataflow/operators/core/filter.rs | 7 +- timely/src/dataflow/operators/core/input.rs | 26 +- timely/src/dataflow/operators/core/inspect.rs | 26 +- timely/src/dataflow/operators/core/map.rs | 31 +-- timely/src/dataflow/operators/core/ok_err.rs | 11 +- .../src/dataflow/operators/core/partition.rs | 11 +- timely/src/dataflow/operators/core/probe.rs | 18 +- timely/src/dataflow/operators/core/rc.rs | 11 +- timely/src/dataflow/operators/core/reclock.rs | 12 +- .../src/dataflow/operators/core/to_stream.rs | 10 +- .../operators/core/unordered_input.rs | 13 +- .../dataflow/operators/generic/builder_raw.rs | 30 +-- .../dataflow/operators/generic/builder_rc.rs | 48 ++-- .../dataflow/operators/generic/operator.rs | 165 ++++++------ .../operators/vec/aggregation/aggregate.rs | 11 +- .../vec/aggregation/state_machine.rs | 11 +- timely/src/dataflow/operators/vec/branch.rs | 19 +- .../src/dataflow/operators/vec/broadcast.rs | 7 +- timely/src/dataflow/operators/vec/count.rs | 13 +- timely/src/dataflow/operators/vec/delay.rs | 25 +- timely/src/dataflow/operators/vec/filter.rs | 7 +- .../dataflow/operators/vec/flow_controlled.rs | 17 +- timely/src/dataflow/operators/vec/input.rs | 17 +- timely/src/dataflow/operators/vec/map.rs | 17 +- .../src/dataflow/operators/vec/partition.rs | 11 +- timely/src/dataflow/operators/vec/queue.rs | 4 +- timely/src/dataflow/operators/vec/result.rs | 33 +-- .../src/dataflow/operators/vec/to_stream.rs | 5 +- .../dataflow/operators/vec/unordered_input.rs | 9 +- timely/src/dataflow/scopes/child.rs | 237 ++++++++++++++---- timely/src/dataflow/scopes/mod.rs | 184 +------------- timely/src/dataflow/stream.rs | 38 ++- timely/src/execute.rs | 4 +- timely/src/worker.rs | 14 +- timely/tests/shape_scaling.rs | 1 - 43 files changed, 613 insertions(+), 605 deletions(-) diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index f4c3b638e..9183c5d7a 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -1,4 +1,3 @@ -use timely::dataflow::Scope; use timely::dataflow::operators::{Input, Probe, Enter, Leave}; use timely::dataflow::operators::vec::Map; diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index 1e62e619c..71d29ec51 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use timely::dataflow::*; +use timely::progress::Timestamp; use timely::dataflow::operators::{Input, Exchange, Probe}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; @@ -53,8 +54,8 @@ trait UnionFind { fn union_find(self) -> Self; } -impl UnionFind for StreamVec { - fn union_find(self) -> StreamVec { +impl UnionFind for StreamVec { + fn union_find(self) -> StreamVec { self.unary(Pipeline, "UnionFind", |_,_| { diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index 07545ddab..c6df9a7e5 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -5,7 +5,7 @@ //! and there are several default implementations, including a linked-list, Rust's MPSC //! queue, and a binary serializer wrapping any `W: Write`. -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; @@ -115,8 +115,8 @@ pub trait Capture : Sized { } } -impl Capture for Stream { - fn capture_into+'static>(self, mut event_pusher: P) { +impl Capture for Stream { + fn capture_into+'static>(self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 1883110ea..70f46cb53 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -39,6 +39,7 @@ //! than that in which the stream was captured. use crate::dataflow::{Scope, Stream}; +use crate::scheduling::Scheduler; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; @@ -50,16 +51,16 @@ use crate::dataflow::channels::Message; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { - /// Replays `self` into the provided scope, as a `Stream`. - fn replay_into>(self, scope: &mut S) -> Stream { + /// Replays `self` into the provided scope, as a `Stream`. + fn replay_into(self, scope: &mut Scope) -> Stream { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } - /// Replays `self` into the provided scope, as a `Stream`. + /// Replays `self` into the provided scope, as a `Stream`. /// /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to /// re-activate itself. - fn replay_core>(self, scope: &mut S, period: Option) -> Stream; + fn replay_core(self, scope: &mut Scope, period: Option) -> Stream; } impl Replay for I @@ -67,7 +68,7 @@ where I : IntoIterator, ::Item: EventIterator+'static, { - fn replay_core>(self, scope: &mut S, period: Option) -> Stream{ + fn replay_core(self, scope: &mut Scope, period: Option) -> Stream{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); @@ -88,7 +89,7 @@ where // The first thing we do is modify our capabilities to match the number of streams we manage. // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as // our very first action. - progress.internals[0].update(S::Timestamp::minimum(), (event_streams.len() as i64) - 1); + progress.internals[0].update(T::minimum(), (event_streams.len() as i64) - 1); started = true; } diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index e626897d6..9f3803c48 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -1,11 +1,12 @@ //! Merges the contents of multiple streams. use crate::Container; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Stream, Scope}; /// Merge the contents of two streams. -pub trait Concat { +pub trait Concat { /// Merge the contents of two streams. /// /// # Examples @@ -21,17 +22,17 @@ pub trait Concat { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(self, other: Stream) -> Stream; + fn concat(self, other: Stream) -> Stream; } -impl Concat for Stream { - fn concat(self, other: Stream) -> Stream { +impl Concat for Stream { + fn concat(self, other: Stream) -> Stream { self.scope().concatenate([self, other]) } } /// Merge the contents of multiple streams. -pub trait Concatenate { +pub trait Concatenate { /// Merge the contents of multiple streams. /// /// # Examples @@ -49,15 +50,15 @@ pub trait Concatenate { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concatenate(&self, sources: I) -> Stream + fn concatenate(&self, sources: I) -> Stream where - I: IntoIterator>; + I: IntoIterator>; } -impl Concatenate for G { - fn concatenate(&self, sources: I) -> Stream +impl Concatenate for Scope { + fn concatenate(&self, sources: I) -> Stream where - I: IntoIterator> + I: IntoIterator> { // create an operator builder. diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index aae49ef17..49c1d70d2 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -32,10 +32,9 @@ use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::Message; use crate::worker::AsWorker; use crate::dataflow::{Stream, Scope}; -use crate::dataflow::scopes::Child; /// Extension trait to move a `Stream` into a child of its current `Scope`. -pub trait Enter, C> { +pub trait Enter, C> { /// Moves the `Stream` argument into a child of its current `Scope`. /// /// The destination scope must be a child of the stream's scope. @@ -43,7 +42,6 @@ pub trait Enter, C> { /// /// # Examples /// ``` - /// use timely::dataflow::scopes::Scope; /// use timely::dataflow::operators::{Enter, Leave, ToStream}; /// /// timely::example(|outer| { @@ -53,11 +51,16 @@ pub trait Enter, C> { /// }); /// }); /// ``` - fn enter(self, inner: &Child) -> Stream, C>; + fn enter(self, inner: &Scope) -> Stream; } -impl, C: Container> Enter for Stream { - fn enter(self, inner: &Child) -> Stream, C> { +impl Enter for Stream +where + TOuter: Timestamp, + TInner: Timestamp + Refines, + C: Container, +{ + fn enter(self, inner: &Scope) -> Stream { use crate::scheduling::Scheduler; @@ -73,7 +76,7 @@ impl, C: Container> Enter outer_addr, ); - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = Tee::::new(); let ingress = IngressNub { targets: Counter::new(targets), phantom: PhantomData, @@ -100,7 +103,7 @@ impl, C: Container> Enter } /// Extension trait to move a `Stream` to the parent of its current `Scope`. -pub trait Leave { +pub trait Leave { /// Moves a `Stream` to the parent of its current `Scope`. /// /// The parent scope must be supplied as an argument. @@ -110,7 +113,6 @@ pub trait Leave { /// /// # Examples /// ``` - /// use timely::dataflow::scopes::Scope; /// use timely::dataflow::operators::{Enter, Leave, ToStream}; /// /// timely::example(|outer| { @@ -120,11 +122,16 @@ pub trait Leave { /// }); /// }); /// ``` - fn leave(self, outer: &G) -> Stream; + fn leave(self, outer: &Scope) -> Stream; } -impl> Leave for Stream, C> { - fn leave(self, outer: &G) -> Stream { +impl Leave for Stream +where + TOuter: Timestamp, + TInner: Timestamp + Refines, + C: Container, +{ + fn leave(self, outer: &Scope) -> Stream { let scope = self.scope(); @@ -142,7 +149,7 @@ impl> Leave for let output = scope.subgraph.borrow_mut().new_output(); let target = Target::new(0, output.port); - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = Tee::::new(); let egress = EgressNub { targets, phantom: PhantomData }; let channel_id = scope.clone().new_identifier(); @@ -277,7 +284,6 @@ mod test { use crate::dataflow::{InputHandle, ProbeHandle}; use crate::dataflow::operators::{vec::Input, Inspect, Probe}; - use crate::dataflow::Scope; use crate::dataflow::operators::{Enter, Leave}; // initializes and runs a timely dataflow. diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index b980cdd86..96dcf3701 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -1,10 +1,11 @@ //! Exchange records between workers. use crate::Container; +use crate::progress::Timestamp; use crate::container::{DrainContainer, SizableContainer, PushInto}; use crate::dataflow::channels::pact::ExchangeCore; use crate::dataflow::operators::generic::operator::Operator; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; /// Exchange records between workers. pub trait Exchange { @@ -29,7 +30,7 @@ pub trait Exchange { for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static; } -impl Exchange for Stream +impl Exchange for Stream where C: Container + SizableContainer @@ -38,7 +39,7 @@ where + crate::dataflow::channels::ContainerBytes + for<'a> PushInto>, { - fn exchange(self, route: F) -> Stream + fn exchange(self, route: F) -> Stream where for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static, { diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 6b02c9c07..777f2f06a 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -10,7 +10,7 @@ use crate::progress::frontier::Antichain; use crate::progress::{Timestamp, PathSummary}; /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. -pub trait Feedback { +pub trait Feedback { /// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`. /// @@ -35,7 +35,7 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); + fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. @@ -65,12 +65,12 @@ pub trait LoopVariable { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle, C>, Stream, C>); + fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle, C>, Stream, C>); } -impl Feedback for G { +impl Feedback for Scope { - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { + fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); @@ -80,13 +80,13 @@ impl Feedback for G { } impl LoopVariable for Iterative { - fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle, C>, Stream, C>) { + fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle, C>, Stream, C>) { self.feedback(Product::new(Default::default(), summary)) } } /// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { +pub trait ConnectLoop { /// Connect a `Stream` to be the input of a loop variable. /// /// # Examples @@ -106,11 +106,11 @@ pub trait ConnectLoop { /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(self, handle: Handle); + fn connect_loop(self, handle: Handle); } -impl ConnectLoop for Stream { - fn connect_loop(self, handle: Handle) { +impl ConnectLoop for Stream { + fn connect_loop(self, handle: Handle) { let mut builder = handle.builder; let summary = handle.summary; @@ -133,8 +133,8 @@ impl ConnectLoop for Stream { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct Handle { - builder: OperatorBuilder, - summary: ::Summary, - output: crate::dataflow::channels::pushers::Output, +pub struct Handle { + builder: OperatorBuilder, + summary: ::Summary, + output: crate::dataflow::channels::pushers::Output, } diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index f291fbdf0..a6bc79321 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,8 +1,9 @@ //! Filters a stream by a predicate. use crate::container::{DrainContainer, SizableContainer, PushInto}; +use crate::progress::Timestamp; use crate::Container; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. @@ -23,11 +24,11 @@ pub trait Filter { fn filter)->bool+'static>(self, predicate: P) -> Self; } -impl Filter for Stream +impl Filter for Stream where for<'a> C: PushInto> { - fn filter)->bool+'static>(self, mut predicate: P) -> Stream { + fn filter)->bool+'static>(self, mut predicate: P) -> Stream { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { output.session(&time) diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index d6d495aeb..2ac81bc5c 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use std::cell::RefCell; use crate::container::{CapacityContainerBuilder, PushInto}; +use crate::scheduling::Scheduler; use crate::scheduling::{Schedule, Activator}; @@ -25,7 +26,10 @@ use crate::dataflow::channels::Message; // NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a. /// Create a new `Stream` and `Handle` through which to supply input. -pub trait Input : Scope { +pub trait Input { + /// The timestamp at which this input scope operates. + type Timestamp: Timestamp; + /// Create a new [Stream] and [Handle] through which to supply input. /// /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used @@ -59,7 +63,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, Stream); + fn new_input(&mut self) -> (Handle>, Stream); /// Create a new [Stream] and [Handle] through which to supply input. /// @@ -96,7 +100,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input_with_builder>(&mut self) -> (Handle<::Timestamp, CB>, Stream); + fn new_input_with_builder>(&mut self) -> (Handle, Stream); /// Create a new stream from a supplied interactive handle. /// @@ -129,25 +133,26 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from>(&mut self, handle: &mut Handle<::Timestamp, CB>) -> Stream; + fn input_from>(&mut self, handle: &mut Handle) -> Stream; } use crate::order::TotalOrder; -impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, Stream) { +impl Input for Scope { + type Timestamp = T; + fn new_input(&mut self) -> (Handle>, Stream) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) } - fn new_input_with_builder>(&mut self) -> (Handle<::Timestamp, CB>, Stream) { + fn new_input_with_builder>(&mut self) -> (Handle, Stream) { let mut handle = Handle::new_with_builder(); let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from>(&mut self, handle: &mut Handle<::Timestamp, CB>) -> Stream { - let (output, registrar) = Tee::<::Timestamp, CB::Container>::new(); + fn input_from>(&mut self, handle: &mut Handle) -> Stream { + let (output, registrar) = Tee::::new(); let counter = Counter::new(output); let produced = Rc::clone(counter.produced()); @@ -332,10 +337,9 @@ impl> Handle { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> Stream + pub fn to_stream(&mut self, scope: &mut Scope) -> Stream where T: TotalOrder, - G: Scope, { scope.input_from(self) } diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index eb98ded73..7aa81c2ba 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,12 +1,13 @@ //! Extension trait and implementation for observing and action on streamed data. use crate::Container; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. -pub trait Inspect: InspectCore + Sized +pub trait Inspect: InspectCore + Sized where for<'a> &'a C: IntoIterator, { @@ -45,7 +46,7 @@ where /// ``` fn inspect_time(self, mut func: F) -> Self where - F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static, + F: for<'a> FnMut(&T, <&'a C as IntoIterator>::Item) + 'static, { self.inspect_batch(move |time, data| { for datum in data.into_iter() { @@ -66,7 +67,7 @@ where /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` - fn inspect_batch(self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self { + fn inspect_batch(self, mut func: impl FnMut(&T, &C)+'static) -> Self { self.inspect_core(move |event| { if let Ok((time, data)) = event { func(time, data); @@ -94,20 +95,20 @@ where /// }); /// }); /// ``` - fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static; } -impl Inspect for Stream +impl Inspect for Stream where for<'a> &'a C: IntoIterator, { - fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { + fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>) + 'static { self.inspect_container(func) } } /// Inspect containers -pub trait InspectCore { +pub trait InspectCore { /// Runs a supplied closure on each observed container, and each frontier advancement. /// /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data, @@ -128,16 +129,15 @@ pub trait InspectCore { /// }); /// }); /// ``` - fn inspect_container(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_container(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static; } -impl InspectCore for Stream { +impl InspectCore for Stream { fn inspect_container(self, mut func: F) -> Self - where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static + where F: FnMut(Result<(&T, &C), &[T]>)+'static { - use crate::progress::timestamp::Timestamp; - let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); + let mut frontier = crate::progress::Antichain::from_elem(T::minimum()); self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| { if chain.frontier() != frontier.borrow() { frontier.clear(); diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index df74965b8..24f6de08f 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,13 +1,14 @@ //! Extension methods for `Stream` based on record-by-record transformation. use crate::container::{DrainContainer, SizableContainer, PushInto}; +use crate::progress::Timestamp; use crate::Container; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for `Stream`. -pub trait Map : Sized { +pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -23,7 +24,7 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map(self, mut logic: L) -> Stream + fn map(self, mut logic: L) -> Stream where C2: Container + SizableContainer + PushInto, L: FnMut(C::Item<'_>)->D2 + 'static, @@ -45,7 +46,7 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_map(self, logic: L) -> Stream + fn flat_map(self, logic: L) -> Stream where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -88,11 +89,11 @@ pub trait Map : Sized { } } -impl Map for Stream { +impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_map(self, mut logic: L) -> Stream + fn flat_map(self, mut logic: L) -> Stream where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -109,26 +110,26 @@ impl Map for Stream { /// A stream wrapper that allows the accumulation of flatmap logic. -pub struct FlatMapBuilder +pub struct FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { - stream: T, + stream: St, logic: F, marker: std::marker::PhantomData, } -impl FlatMapBuilder +impl FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { /// Create a new wrapper with no action on the stream. - pub fn new(stream: T, logic: F) -> Self { + pub fn new(stream: St, logic: F) -> Self { FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } } /// Transform a flatmapped stream through additional logic. - pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { let logic = self.logic; FlatMapBuilder { stream: self.stream, @@ -137,11 +138,11 @@ where } } /// Convert the wrapper into a stream. - pub fn into_stream(self) -> Stream + pub fn into_stream(self) -> Stream where I: IntoIterator, - S: Scope, - T: Map, + T: Timestamp, + St: Map, C2: Container + SizableContainer + PushInto, { Map::flat_map(self.stream, self.logic) @@ -168,4 +169,4 @@ mod tests { assert_eq!((4..14).collect::>(), data.extract()[0].1); } -} \ No newline at end of file +} diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index c4e239756..a1d1ff08d 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,14 +1,15 @@ //! Operators that separate one stream into two streams based on some condition use crate::Container; +use crate::progress::Timestamp; use crate::container::{DrainContainer, SizableContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OutputBuilder; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; /// Extension trait for `Stream`. -pub trait OkErr { +pub trait OkErr { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with the data. /// If it returns `Ok(x)`, then `x` will be sent @@ -33,7 +34,7 @@ pub trait OkErr { fn ok_err( self, logic: L, - ) -> (Stream, Stream) + ) -> (Stream, Stream) where C1: Container + SizableContainer + PushInto, C2: Container + SizableContainer + PushInto, @@ -41,11 +42,11 @@ pub trait OkErr { ; } -impl OkErr for Stream { +impl OkErr for Stream { fn ok_err( self, mut logic: L, - ) -> (Stream, Stream) + ) -> (Stream, Stream) where C1: Container + SizableContainer + PushInto, C2: Container + SizableContainer + PushInto, diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index c9acdb6c1..a58c41b74 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -2,13 +2,14 @@ use std::collections::BTreeMap; use crate::container::{DrainContainer, PushInto}; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::{Container, ContainerBuilder}; /// Partition a stream of records into multiple streams. -pub trait Partition { +pub trait Partition { /// Produces `parts` output streams, containing records produced and assigned by `route`. /// /// # Examples @@ -28,14 +29,14 @@ pub trait Partition { /// } /// }); /// ``` - fn partition(self, parts: u64, route: F) -> Vec> + fn partition(self, parts: u64, route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; } -impl Partition for Stream { - fn partition(self, parts: u64, mut route: F) -> Vec> +impl Partition for Stream { + fn partition(self, parts: u64, mut route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index d78f467c4..c7288a2f7 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -11,12 +11,12 @@ use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::Stream; use crate::Container; use crate::dataflow::channels::Message; /// Monitors progress at a `Stream`. -pub trait Probe { +pub trait Probe { /// Constructs a progress probe which indicates which timestamps have elapsed at the operator. /// /// Returns a tuple of a probe handle and the input stream. @@ -46,7 +46,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe(self) -> (Handle, Self); + fn probe(self) -> (Handle, Self); /// Inserts a progress probe in a stream. /// @@ -78,18 +78,18 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(self, handle: &Handle) -> Stream; + fn probe_with(self, handle: &Handle) -> Stream; } -impl Probe for Stream { - fn probe(self) -> (Handle, Self) { +impl Probe for Stream { + fn probe(self) -> (Handle, Self) { // the frontier is shared state; scope updates, handle reads. - let handle = Handle::::new(); + let handle = Handle::::new(); let stream = self.probe_with(&handle); (handle, stream) } - fn probe_with(self, handle: &Handle) -> Stream { + fn probe_with(self, handle: &Handle) -> Stream { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -115,7 +115,7 @@ impl Probe for Stream { // At initialization, we have a few tasks. if !started { // We must discard the capability held by `OpereratorCore`. - progress.internals[0].update(G::Timestamp::minimum(), -1); + progress.internals[0].update(T::minimum(), -1); // We must retract the conservative hold in the shared handle. if let Some(shared_frontier) = shared_frontier.upgrade() { let mut borrow = shared_frontier.borrow_mut(); diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index 28818088d..0f1855f44 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -1,13 +1,14 @@ //! Shared containers use crate::dataflow::channels::pact::Pipeline; +use crate::progress::Timestamp; use crate::dataflow::operators::Operator; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::Stream; use crate::Container; use std::rc::Rc; /// Convert a stream into a stream of shared containers -pub trait SharedStream { +pub trait SharedStream { /// Convert a stream into a stream of shared data /// /// # Examples @@ -22,11 +23,11 @@ pub trait SharedStream { /// .inspect_container(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn shared(self) -> Stream>; + fn shared(self) -> Stream>; } -impl SharedStream for Stream { - fn shared(self) -> Stream> { +impl SharedStream for Stream { + fn shared(self) -> Stream> { self.unary(Pipeline, "Shared", move |_, _| { move |input, output| { input.for_each_time(|time, data| { diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 5e1747085..f02b51c71 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -1,13 +1,13 @@ //! Extension methods for `Stream` based on record-by-record transformation. use crate::Container; -use crate::order::PartialOrder; -use crate::dataflow::{Scope, Stream}; +use crate::progress::Timestamp; +use crate::dataflow::Stream; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for reclocking a stream. -pub trait Reclock { +pub trait Reclock { /// Delays records until an input is observed on the `clock` input. /// /// The source stream is buffered until a record is seen on the clock input, @@ -46,11 +46,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock(self, clock: Stream) -> Self; + fn reclock(self, clock: Stream) -> Self; } -impl Reclock for Stream { - fn reclock(self, clock: Stream) -> Stream { +impl Reclock for Stream { + fn reclock(self, clock: Stream) -> Stream { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 139f8a540..e27347169 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,6 +1,8 @@ //! Conversion to the `Stream` type from iterators. use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto}; +use crate::scheduling::Scheduler; +use crate::progress::Timestamp; use crate::{Container, ContainerBuilder}; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::{Stream, Scope}; @@ -28,11 +30,11 @@ pub trait ToStreamBuilder { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_with_builder(self, scope: &mut S) -> Stream; + fn to_stream_with_builder(self, scope: &mut Scope) -> Stream; } impl ToStreamBuilder for I where CB: PushInto { - fn to_stream_with_builder(self, scope: &mut S) -> Stream { + fn to_stream_with_builder(self, scope: &mut Scope) -> Stream { source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { @@ -78,11 +80,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream(self, scope: &mut S) -> Stream; + fn to_stream(self, scope: &mut Scope) -> Stream; } impl ToStream for I where C: PushInto { - fn to_stream(self, scope: &mut S) -> Stream { + fn to_stream(self, scope: &mut Scope) -> Stream { ToStreamBuilder::>::to_stream_with_builder(self, scope) } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 6bfee175b..ddf45b5a4 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use std::cell::RefCell; use crate::ContainerBuilder; +use crate::scheduling::Scheduler; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -19,7 +20,7 @@ use crate::dataflow::{Scope, Stream}; use crate::scheduling::Activations; /// Create a new `Stream` and `Handle` through which to supply input. -pub trait UnorderedInput { +pub trait UnorderedInput { /// Create a new capability-based [Stream] and [UnorderedHandle] through which to supply input. This /// input supports multiple open epochs (timestamps) at the same time. /// @@ -74,16 +75,16 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); } -impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { +impl UnorderedInput for Scope { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { - let (output, registrar) = Tee::::new(); + let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); - let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal)); + let cap = Capability::new(T::minimum(), Rc::clone(&internal)); let counter = Counter::new(output); let produced = Rc::clone(counter.produced()); let counter = Output::new(counter, Rc::clone(&internal), 0); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 7839241bc..318fa9a4e 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -9,6 +9,8 @@ use std::rc::Rc; use std::cell::RefCell; use crate::scheduling::{Schedule, Activations}; +use crate::worker::AsWorker; +use crate::scheduling::Scheduler; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; @@ -50,19 +52,19 @@ impl OperatorShape { /// Builds operators with generic shape. #[derive(Debug)] -pub struct OperatorBuilder { - scope: G, +pub struct OperatorBuilder { + scope: Scope, index: usize, global: usize, address: Rc<[usize]>, // path to the operator (ending with index). shape: OperatorShape, - summary: Connectivity<::Summary>, + summary: Connectivity<::Summary>, } -impl OperatorBuilder { +impl OperatorBuilder { /// Allocates a new generic operator builder from its containing scope. - pub fn new(name: String, mut scope: G) -> Self { + pub fn new(name: String, mut scope: Scope) -> Self { let global = scope.new_identifier(); let index = scope.allocate_operator_index(); @@ -94,19 +96,19 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller where - P: ParallelizationContract + P: ParallelizationContract { let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> P::Puller + pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> P::Puller where - P: ParallelizationContract, - I: IntoIterator::Summary>)>, + P: ParallelizationContract, + I: IntoIterator::Summary>)>, { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); @@ -124,15 +126,15 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (Tee, Stream) { + pub fn new_output(&mut self) -> (Tee, Stream) { let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: I) -> (Tee, Stream) + pub fn new_output_connection(&mut self, connection: I) -> (Tee, Stream) where - I: IntoIterator::Summary>)>, + I: IntoIterator::Summary>)>, { let new_output = self.shape.outputs; self.shape.outputs += 1; @@ -150,7 +152,7 @@ impl OperatorBuilder { /// Creates an operator implementation from supplied logic constructor. pub fn build(mut self, logic: L) where - L: FnMut(&mut SharedProgress)->bool+'static + L: FnMut(&mut SharedProgress)->bool+'static { let inputs = self.shape.inputs; let outputs = self.shape.outputs; diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 2cfc39894..b286aa8d9 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -24,20 +24,20 @@ use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; /// Builds operators with generic shape. #[derive(Debug)] -pub struct OperatorBuilder { - builder: OperatorBuilderRaw, - frontier: Vec>, - consumed: Vec>>>, - internal: Rc>>>>>, +pub struct OperatorBuilder { + builder: OperatorBuilderRaw, + frontier: Vec>, + consumed: Vec>>>, + internal: Rc>>>>>, /// For each input, a shared list of summaries to each output. - summaries: Vec::Summary>>>>, - produced: Vec>>>, + summaries: Vec::Summary>>>>, + produced: Vec>>>, } -impl OperatorBuilder { +impl OperatorBuilder { /// Allocates a new generic operator builder from its containing scope. - pub fn new(name: String, scope: G) -> Self { + pub fn new(name: String, scope: Scope) -> Self { OperatorBuilder { builder: OperatorBuilderRaw::new(name, scope), frontier: Vec::new(), @@ -54,9 +54,9 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore + pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore where - P: ParallelizationContract { + P: ParallelizationContract { let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) @@ -70,10 +70,10 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> InputHandleCore + pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> InputHandleCore where - P: ParallelizationContract, - I: IntoIterator::Summary>)> + Clone, + P: ParallelizationContract, + I: IntoIterator::Summary>)> + Clone, { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); @@ -88,7 +88,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (pushers::Output, Stream) { + pub fn new_output(&mut self) -> (pushers::Output, Stream) { let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -102,11 +102,11 @@ impl OperatorBuilder { /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. pub fn new_output_connection(&mut self, connection: I) -> ( - pushers::Output, - Stream, + pushers::Output, + Stream, ) where - I: IntoIterator::Summary>)> + Clone, + I: IntoIterator::Summary>)> + Clone, { let new_output = self.shape().outputs(); let (tee, stream) = self.builder.new_output_connection(connection.clone()); @@ -127,8 +127,8 @@ impl OperatorBuilder { /// Creates an operator implementation from supplied logic constructor. pub fn build(self, constructor: B) where - B: FnOnce(Vec>) -> L, - L: FnMut(&[MutableAntichain])+'static + B: FnOnce(Vec>) -> L, + L: FnMut(&[MutableAntichain])+'static { self.build_reschedule(|caps| { let mut logic = constructor(caps); @@ -144,13 +144,13 @@ impl OperatorBuilder { /// discretion. pub fn build_reschedule(self, constructor: B) where - B: FnOnce(Vec>) -> L, - L: FnMut(&[MutableAntichain])->bool+'static + B: FnOnce(Vec>) -> L, + L: FnMut(&[MutableAntichain])->bool+'static { // create capabilities, discard references to their creation. let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); for batch in self.internal.borrow().iter() { - capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch))); + capabilities.push(Capability::new(T::minimum(), Rc::clone(batch))); // Discard evidence of creation, as we are assumed to start with one. batch.borrow_mut().clear(); } @@ -163,7 +163,7 @@ impl OperatorBuilder { let self_produced = self.produced; let raw_logic = - move |progress: &mut SharedProgress| { + move |progress: &mut SharedProgress| { // drain frontier changes for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index c7fa28b15..f695c9683 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -2,6 +2,7 @@ //! Methods to construct generic streaming and blocking unary operators. use crate::progress::frontier::MutableAntichain; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder}; @@ -16,7 +17,7 @@ use crate::{Container, ContainerBuilder}; use crate::container::CapacityContainerBuilder; /// Methods to construct generic streaming and blocking operators. -pub trait Operator { +pub trait Operator { /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input. @@ -55,13 +56,13 @@ pub trait Operator { /// .container::>(); /// }); /// ``` - fn unary_frontier(self, pact: P, name: &str, constructor: B) -> Stream + fn unary_frontier(self, pact: P, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((&mut InputHandleCore, &MutableAntichain), - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P: ParallelizationContract; + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut((&mut InputHandleCore, &MutableAntichain), + &mut OutputBuilderSession<'_, T, CB>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, /// and repeatedly invokes the closure supplied as `logic`, which can read from the input stream, write to @@ -90,11 +91,11 @@ pub trait Operator { /// }); /// ``` fn unary_notify, - &mut OutputBuilderSession<'_, G::Timestamp, CB>, - &mut Notificator)+'static, - P: ParallelizationContract> - (self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> Stream; + L: FnMut(&mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>, + &mut Notificator)+'static, + P: ParallelizationContract> + (self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> Stream; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -124,13 +125,13 @@ pub trait Operator { /// }); /// }); /// ``` - fn unary(self, pact: P, name: &str, constructor: B) -> Stream + fn unary(self, pact: P, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputBuilderSession)+'static, - P: ParallelizationContract; + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut(&mut InputHandleCore, + &mut OutputBuilderSession)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -180,16 +181,16 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary_frontier(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((&mut InputHandleCore, &MutableAntichain), - (&mut InputHandleCore, &MutableAntichain), - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut((&mut InputHandleCore, &MutableAntichain), + (&mut InputHandleCore, &MutableAntichain), + &mut OutputBuilderSession<'_, T, CB>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, /// and repeatedly invokes the closure supplied as `logic`, which can read from the input streams, write to @@ -234,13 +235,13 @@ pub trait Operator { /// ``` fn binary_notify, - &mut InputHandleCore, - &mut OutputBuilderSession<'_, G::Timestamp, CB>, - &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (self, other: Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> Stream; + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>, + &mut Notificator)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract> + (self, other: Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> Stream; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -269,16 +270,16 @@ pub trait Operator { /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream @@ -306,19 +307,19 @@ pub trait Operator { /// ``` fn sink(self, pact: P, name: &str, logic: L) where - L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, - P: ParallelizationContract; + L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, + P: ParallelizationContract; } -impl Operator for Stream { +impl Operator for Stream { - fn unary_frontier(self, pact: P, name: &str, constructor: B) -> Stream + fn unary_frontier(self, pact: P, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((&mut InputHandleCore, &MutableAntichain), - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P: ParallelizationContract { + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut((&mut InputHandleCore, &MutableAntichain), + &mut OutputBuilderSession<'_, T, CB>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -341,11 +342,11 @@ impl Operator for Stream { } fn unary_notify, - &mut OutputBuilderSession<'_, G::Timestamp, CB>, - &mut Notificator)+'static, - P: ParallelizationContract> - (self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { + L: FnMut(&mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>, + &mut Notificator)+'static, + P: ParallelizationContract> + (self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -361,13 +362,13 @@ impl Operator for Stream { }) } - fn unary(self, pact: P, name: &str, constructor: B) -> Stream + fn unary(self, pact: P, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P: ParallelizationContract { + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut(&mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -387,16 +388,16 @@ impl Operator for Stream { stream } - fn binary_frontier(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary_frontier(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((&mut InputHandleCore, &MutableAntichain), - (&mut InputHandleCore, &MutableAntichain), - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut((&mut InputHandleCore, &MutableAntichain), + (&mut InputHandleCore, &MutableAntichain), + &mut OutputBuilderSession<'_, T, CB>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -421,13 +422,13 @@ impl Operator for Stream { fn binary_notify, - &mut InputHandleCore, - &mut OutputBuilderSession<'_, G::Timestamp, CB>, - &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (self, other: Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>, + &mut Notificator)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract> + (self, other: Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -445,16 +446,16 @@ impl Operator for Stream { } - fn binary(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputBuilderSession<'_, T, CB>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -481,8 +482,8 @@ impl Operator for Stream { fn sink(self, pact: P, name: &str, mut logic: L) where - L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, - P: ParallelizationContract { + L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let mut input = builder.new_input(self, pact); @@ -537,11 +538,11 @@ impl Operator for Stream { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> Stream +pub fn source(scope: &Scope, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, - B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static { + B: FnOnce(Capability, OperatorInfo) -> L, + L: FnMut(&mut OutputBuilderSession<'_, T, CB>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); @@ -581,7 +582,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> Stream { +pub fn empty(scope: &Scope) -> Stream { source::<_, CapacityContainerBuilder, _, _>(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/vec/aggregation/aggregate.rs b/timely/src/dataflow/operators/vec/aggregation/aggregate.rs index bac0a8905..3b302e8b9 100644 --- a/timely/src/dataflow/operators/vec/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/vec/aggregation/aggregate.rs @@ -3,7 +3,8 @@ use std::hash::Hash; use std::collections::HashMap; use crate::ExchangeData; -use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -11,7 +12,7 @@ use crate::dataflow::channels::pact::Exchange; /// /// Extension method supporting aggregation of keyed data within timestamp. /// For inter-timestamp aggregation, consider `StateMachine`. -pub trait Aggregate { +pub trait Aggregate { /// Aggregates data of the form `(key, val)`, using user-supplied logic. /// /// The `aggregate` method is implemented for streams of `(K, V)` data, @@ -65,16 +66,16 @@ pub trait Aggregate { self, fold: F, emit: E, - hash: H) -> StreamVec where S::Timestamp: Eq; + hash: H) -> StreamVec where T: Eq; } -impl, K: ExchangeData+Clone+Hash+Eq, V: ExchangeData> Aggregate for StreamVec { +impl Aggregate for StreamVec { fn aggregateR+'static, H: Fn(&K)->u64+'static>( self, fold: F, emit: E, - hash: H) -> StreamVec where S::Timestamp: Eq { + hash: H) -> StreamVec where T: Eq { let mut aggregates = HashMap::new(); self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/vec/aggregation/state_machine.rs b/timely/src/dataflow/operators/vec/aggregation/state_machine.rs index f1f2a669f..3904b832d 100644 --- a/timely/src/dataflow/operators/vec/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/vec/aggregation/state_machine.rs @@ -3,7 +3,8 @@ use std::hash::Hash; use std::collections::HashMap; use crate::ExchangeData; -use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -17,7 +18,7 @@ use crate::dataflow::channels::pact::Exchange; /// updates for the current time reflected in the notificator, though. In the case of partially /// ordered times, the only guarantee is that updates are not applied out of order, not that there /// is some total order on times respecting the total order (updates may be interleaved). -pub trait StateMachine { +pub trait StateMachine { /// Tracks a state for each presented key, using user-supplied state transition logic. /// /// The transition logic `fold` may mutate the state, and produce both output records and @@ -51,17 +52,17 @@ pub trait StateMachine { I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(self, fold: F, hash: H) -> StreamVec where S::Timestamp : Hash+Eq ; + >(self, fold: F, hash: H) -> StreamVec where T : Hash+Eq ; } -impl StateMachine for StreamVec { +impl StateMachine for StreamVec { fn state_machine< R: 'static, // output type D: Default+'static, // per-key state (data) I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(self, fold: F, hash: H) -> StreamVec where S::Timestamp : Hash+Eq { + >(self, fold: F, hash: H) -> StreamVec where T : Hash+Eq { let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state) let mut states = HashMap::new(); // keys -> state diff --git a/timely/src/dataflow/operators/vec/branch.rs b/timely/src/dataflow/operators/vec/branch.rs index ce6ff785f..af280cbb2 100644 --- a/timely/src/dataflow/operators/vec/branch.rs +++ b/timely/src/dataflow/operators/vec/branch.rs @@ -1,13 +1,14 @@ //! Operators that separate one stream into two streams based on some condition use crate::dataflow::channels::pact::Pipeline; +use crate::progress::Timestamp; use crate::dataflow::operators::generic::OutputBuilder; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, StreamVec, Stream}; +use crate::dataflow::{StreamVec, Stream}; use crate::Container; /// Extension trait for `StreamVec`. -pub trait Branch { +pub trait Branch { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with a reference to /// the data and its time. If it returns `true`, the record will be sent @@ -31,15 +32,15 @@ pub trait Branch { /// ``` fn branch( self, - condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (StreamVec, StreamVec); + condition: impl Fn(&T, &D) -> bool + 'static, + ) -> (StreamVec, StreamVec); } -impl Branch for StreamVec { +impl Branch for StreamVec { fn branch( self, - condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (StreamVec, StreamVec) { + condition: impl Fn(&T, &D) -> bool + 'static, + ) -> (StreamVec, StreamVec) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); @@ -99,8 +100,8 @@ pub trait BranchWhen: Sized { fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } -impl BranchWhen for Stream { - fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { +impl BranchWhen for Stream { + fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); diff --git a/timely/src/dataflow/operators/vec/broadcast.rs b/timely/src/dataflow/operators/vec/broadcast.rs index 8bf4397ca..e2372c7aa 100644 --- a/timely/src/dataflow/operators/vec/broadcast.rs +++ b/timely/src/dataflow/operators/vec/broadcast.rs @@ -1,7 +1,8 @@ //! Broadcast records to all workers. use crate::ExchangeData; -use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; use crate::dataflow::operators::{vec::Map, Exchange}; /// Broadcast records to all workers. @@ -21,8 +22,8 @@ pub trait Broadcast { fn broadcast(self) -> Self; } -impl Broadcast for StreamVec { - fn broadcast(self) -> StreamVec { +impl Broadcast for StreamVec { + fn broadcast(self) -> StreamVec { // NOTE: Simplified implementation due to underlying motion // in timely dataflow internals. Optimize once they have diff --git a/timely/src/dataflow/operators/vec/count.rs b/timely/src/dataflow/operators/vec/count.rs index 24a1363c8..d8fb67822 100644 --- a/timely/src/dataflow/operators/vec/count.rs +++ b/timely/src/dataflow/operators/vec/count.rs @@ -2,11 +2,12 @@ use std::collections::HashMap; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; use crate::dataflow::operators::generic::operator::Operator; /// Accumulates records within a timestamp. -pub trait Accumulate : Sized { +pub trait Accumulate : Sized { /// Accumulates records within a timestamp. /// /// # Examples @@ -25,7 +26,7 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec; + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec; /// Counts the number of records observed at each time. /// /// # Examples @@ -44,13 +45,13 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![10])]); /// ``` - fn count(self) -> StreamVec { + fn count(self) -> StreamVec { self.accumulate(0, |sum, data| *sum += data.len()) } } -impl, D: 'static> Accumulate for StreamVec { - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec { +impl Accumulate for StreamVec { + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/vec/delay.rs b/timely/src/dataflow/operators/vec/delay.rs index 3ff0596f5..4a95cdafb 100644 --- a/timely/src/dataflow/operators/vec/delay.rs +++ b/timely/src/dataflow/operators/vec/delay.rs @@ -2,13 +2,14 @@ use std::collections::HashMap; -use crate::order::{PartialOrder, TotalOrder}; +use crate::order::TotalOrder; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamVec, Scope}; +use crate::dataflow::StreamVec; use crate::dataflow::operators::generic::operator::Operator; /// Methods to advance the timestamps of records or batches of records. -pub trait Delay { +pub trait Delay { /// Advances the timestamp of records using a supplied function. /// @@ -36,7 +37,7 @@ pub trait Delay { /// }); /// }); /// ``` - fn delayG::Timestamp+'static>(self, func: L) -> Self; + fn delayT+'static>(self, func: L) -> Self; /// Advances the timestamp of records using a supplied function. /// @@ -64,8 +65,8 @@ pub trait Delay { /// }); /// }); /// ``` - fn delay_totalG::Timestamp+'static>(self, func: L) -> Self - where G::Timestamp: TotalOrder; + fn delay_totalT+'static>(self, func: L) -> Self + where T: TotalOrder; /// Advances the timestamp of batches of records using a supplied function. /// @@ -93,11 +94,11 @@ pub trait Delay { /// }); /// }); /// ``` - fn delay_batchG::Timestamp+'static>(self, func: L) -> Self; + fn delay_batchT+'static>(self, func: L) -> Self; } -impl, D: 'static> Delay for StreamVec { - fn delayG::Timestamp+'static>(self, mut func: L) -> Self { +impl Delay for StreamVec { + fn delayT+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { input.for_each_time(|time, data| { @@ -119,13 +120,13 @@ impl, D: 'static> Delay for StreamV }) } - fn delay_totalG::Timestamp+'static>(self, func: L) -> Self - where G::Timestamp: TotalOrder + fn delay_totalT+'static>(self, func: L) -> Self + where T: TotalOrder { self.delay(func) } - fn delay_batchG::Timestamp+'static>(self, mut func: L) -> Self { + fn delay_batchT+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { input.for_each_time(|time, data| { diff --git a/timely/src/dataflow/operators/vec/filter.rs b/timely/src/dataflow/operators/vec/filter.rs index 091e9dc9d..cb4bb308d 100644 --- a/timely/src/dataflow/operators/vec/filter.rs +++ b/timely/src/dataflow/operators/vec/filter.rs @@ -1,7 +1,8 @@ //! Filters a stream by a predicate. use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. @@ -22,8 +23,8 @@ pub trait Filter { fn filterbool+'static>(self, predicate: P) -> Self; } -impl Filter for StreamVec { - fn filterbool+'static>(self, mut predicate: P) -> StreamVec { +impl Filter for StreamVec { + fn filterbool+'static>(self, mut predicate: P) -> StreamVec { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); diff --git a/timely/src/dataflow/operators/vec/flow_controlled.rs b/timely/src/dataflow/operators/vec/flow_controlled.rs index 5274406f3..55b2b8fc4 100644 --- a/timely/src/dataflow/operators/vec/flow_controlled.rs +++ b/timely/src/dataflow/operators/vec/flow_controlled.rs @@ -1,6 +1,7 @@ //! Methods to construct flow-controlled sources. -use crate::order::{PartialOrder, TotalOrder}; +use crate::order::TotalOrder; +use crate::scheduling::Scheduler; use crate::progress::timestamp::Timestamp; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::probe::Handle; @@ -69,18 +70,18 @@ pub struct IteratorSourceInput, I /// }).unwrap(); /// ``` pub fn iterator_source< - G: Scope, + T: Timestamp, D: 'static, DI: IntoIterator, - I: IntoIterator, - F: FnMut(&G::Timestamp)->Option>+'static>( - scope: &G, + I: IntoIterator, + F: FnMut(&T)->Option>+'static>( + scope: &Scope, name: &str, mut input_f: F, - probe: Handle, - ) -> StreamVec where G::Timestamp: TotalOrder { + probe: Handle, + ) -> StreamVec where T: TotalOrder { - let mut target = G::Timestamp::minimum(); + let mut target = T::minimum(); source(scope, name, |cap, info| { let mut cap = Some(cap); let activator = scope.activator_for(info.address); diff --git a/timely/src/dataflow/operators/vec/input.rs b/timely/src/dataflow/operators/vec/input.rs index 70ccf7466..1e459e26f 100644 --- a/timely/src/dataflow/operators/vec/input.rs +++ b/timely/src/dataflow/operators/vec/input.rs @@ -1,6 +1,7 @@ //! Create new `Streams` connected to external inputs. use crate::container::CapacityContainerBuilder; +use crate::progress::Timestamp; use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::operators::core::{Input as InputCore}; @@ -12,7 +13,10 @@ use crate::dataflow::operators::core::{Input as InputCore}; // NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a. /// Create a new `StreamVec` and `Handle` through which to supply input. -pub trait Input : Scope { +pub trait Input { + /// The timestamp at which this input scope operates. + type Timestamp: Timestamp; + /// Create a new `StreamVec` and `Handle` through which to supply input. /// /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used @@ -46,7 +50,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, D>, StreamVec); + fn new_input(&mut self) -> (Handle, StreamVec); /// Create a new stream from a supplied interactive handle. /// @@ -79,16 +83,17 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> StreamVec; + fn input_from(&mut self, handle: &mut Handle) -> StreamVec; } use crate::order::TotalOrder; -impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, D>, StreamVec) { +impl Input for Scope { + type Timestamp = T; + fn new_input(&mut self) -> (Handle, StreamVec) { InputCore::new_input(self) } - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> StreamVec { + fn input_from(&mut self, handle: &mut Handle) -> StreamVec { InputCore::input_from(self, handle) } } diff --git a/timely/src/dataflow/operators/vec/map.rs b/timely/src/dataflow/operators/vec/map.rs index 66b4b8b2a..e7b4ef320 100644 --- a/timely/src/dataflow/operators/vec/map.rs +++ b/timely/src/dataflow/operators/vec/map.rs @@ -1,12 +1,13 @@ //! Extension methods for `StreamVec` based on record-by-record transformation. -use crate::dataflow::{StreamVec, Scope}; +use crate::dataflow::StreamVec; +use crate::progress::Timestamp; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::operators::core::{Map as MapCore}; /// Extension trait for `StreamVec`. -pub trait Map : Sized { +pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -19,7 +20,7 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(self, mut logic: L) -> StreamVec { + fn mapD2+'static>(self, mut logic: L) -> StreamVec { self.flat_map(move |x| std::iter::once(logic(x))) } /// Updates each element of the stream and yields the element, re-using memory where possible. @@ -34,7 +35,7 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_in_place(self, logic: L) -> StreamVec; + fn map_in_place(self, logic: L) -> StreamVec; /// Consumes each element of the stream and yields some number of new elements. /// /// # Examples @@ -47,11 +48,11 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_mapI+'static>(self, logic: L) -> StreamVec where I::Item: 'static; + fn flat_mapI+'static>(self, logic: L) -> StreamVec where I::Item: 'static; } -impl Map for StreamVec { - fn map_in_place(self, mut logic: L) -> StreamVec { +impl Map for StreamVec { + fn map_in_place(self, mut logic: L) -> StreamVec { self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); @@ -65,7 +66,7 @@ impl Map for StreamVec { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_mapI+'static>(self, logic: L) -> StreamVec where I::Item: 'static { + fn flat_mapI+'static>(self, logic: L) -> StreamVec where I::Item: 'static { MapCore::flat_map(self, logic) } } diff --git a/timely/src/dataflow/operators/vec/partition.rs b/timely/src/dataflow/operators/vec/partition.rs index 3589cd71f..1a08d9b11 100644 --- a/timely/src/dataflow/operators/vec/partition.rs +++ b/timely/src/dataflow/operators/vec/partition.rs @@ -1,11 +1,12 @@ //! Partition a stream of records into multiple streams. use crate::container::CapacityContainerBuilder; +use crate::progress::Timestamp; use crate::dataflow::operators::core::Partition as PartitionCore; -use crate::dataflow::{Scope, StreamVec}; +use crate::dataflow::StreamVec; /// Partition a stream of records into multiple streams. -pub trait Partition { +pub trait Partition { /// Produces `parts` output streams, containing records produced and assigned by `route`. /// /// # Examples @@ -21,11 +22,11 @@ pub trait Partition { /// streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x)); /// }); /// ``` - fn partition (u64, D2)+'static>(self, parts: u64, route: F) -> Vec>; + fn partition (u64, D2)+'static>(self, parts: u64, route: F) -> Vec>; } -impl Partition for StreamVec { - fn partition(u64, D2)+'static>(self, parts: u64, route: F) -> Vec> { +impl Partition for StreamVec { + fn partition(u64, D2)+'static>(self, parts: u64, route: F) -> Vec> { PartitionCore::partition::, _, _>(self, parts, route) } } diff --git a/timely/src/dataflow/operators/vec/queue.rs b/timely/src/dataflow/operators/vec/queue.rs index fa1395591..a450e7bbd 100644 --- a/timely/src/dataflow/operators/vec/queue.rs +++ b/timely/src/dataflow/operators/vec/queue.rs @@ -9,8 +9,8 @@ pub trait Queue { fn queue(&self) -> Self; } -impl Queue for StreamVec { - fn queue(&self) -> StreamVec { +impl Queue for StreamVec { + fn queue(&self) -> StreamVec { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Queue", vec![], move |input, output, notificator| { while let Some((time, data)) = input.next() { diff --git a/timely/src/dataflow/operators/vec/result.rs b/timely/src/dataflow/operators/vec/result.rs index cecff9223..50ab57f8f 100644 --- a/timely/src/dataflow/operators/vec/result.rs +++ b/timely/src/dataflow/operators/vec/result.rs @@ -1,10 +1,11 @@ //! Extension methods for `StreamVec` containing `Result`s. use crate::dataflow::operators::vec::Map; -use crate::dataflow::{Scope, StreamVec}; +use crate::progress::Timestamp; +use crate::dataflow::StreamVec; /// Extension trait for `StreamVec`. -pub trait ResultStream { +pub trait ResultStream { /// Returns a new instance of `self` containing only `ok` records. /// /// # Examples @@ -17,7 +18,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn ok(self) -> StreamVec; + fn ok(self) -> StreamVec; /// Returns a new instance of `self` containing only `err` records. /// @@ -31,7 +32,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn err(self) -> StreamVec; + fn err(self) -> StreamVec; /// Returns a new instance of `self` applying `logic` on all `Ok` records. /// @@ -45,7 +46,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_ok T2 + 'static>(self, logic: L) -> StreamVec>; + fn map_ok D2 + 'static>(self, logic: L) -> StreamVec>; /// Returns a new instance of `self` applying `logic` on all `Err` records. /// @@ -59,7 +60,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_err E2 + 'static>(self, logic: L) -> StreamVec>; + fn map_err E2 + 'static>(self, logic: L) -> StreamVec>; /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` /// records. @@ -74,10 +75,10 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn and_then Result + 'static>( + fn and_then Result + 'static>( self, logic: L, - ) -> StreamVec>; + ) -> StreamVec>; /// Returns a new instance of `self` applying `logic` on all `Ok` records. /// @@ -91,31 +92,31 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn unwrap_or_else T + 'static>(self, logic: L) -> StreamVec; + fn unwrap_or_else D + 'static>(self, logic: L) -> StreamVec; } -impl ResultStream for StreamVec> { - fn ok(self) -> StreamVec { +impl ResultStream for StreamVec> { + fn ok(self) -> StreamVec { self.flat_map(Result::ok) } - fn err(self) -> StreamVec { + fn err(self) -> StreamVec { self.flat_map(Result::err) } - fn map_ok T2 + 'static>(self, mut logic: L) -> StreamVec> { + fn map_ok D2 + 'static>(self, mut logic: L) -> StreamVec> { self.map(move |r| r.map(&mut logic)) } - fn map_err E2 + 'static>(self, mut logic: L) -> StreamVec> { + fn map_err E2 + 'static>(self, mut logic: L) -> StreamVec> { self.map(move |r| r.map_err(&mut logic)) } - fn and_then Result + 'static>(self, mut logic: L) -> StreamVec> { + fn and_then Result + 'static>(self, mut logic: L) -> StreamVec> { self.map(move |r| r.and_then(&mut logic)) } - fn unwrap_or_else T + 'static>(self, mut logic: L) -> StreamVec { + fn unwrap_or_else D + 'static>(self, mut logic: L) -> StreamVec { self.map(move |r| r.unwrap_or_else(&mut logic)) } } diff --git a/timely/src/dataflow/operators/vec/to_stream.rs b/timely/src/dataflow/operators/vec/to_stream.rs index fba5898c6..267641688 100644 --- a/timely/src/dataflow/operators/vec/to_stream.rs +++ b/timely/src/dataflow/operators/vec/to_stream.rs @@ -1,6 +1,7 @@ //! Conversion to the `StreamVec` type from iterators. use crate::dataflow::{StreamVec, Scope}; +use crate::progress::Timestamp; use crate::dataflow::operators::core::{ToStream as ToStreamCore}; /// Converts to a timely `StreamVec`. @@ -21,11 +22,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream(self, scope: &mut S) -> StreamVec; + fn to_stream(self, scope: &mut Scope) -> StreamVec; } impl ToStream for I { - fn to_stream(self, scope: &mut S) -> StreamVec { + fn to_stream(self, scope: &mut Scope) -> StreamVec { ToStreamCore::to_stream(self, scope) } } diff --git a/timely/src/dataflow/operators/vec/unordered_input.rs b/timely/src/dataflow/operators/vec/unordered_input.rs index 63f195ede..297f20825 100644 --- a/timely/src/dataflow/operators/vec/unordered_input.rs +++ b/timely/src/dataflow/operators/vec/unordered_input.rs @@ -1,12 +1,13 @@ //! Create new `Streams` connected to external inputs. use crate::container::CapacityContainerBuilder; +use crate::progress::Timestamp; use crate::dataflow::operators::{ActivateCapability}; use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore}; use crate::dataflow::{StreamVec, Scope}; /// Create a new `StreamVec` and `Handle` through which to supply input. -pub trait UnorderedInput { +pub trait UnorderedInput { /// Create a new capability-based `StreamVec` and `Handle` through which to supply input. This /// input supports multiple open epochs (timestamps) at the same time. /// @@ -60,12 +61,12 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec); } -impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec) { +impl UnorderedInput for Scope { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec) { UnorderedInputCore::new_unordered_input(self) } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 5d6773d68..c5b01acd9 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -1,4 +1,4 @@ -//! A child dataflow scope, used to build nested dataflow scopes. +//! A dataflow scope, used to build dataflow graphs. use std::rc::Rc; use std::cell::RefCell; @@ -15,17 +15,19 @@ use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::worker::{AsWorker, Config, Worker}; -use super::Scope; +/// Type alias for an iterative scope. +pub type Iterative = Scope>; -/// Type alias for iterative child scope. -pub type Iterative = Child>; - -/// A `Child` wraps a `Subgraph` and manages the addition +/// A `Scope` wraps a `SubgraphBuilder` and manages the addition /// of `Operate`s and the connection of edges between them. -pub struct Child { +/// +/// Importantly, this is a *shared* object, backed by `Rc>` wrappers. Each method +/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the +/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics. +pub struct Scope { /// The subgraph under assembly. /// - /// Stored as `Rc>` so that multiple `Child` clones can share the + /// Stored as `Rc>` so that multiple `Scope` clones can share the /// same subgraph state during construction. The owning `scoped` / `region` / /// `dataflow` call recovers the inner `SubgraphBuilder` via `Rc::try_unwrap` /// when the closure returns; if a clone has escaped the closure, this fails @@ -39,36 +41,21 @@ pub struct Child { pub(crate) progress_logging: Option>, } -impl Child { +impl Scope { /// This worker's index out of `0 .. self.peers()`. pub fn index(&self) -> usize { self.worker.index() } /// The total number of workers in the computation. pub fn peers(&self) -> usize { self.worker.peers() } -} -impl AsWorker for Child { - fn config(&self) -> &Config { self.worker.config() } - fn index(&self) -> usize { self.worker.index() } - fn peers(&self) -> usize { self.worker.peers() } - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { self.worker.allocate(identifier, address) } - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { self.worker.pipeline(identifier, address) } - fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { self.worker.broadcast(identifier, address) } - fn new_identifier(&mut self) -> usize { self.worker.new_identifier() } - fn peek_identifier(&self) -> usize { self.worker.peek_identifier() } - fn log_register(&self) -> Option<::std::cell::RefMut<'_, crate::logging_core::Registry>> { self.worker.log_register() } -} - -impl Scheduler for Child { - fn activations(&self) -> Rc> { self.worker.activations() } -} + /// A useful name describing the scope. + pub fn name(&self) -> String { self.subgraph.borrow().name.clone() } -impl Scope for Child { - type Timestamp = T; + /// A sequence of scope identifiers describing the path from the worker root to this scope. + pub fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) } - fn name(&self) -> String { self.subgraph.borrow().name.clone() } - fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) } - - fn addr_for_child(&self, index: usize) -> Rc<[usize]> { + /// A sequence of scope identifiers describing the path from the worker root to the child + /// indicated by `index`. + pub fn addr_for_child(&self, index: usize) -> Rc<[usize]> { let path = &self.subgraph.borrow().path[..]; let mut addr = Vec::with_capacity(path.len() + 1); addr.extend_from_slice(path); @@ -76,23 +63,74 @@ impl Scope for Child { addr.into() } - fn add_edge(&self, source: Source, target: Target) { + /// Connects a source of data with a target of the data. This only links the two for + /// the purposes of tracking progress, rather than effect any data movement itself. + pub fn add_edge(&self, source: Source, target: Target) { self.subgraph.borrow_mut().connect(source, target); } - fn add_operator_with_indices(&mut self, operator: Box>, local: usize, global: usize) { - self.subgraph.borrow_mut().add_child(operator, local, global); + /// Adds a child `Operate` to this scope. Returns the new child's index. + pub fn add_operator(&mut self, operator: Box>) -> usize { + let index = self.allocate_operator_index(); + let global = self.new_identifier(); + self.add_operator_with_indices(operator, index, global); + index } - fn allocate_operator_index(&mut self) -> usize { + /// Allocates a new scope-local operator index. + /// + /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local + /// operator index allocated with this method. This method does cause the scope to expect that + /// an operator will be added, and it is an error not to eventually add such an operator. + pub fn allocate_operator_index(&mut self) -> usize { self.subgraph.borrow_mut().allocate_child_id() } + /// Adds a child `Operate` to this scope using a supplied index. + /// + /// This is used internally when there is a gap between allocate a child identifier and adding the + /// child, as happens in subgraph creation. + pub fn add_operator_with_index(&mut self, operator: Box>, index: usize) { + let global = self.new_identifier(); + self.add_operator_with_indices(operator, index, global); + } + + /// Adds a child `Operate` to this scope using supplied indices. + /// + /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging. + pub fn add_operator_with_indices(&mut self, operator: Box>, local: usize, global: usize) { + self.subgraph.borrow_mut().add_child(operator, local, global); + } + + /// Creates a dataflow subgraph. + /// + /// This method allows the user to create a nested scope with any timestamp that + /// "refines" the enclosing timestamp (informally: extends it in a reversible way). + /// + /// This is most commonly used to create new iterative contexts, and the provided + /// method `iterative` for this task demonstrates the use of this method. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// use timely::order::Product; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::>(); + /// let output = child1.scoped::,_,_>("ScopeName", |child2| { + /// stream.enter(child2).leave(child1) + /// }); + /// input + /// }); + /// }); + /// ``` #[inline] - fn scoped(&self, name: &str, func: F) -> R + pub fn scoped(&self, name: &str, func: F) -> R where - T2: Timestamp+Refines, - F: FnOnce(&mut Child) -> R, + T2: Timestamp + Refines, + F: FnOnce(&mut Scope) -> R, { let mut scope = self.clone(); let index = scope.subgraph.borrow_mut().allocate_child_id(); @@ -105,7 +143,7 @@ impl Scope for Child { let subscope = Rc::new(RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name))); let result = { - let mut builder = Child { + let mut builder = Scope { subgraph: Rc::clone(&subscope), worker: scope.worker.clone(), logging: scope.logging.clone(), @@ -116,8 +154,8 @@ impl Scope for Child { let subscope = Rc::try_unwrap(subscope) .map_err(|_| ()) .expect( - "Cannot consume scope: an outstanding `Child` clone is still alive. \ - This usually means a `Child` was cloned and held past the scoped() / \ + "Cannot consume scope: an outstanding `Scope` clone is still alive. \ + This usually means a `Scope` was cloned and held past the scoped() / \ region() / iterative() / dataflow() call that constructed it." ) .into_inner() @@ -127,11 +165,115 @@ impl Scope for Child { result } + + /// Creates an iterative dataflow subgraph. + /// + /// This method is a specialization of `scoped` which uses the `Product` timestamp + /// combinator, suitable for iterative computations in which iterative development + /// at some time cannot influence prior iterations at a future time. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::>(); + /// let output = child1.iterative::(|child2| { + /// stream.enter(child2).leave(child1) + /// }); + /// input + /// }); + /// }); + /// ``` + pub fn iterative(&self, func: F) -> R + where + T2: Timestamp, + F: FnOnce(&mut Scope>) -> R, + { + self.scoped::, R, F>("Iterative", func) + } + + /// Creates a dataflow region with the same timestamp. + /// + /// This method is a specialization of `scoped` which uses the same timestamp as the + /// containing scope. It is used mainly to group regions of a dataflow computation, and + /// provides some computational benefits by abstracting the specifics of the region. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::>(); + /// let output = child1.region(|child2| { + /// stream.enter(child2).leave(child1) + /// }); + /// input + /// }); + /// }); + /// ``` + pub fn region(&self, func: F) -> R + where + F: FnOnce(&mut Scope) -> R, + { + self.region_named("Region", func) + } + + /// Creates a dataflow region with the same timestamp and a supplied name. + /// + /// This method is a specialization of `scoped` which uses the same timestamp as the + /// containing scope. It is used mainly to group regions of a dataflow computation, and + /// provides some computational benefits by abstracting the specifics of the region. + /// + /// This variant allows you to specify a name for the region, which can be read out in + /// the timely logging streams. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::>(); + /// let output = child1.region_named("region", |child2| { + /// stream.enter(child2).leave(child1) + /// }); + /// input + /// }); + /// }); + /// ``` + pub fn region_named(&self, name: &str, func: F) -> R + where + F: FnOnce(&mut Scope) -> R, + { + self.scoped::(name, func) + } } -impl Clone for Child { +impl AsWorker for Scope { + fn config(&self) -> &Config { self.worker.config() } + fn index(&self) -> usize { self.worker.index() } + fn peers(&self) -> usize { self.worker.peers() } + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { self.worker.allocate(identifier, address) } + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { self.worker.pipeline(identifier, address) } + fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { self.worker.broadcast(identifier, address) } + fn new_identifier(&mut self) -> usize { self.worker.new_identifier() } + fn peek_identifier(&self) -> usize { self.worker.peek_identifier() } + fn log_register(&self) -> Option<::std::cell::RefMut<'_, crate::logging_core::Registry>> { self.worker.log_register() } +} + +impl Scheduler for Scope { + fn activations(&self) -> Rc> { self.worker.activations() } +} + +impl Clone for Scope { fn clone(&self) -> Self { - Child { + Scope { subgraph: Rc::clone(&self.subgraph), worker: self.worker.clone(), logging: self.logging.clone(), @@ -139,3 +281,12 @@ impl Clone for Child { } } } + +impl std::fmt::Debug for Scope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Scope") + .field("name", &self.subgraph.borrow().name) + .field("path", &self.subgraph.borrow().path) + .finish_non_exhaustive() + } +} diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 2e1ece794..a8ccfb90c 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -1,187 +1,5 @@ //! Hierarchical organization of timely dataflow graphs. -use std::rc::Rc; -use crate::progress::{Timestamp, Operate, Source, Target}; -use crate::order::Product; -use crate::progress::timestamp::Refines; -use crate::worker::AsWorker; - pub mod child; -pub use self::child::Child; - -/// The fundamental operations required to add and connect operators in a timely dataflow graph. -/// -/// Importantly, this is often a *shared* object, backed by a `Rc>` wrapper. Each method -/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the -/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics. -pub trait Scope: AsWorker+Clone { - - /// The timestamp associated with data in this scope. - type Timestamp : Timestamp; - - /// A useful name describing the scope. - fn name(&self) -> String; - - /// A sequence of scope identifiers describing the path from the worker root to this scope. - fn addr(&self) -> Rc<[usize]>; - - /// A sequence of scope identifiers describing the path from the worker root to the child - /// indicated by `index`. - fn addr_for_child(&self, index: usize) -> Rc<[usize]>; - - /// Connects a source of data with a target of the data. This only links the two for - /// the purposes of tracking progress, rather than effect any data movement itself. - fn add_edge(&self, source: Source, target: Target); - - /// Adds a child `Operate` to the builder's scope. Returns the new child's index. - fn add_operator(&mut self, operator: Box>) -> usize { - let index = self.allocate_operator_index(); - let global = self.new_identifier(); - self.add_operator_with_indices(operator, index, global); - index - } - - /// Allocates a new scope-local operator index. - /// - /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local - /// operator index allocated with this method. This method does cause the scope to expect that - /// an operator will be added, and it is an error not to eventually add such an operator. - fn allocate_operator_index(&mut self) -> usize; - - /// Adds a child `Operate` to the builder's scope using a supplied index. - /// - /// This is used internally when there is a gap between allocate a child identifier and adding the - /// child, as happens in subgraph creation. - fn add_operator_with_index(&mut self, operator: Box>, index: usize) { - let global = self.new_identifier(); - self.add_operator_with_indices(operator, index, global); - } - - /// Adds a child `Operate` to the builder's scope using supplied indices. - /// - /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging. - fn add_operator_with_indices(&mut self, operator: Box>, local: usize, global: usize); - - /// Creates a dataflow subgraph. - /// - /// This method allows the user to create a nested scope with any timestamp that - /// "refines" the enclosing timestamp (informally: extends it in a reversible way). - /// - /// This is most commonly used to create new iterative contexts, and the provided - /// method `iterative` for this task demonstrates the use of this method. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; - /// use timely::order::Product; - /// - /// timely::execute_from_args(std::env::args(), |worker| { - /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::>(); - /// let output = child1.scoped::,_,_>("ScopeName", |child2| { - /// stream.enter(child2).leave(child1) - /// }); - /// input - /// }); - /// }); - /// ``` - fn scoped(&self, name: &str, func: F) -> R - where - T: Timestamp+Refines<::Timestamp>, - F: FnOnce(&mut Child) -> R; - - /// Creates a iterative dataflow subgraph. - /// - /// This method is a specialization of `scoped` which uses the `Product` timestamp - /// combinator, suitable for iterative computations in which iterative development - /// at some time cannot influence prior iterations at a future time. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; - /// - /// timely::execute_from_args(std::env::args(), |worker| { - /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::>(); - /// let output = child1.iterative::(|child2| { - /// stream.enter(child2).leave(child1) - /// }); - /// input - /// }); - /// }); - /// ``` - fn iterative(&self, func: F) -> R - where - T: Timestamp, - F: FnOnce(&mut Child::Timestamp, T>>) -> R, - { - self.scoped::::Timestamp, T>,R,F>("Iterative", func) - } - - /// Creates a dataflow region with the same timestamp. - /// - /// This method is a specialization of `scoped` which uses the same timestamp as the - /// containing scope. It is used mainly to group regions of a dataflow computation, and - /// provides some computational benefits by abstracting the specifics of the region. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; - /// - /// timely::execute_from_args(std::env::args(), |worker| { - /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::>(); - /// let output = child1.region(|child2| { - /// stream.enter(child2).leave(child1) - /// }); - /// input - /// }); - /// }); - /// ``` - fn region(&self, func: F) -> R - where - F: FnOnce(&mut Child<::Timestamp>) -> R, - { - self.region_named("Region", func) - } - - /// Creates a dataflow region with the same timestamp. - /// - /// This method is a specialization of `scoped` which uses the same timestamp as the - /// containing scope. It is used mainly to group regions of a dataflow computation, and - /// provides some computational benefits by abstracting the specifics of the region. - /// - /// This variant allows you to specify a name for the region, which can be read out in - /// the timely logging streams. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; - /// - /// timely::execute_from_args(std::env::args(), |worker| { - /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::>(); - /// let output = child1.region_named("region", |child2| { - /// stream.enter(child2).leave(child1) - /// }); - /// input - /// }); - /// }); - /// ``` - fn region_named(&self, name: &str, func: F) -> R - where - F: FnOnce(&mut Child<::Timestamp>) -> R, - { - self.scoped::<::Timestamp,R,F>(name, func) - } - -} +pub use self::child::{Scope, Iterative}; diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 25ab446d1..f5e44a325 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -4,30 +4,31 @@ //! operator output. Extension methods on the `Stream` type provide the appearance of higher-level //! declarative programming, while constructing a dataflow graph underneath. -use crate::progress::{Source, Target}; +use crate::progress::{Source, Target, Timestamp}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; use crate::dataflow::channels::Message; +use crate::worker::AsWorker; use std::fmt::{self, Debug}; // use dataflow::scopes::root::loggers::CHANNELS_Q; -/// Abstraction of a stream of `C: Container` records timestamped with `S::Timestamp`. +/// Abstraction of a stream of `C: Container` records timestamped with `T`. /// /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. -pub struct Stream { +pub struct Stream { /// The progress identifier of the stream's data source. name: Source, /// The `Scope` containing the stream. - scope: S, + scope: Scope, /// Maintains a list of Push> interested in the stream's output. - ports: TeeHelper, + ports: TeeHelper, } -impl Clone for Stream { +impl Clone for Stream { fn clone(&self) -> Self { Self { name: self.name, @@ -44,16 +45,16 @@ impl Clone for Stream { } /// A stream batching data in owning vectors. -pub type StreamVec = Stream>; +pub type StreamVec = Stream>; -impl Stream { +impl Stream { /// Connects the stream to a destination. /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static { + pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static { - let mut logging = self.scope().logging(); + let mut logging: Option = AsWorker::logging(&self.scope()); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { id: identifier, scope_addr: self.scope.addr().to_vec(), @@ -66,13 +67,13 @@ impl Stream { self.ports.add_pusher(pusher); } /// Allocates a `Stream` from a supplied `Source` name and rendezvous point. - pub fn new(source: Source, output: TeeHelper, scope: S) -> Self { + pub fn new(source: Source, output: TeeHelper, scope: Scope) -> Self { Self { name: source, ports: output, scope } } /// The name of the stream's source operator. pub fn name(&self) -> &Source { &self.name } /// The scope immediately containing the stream. - pub fn scope(&self) -> S { self.scope.clone() } + pub fn scope(&self) -> Scope { self.scope.clone() } /// Allows the assertion of a container type, for the benefit of type inference. /// @@ -89,23 +90,20 @@ impl Stream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - pub fn container(self) -> Stream where Self: AsStream { self.as_stream() } + pub fn container(self) -> Stream where Self: AsStream { self.as_stream() } } /// A type that can be translated to a [Stream]. -pub trait AsStream { +pub trait AsStream { /// Translate `self` to a [Stream]. - fn as_stream(self) -> Stream; + fn as_stream(self) -> Stream; } -impl AsStream for Stream { +impl AsStream for Stream { fn as_stream(self) -> Self { self } } -impl Debug for Stream -where - S: Scope, -{ +impl Debug for Stream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Stream") .field("source", &self.name) diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 594907607..b5472e67f 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -1,7 +1,7 @@ //! Starts a timely dataflow execution from configuration information and per-worker logic. use crate::communication::{initialize_from, Allocator, AllocatorBuilder, WorkerGuards}; -use crate::dataflow::scopes::Child; +use crate::dataflow::scopes::Scope; use crate::worker::Worker; use crate::{CommunicationConfig, WorkerConfig}; @@ -124,7 +124,7 @@ impl Config { pub fn example(func: F) -> T where T: Send+'static, - F: FnOnce(&mut Child)->T+Send+Sync+'static + F: FnOnce(&mut Scope)->T+Send+Sync+'static { crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 5f41cb249..2eeac9e45 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -15,7 +15,7 @@ use crate::scheduling::{Schedule, Scheduler, Activations}; use crate::progress::timestamp::{Refines}; use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; -use crate::dataflow::scopes::Child; +use crate::dataflow::scopes::Scope; use crate::logging::TimelyLogger; /// Different ways in which timely's progress tracking can work. @@ -589,7 +589,7 @@ impl Worker { pub fn dataflow(&mut self, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Child)->R, + F: FnOnce(&mut Scope)->R, { self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child)) } @@ -612,7 +612,7 @@ impl Worker { pub fn dataflow_named(&mut self, name: &str, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Child)->R, + F: FnOnce(&mut Scope)->R, { self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child)) } @@ -645,7 +645,7 @@ impl Worker { pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut V, &mut Child)->R, + F: FnOnce(&mut V, &mut Scope)->R, V: Any+'static, { let dataflow_index = self.allocate_dataflow_index(); @@ -659,7 +659,7 @@ impl Worker { let subscope = Rc::new(RefCell::new(subscope)); let result = { - let mut builder = Child { + let mut builder = Scope { subgraph: Rc::clone(&subscope), worker: self.clone(), logging: logging.clone(), @@ -671,8 +671,8 @@ impl Worker { let operator = Rc::try_unwrap(subscope) .map_err(|_| ()) .expect( - "Cannot consume dataflow scope: an outstanding `Child` clone is still alive. \ - This usually means a `Child` was cloned and held past the dataflow() call \ + "Cannot consume dataflow scope: an outstanding `Scope` clone is still alive. \ + This usually means a `Scope` was cloned and held past the dataflow() call \ that constructed it." ) .into_inner() diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs index c5ff370e5..3c0ca5ec9 100644 --- a/timely/tests/shape_scaling.rs +++ b/timely/tests/shape_scaling.rs @@ -64,7 +64,6 @@ fn subgraph_scaling(scale: u64) { .input_from(&mut input) .partition(scale, |()| (0, ())); - use timely::dataflow::Scope; let _outputs = scope.region(|inner| { use timely::dataflow::operators::{Enter, Leave}; parts.into_iter().map(|part| part.enter(inner).leave(scope)).collect::>() From b089c7f6a4cb56a8ead8d099e3c3fbfe0db8a1bd Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 9 Apr 2026 01:21:26 -0400 Subject: [PATCH 2/4] Relocate Scope --- timely/src/dataflow/mod.rs | 4 ++-- timely/src/dataflow/operators/core/enterleave.rs | 2 +- timely/src/dataflow/operators/core/feedback.rs | 2 +- timely/src/dataflow/{scopes/child.rs => scope.rs} | 0 timely/src/dataflow/scopes/mod.rs | 5 ----- timely/src/dataflow/stream.rs | 2 -- timely/src/execute.rs | 2 +- timely/src/worker.rs | 2 +- 8 files changed, 6 insertions(+), 13 deletions(-) rename timely/src/dataflow/{scopes/child.rs => scope.rs} (100%) delete mode 100644 timely/src/dataflow/scopes/mod.rs diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index 31fe5977f..3b4276bd2 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -15,7 +15,7 @@ //! ``` pub use self::stream::{Stream, StreamVec}; -pub use self::scopes::Scope; +pub use self::scope::Scope; pub use self::operators::core::input::Handle as InputHandle; pub use self::operators::vec::input::Handle as InputHandleVec; @@ -23,5 +23,5 @@ pub use self::operators::probe::Handle as ProbeHandle; pub mod operators; pub mod channels; -pub mod scopes; +pub mod scope; pub mod stream; diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 49c1d70d2..d8bd7cfea 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -6,7 +6,7 @@ //! //! # Examples //! ``` -//! use timely::dataflow::scopes::Scope; +//! use timely::dataflow::scope::Scope; //! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect}; //! //! timely::example(|outer| { diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 777f2f06a..6713a1418 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -3,7 +3,7 @@ use crate::Container; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::scopes::child::Iterative; +use crate::dataflow::scope::Iterative; use crate::dataflow::{Stream, Scope}; use crate::order::Product; use crate::progress::frontier::Antichain; diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scope.rs similarity index 100% rename from timely/src/dataflow/scopes/child.rs rename to timely/src/dataflow/scope.rs diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs deleted file mode 100644 index a8ccfb90c..000000000 --- a/timely/src/dataflow/scopes/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Hierarchical organization of timely dataflow graphs. - -pub mod child; - -pub use self::child::{Scope, Iterative}; diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index f5e44a325..8974a563f 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -13,8 +13,6 @@ use crate::dataflow::channels::Message; use crate::worker::AsWorker; use std::fmt::{self, Debug}; -// use dataflow::scopes::root::loggers::CHANNELS_Q; - /// Abstraction of a stream of `C: Container` records timestamped with `T`. /// /// Internally `Stream` maintains a list of data recipients who should be presented with data diff --git a/timely/src/execute.rs b/timely/src/execute.rs index b5472e67f..62814357a 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -1,7 +1,7 @@ //! Starts a timely dataflow execution from configuration information and per-worker logic. use crate::communication::{initialize_from, Allocator, AllocatorBuilder, WorkerGuards}; -use crate::dataflow::scopes::Scope; +use crate::dataflow::scope::Scope; use crate::worker::Worker; use crate::{CommunicationConfig, WorkerConfig}; diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 2eeac9e45..19e00f602 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -15,7 +15,7 @@ use crate::scheduling::{Schedule, Scheduler, Activations}; use crate::progress::timestamp::{Refines}; use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; -use crate::dataflow::scopes::Scope; +use crate::dataflow::scope::Scope; use crate::logging::TimelyLogger; /// Different ways in which timely's progress tracking can work. From dacc40acf9b8d3c29a37d2f2498fa84c4212e215 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 9 Apr 2026 01:35:02 -0400 Subject: [PATCH 3/4] Derandomize generic argument name --- timely/src/dataflow/operators/core/map.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 24f6de08f..558da652d 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -110,26 +110,26 @@ impl Map for Stream { /// A stream wrapper that allows the accumulation of flatmap logic. -pub struct FlatMapBuilder +pub struct FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { - stream: St, + stream: S, logic: F, marker: std::marker::PhantomData, } -impl FlatMapBuilder +impl FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { /// Create a new wrapper with no action on the stream. - pub fn new(stream: St, logic: F) -> Self { + pub fn new(stream: S, logic: F) -> Self { FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } } /// Transform a flatmapped stream through additional logic. - pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { let logic = self.logic; FlatMapBuilder { stream: self.stream, @@ -142,7 +142,7 @@ where where I: IntoIterator, T: Timestamp, - St: Map, + S: Map, C2: Container + SizableContainer + PushInto, { Map::flat_map(self.stream, self.logic) From 0b15605c07983a3023f1f7833b7c92534426ead4 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 9 Apr 2026 01:41:40 -0400 Subject: [PATCH 4/4] Update mdbook --- mdbook/src/chapter_4/chapter_4_4.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mdbook/src/chapter_4/chapter_4_4.md b/mdbook/src/chapter_4/chapter_4_4.md index 2a06e0a6b..f27207cca 100644 --- a/mdbook/src/chapter_4/chapter_4_4.md +++ b/mdbook/src/chapter_4/chapter_4_4.md @@ -56,7 +56,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that At *its* core, `replay_into` takes some sequence of `Event` items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well: ```rust,ignore - fn replay_into>(self, scope: &mut S) -> Stream{ + fn replay_into(self, scope: &mut Scope) -> Stream{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); let (targets, stream) = builder.new_output();