Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that
At *its* core, `replay_into` takes some sequence of `Event<T, D>` items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well:

```rust,ignore
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, C>{
fn replay_into<T: Timestamp>(self, scope: &mut Scope<T>) -> Stream<T, C>{

let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
let (targets, stream) = builder.new_output();
Expand Down
1 change: 0 additions & 1 deletion timely/examples/event_driven.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Enter, Leave};
use timely::dataflow::operators::vec::Map;

Expand Down
5 changes: 3 additions & 2 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::Ordering;

use timely::dataflow::*;
use timely::progress::Timestamp;
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
Expand Down Expand Up @@ -53,8 +54,8 @@ trait UnionFind {
fn union_find(self) -> Self;
}

impl<G: Scope> UnionFind for StreamVec<G, (usize, usize)> {
fn union_find(self) -> StreamVec<G, (usize, usize)> {
impl<T: Timestamp> UnionFind for StreamVec<T, (usize, usize)> {
fn union_find(self) -> StreamVec<T, (usize, usize)> {

self.unary(Pipeline, "UnionFind", |_,_| {

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
//! ```

pub use self::stream::{Stream, StreamVec};
pub use self::scopes::Scope;
pub use self::scope::Scope;

pub use self::operators::core::input::Handle as InputHandle;
pub use self::operators::vec::input::Handle as InputHandleVec;
pub use self::operators::probe::Handle as ProbeHandle;

pub mod operators;
pub mod channels;
pub mod scopes;
pub mod scope;
pub mod stream;
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! and there are several default implementations, including a linked-list, Rust's MPSC
//! queue, and a binary serializer wrapping any `W: Write`.

use crate::dataflow::{Scope, Stream};
use crate::dataflow::Stream;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
Expand Down Expand Up @@ -115,8 +115,8 @@
}
}

impl<S: Scope, C: Container> Capture<S::Timestamp, C> for Stream<S, C> {
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(self, mut event_pusher: P) {
impl<T: Timestamp, C: Container> Capture<T, C> for Stream<T, C> {
fn capture_into<P: EventPusher<T, C>+'static>(self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand All @@ -132,7 +132,7 @@
}
if !progress.frontiers[0].is_empty() {
// transmit any frontier progress.
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());

Check warning on line 135 in timely/src/dataflow/operators/core/capture/capture.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take`
event_pusher.push(Event::Progress(to_send.into_inner().to_vec()));
}

Expand Down
13 changes: 7 additions & 6 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//! than that in which the stream was captured.

use crate::dataflow::{Scope, Stream};
use crate::scheduling::Scheduler;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
Expand All @@ -50,24 +51,24 @@ use crate::dataflow::channels::Message;

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
/// Replays `self` into the provided scope, as a `Stream<S, C>`.
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, C> {
/// Replays `self` into the provided scope, as a `Stream<T, C>`.
fn replay_into(self, scope: &mut Scope<T>) -> Stream<T, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
/// Replays `self` into the provided scope, as a `Stream<S, C>`.
/// Replays `self` into the provided scope, as a `Stream<T, C>`.
///
/// The `period` argument allows the specification of a re-activation period, where the operator
/// will re-activate itself every so often. The `None` argument instructs the operator not to
/// re-activate itself.
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, C>;
fn replay_core(self, scope: &mut Scope<T>, period: Option<std::time::Duration>) -> Stream<T, C>;
}

impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
where
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
{
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, C>{
fn replay_core(self, scope: &mut Scope<T>, period: Option<std::time::Duration>) -> Stream<T, C>{

let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

Expand All @@ -88,7 +89,7 @@ where
// The first thing we do is modify our capabilities to match the number of streams we manage.
// This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
// our very first action.
progress.internals[0].update(S::Timestamp::minimum(), (event_streams.len() as i64) - 1);
progress.internals[0].update(T::minimum(), (event_streams.len() as i64) - 1);
started = true;
}

Expand Down
21 changes: 11 additions & 10 deletions timely/src/dataflow/operators/core/concat.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Merges the contents of multiple streams.

use crate::Container;
use crate::progress::Timestamp;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};

/// Merge the contents of two streams.
pub trait Concat<G: Scope, C> {
pub trait Concat<T: Timestamp, C> {
/// Merge the contents of two streams.
///
/// # Examples
Expand All @@ -21,17 +22,17 @@ pub trait Concat<G: Scope, C> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concat(self, other: Stream<G, C>) -> Stream<G, C>;
fn concat(self, other: Stream<T, C>) -> Stream<T, C>;
}

impl<G: Scope, C: Container> Concat<G, C> for Stream<G, C> {
fn concat(self, other: Stream<G, C>) -> Stream<G, C> {
impl<T: Timestamp, C: Container> Concat<T, C> for Stream<T, C> {
fn concat(self, other: Stream<T, C>) -> Stream<T, C> {
self.scope().concatenate([self, other])
}
}

/// Merge the contents of multiple streams.
pub trait Concatenate<G: Scope, C> {
pub trait Concatenate<T: Timestamp, C> {
/// Merge the contents of multiple streams.
///
/// # Examples
Expand All @@ -49,15 +50,15 @@ pub trait Concatenate<G: Scope, C> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concatenate<I>(&self, sources: I) -> Stream<G, C>
fn concatenate<I>(&self, sources: I) -> Stream<T, C>
where
I: IntoIterator<Item=Stream<G, C>>;
I: IntoIterator<Item=Stream<T, C>>;
}

impl<G: Scope, C: Container> Concatenate<G, C> for G {
fn concatenate<I>(&self, sources: I) -> Stream<G, C>
impl<T: Timestamp, C: Container> Concatenate<T, C> for Scope<T> {
fn concatenate<I>(&self, sources: I) -> Stream<T, C>
where
I: IntoIterator<Item=Stream<G, C>>
I: IntoIterator<Item=Stream<T, C>>
{

// create an operator builder.
Expand Down
36 changes: 21 additions & 15 deletions timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! # Examples
//! ```
//! use timely::dataflow::scopes::Scope;
//! use timely::dataflow::scope::Scope;
//! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect};
//!
//! timely::example(|outer| {
Expand All @@ -32,18 +32,16 @@ use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::Message;
use crate::worker::AsWorker;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::scopes::Child;

/// Extension trait to move a `Stream` into a child of its current `Scope`.
pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C> {
pub trait Enter<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
/// Moves the `Stream` argument into a child of its current `Scope`.
///
/// The destination scope must be a child of the stream's scope.
/// The method checks this property at runtime, and will panic if not respected.
///
/// # Examples
/// ```
/// use timely::dataflow::scopes::Scope;
/// use timely::dataflow::operators::{Enter, Leave, ToStream};
///
/// timely::example(|outer| {
Expand All @@ -53,11 +51,16 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C> {
/// });
/// });
/// ```
fn enter(self, inner: &Child<T>) -> Stream<Child<T>, C>;
fn enter(self, inner: &Scope<TInner>) -> Stream<TInner, C>;
}

impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C> for Stream<G, C> {
fn enter(self, inner: &Child<T>) -> Stream<Child<T>, C> {
impl<TOuter, TInner, C> Enter<TOuter, TInner, C> for Stream<TOuter, C>
where
TOuter: Timestamp,
TInner: Timestamp + Refines<TOuter>,
C: Container,
{
fn enter(self, inner: &Scope<TInner>) -> Stream<TInner, C> {

use crate::scheduling::Scheduler;

Expand All @@ -73,7 +76,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C>
outer_addr,
);

let (targets, registrar) = Tee::<T, C>::new();
let (targets, registrar) = Tee::<TInner, C>::new();
let ingress = IngressNub {
targets: Counter::new(targets),
phantom: PhantomData,
Expand All @@ -100,7 +103,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C>
}

/// Extension trait to move a `Stream` to the parent of its current `Scope`.
pub trait Leave<G: Scope, C> {
pub trait Leave<TOuter: Timestamp, C> {
/// Moves a `Stream` to the parent of its current `Scope`.
///
/// The parent scope must be supplied as an argument.
Expand All @@ -110,7 +113,6 @@ pub trait Leave<G: Scope, C> {
///
/// # Examples
/// ```
/// use timely::dataflow::scopes::Scope;
/// use timely::dataflow::operators::{Enter, Leave, ToStream};
///
/// timely::example(|outer| {
Expand All @@ -120,11 +122,16 @@ pub trait Leave<G: Scope, C> {
/// });
/// });
/// ```
fn leave(self, outer: &G) -> Stream<G, C>;
fn leave(self, outer: &Scope<TOuter>) -> Stream<TOuter, C>;
}

impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for Stream<Child<T>, C> {
fn leave(self, outer: &G) -> Stream<G, C> {
impl<TOuter, TInner, C> Leave<TOuter, C> for Stream<TInner, C>
where
TOuter: Timestamp,
TInner: Timestamp + Refines<TOuter>,
C: Container,
{
fn leave(self, outer: &Scope<TOuter>) -> Stream<TOuter, C> {

let scope = self.scope();

Expand All @@ -142,7 +149,7 @@ impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for

let output = scope.subgraph.borrow_mut().new_output();
let target = Target::new(0, output.port);
let (targets, registrar) = Tee::<G::Timestamp, C>::new();
let (targets, registrar) = Tee::<TOuter, C>::new();
let egress = EgressNub { targets, phantom: PhantomData };
let channel_id = scope.clone().new_identifier();

Expand Down Expand Up @@ -277,7 +284,6 @@ mod test {
use crate::dataflow::{InputHandle, ProbeHandle};
use crate::dataflow::operators::{vec::Input, Inspect, Probe};

use crate::dataflow::Scope;
use crate::dataflow::operators::{Enter, Leave};

// initializes and runs a timely dataflow.
Expand Down
7 changes: 4 additions & 3 deletions timely/src/dataflow/operators/core/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Exchange records between workers.

use crate::Container;
use crate::progress::Timestamp;
use crate::container::{DrainContainer, SizableContainer, PushInto};
use crate::dataflow::channels::pact::ExchangeCore;
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::Stream;

/// Exchange records between workers.
pub trait Exchange<C: DrainContainer> {
Expand All @@ -29,7 +30,7 @@ pub trait Exchange<C: DrainContainer> {
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
}

impl<G: Scope, C> Exchange<C> for Stream<G, C>
impl<T: Timestamp, C> Exchange<C> for Stream<T, C>
where
C: Container
+ SizableContainer
Expand All @@ -38,7 +39,7 @@ where
+ crate::dataflow::channels::ContainerBytes
+ for<'a> PushInto<C::Item<'a>>,
{
fn exchange<F>(self, route: F) -> Stream<G, C>
fn exchange<F>(self, route: F) -> Stream<T, C>
where
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
{
Expand Down
Loading
Loading