Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, D: Data> Enter<G, T, D> for S

let channel_id = scope.clone().new_identifier();
self.connect_to(input, ingress, channel_id);
Stream::new(Source { index: 0, port: input.port }, registrar, scope.clone())
Stream::new(Source::new(0, input.port), registrar, scope.clone())
}
}

Expand Down Expand Up @@ -126,7 +126,7 @@ impl<'a, G: Scope, D: Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, D> for
let output = scope.subgraph.borrow_mut().new_output();
let (targets, registrar) = Tee::<G::Timestamp, D>::new();
let channel_id = scope.clone().new_identifier();
self.connect_to(Target { index: 0, port: output.port }, EgressNub { targets, phantom: PhantomData }, channel_id);
self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id);

Stream::new(
output,
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<G: Scope> OperatorBuilder<G> {
let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
let target = Target { index: self.index, port: self.shape.inputs };
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);

self.shape.inputs += 1;
Expand All @@ -140,7 +140,7 @@ impl<G: Scope> OperatorBuilder<G> {
pub fn new_output_connection<D: Data>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, D>, Stream<G, D>) {

let (targets, registrar) = Tee::<G::Timestamp,D>::new();
let source = Source { index: self.index, port: self.shape.outputs };
let source = Source::new(self.index, self.shape.outputs);
let stream = Stream::new(source, registrar, self.scope.clone());

self.shape.outputs += 1;
Expand Down Expand Up @@ -236,4 +236,3 @@ where

fn notify_me(&self) -> bool { self.shape.notify }
}

3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
copies,
}), index);

Stream::new(Source { index, port: 0 }, registrar, self.clone())
Stream::new(Source::new(index, 0), registrar, self.clone())
}
}

Expand Down Expand Up @@ -169,6 +169,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {


/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.

pub struct Handle<T: Timestamp, D: Data> {
activate: Vec<Activator>,
progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<G: Scope> UnorderedInput<G> for G {
peers,
}), index);

((helper, cap), Stream::new(Source { index, port: 0 }, registrar, self.clone()))
((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl<S: Scope, D> Stream<S, D> {
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
id: identifier,
scope_addr: self.scope.addr(),
source: (self.name.index, self.name.port),
target: (target.index, target.port),
source: (self.name.node, self.name.port),
target: (target.node, target.port),
}));

self.scope.add_edge(self.name, target);
Expand Down
28 changes: 21 additions & 7 deletions timely/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub mod subgraph;
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Abomonation, Serialize, Deserialize)]
pub struct Location {
/// A scope-local operator identifier.
node: usize,
pub node: usize,
/// An operator port identifier.`
port: Port,
pub port: Port,
}

impl Location {
Expand All @@ -41,7 +41,7 @@ impl Location {
impl From<Target> for Location {
fn from(target: Target) -> Self {
Location {
node: target.index,
node: target.node,
port: Port::Target(target.port),
}
}
Expand All @@ -50,7 +50,7 @@ impl From<Target> for Location {
impl From<Source> for Location {
fn from(source: Source) -> Self {
Location {
node: source.index,
node: source.node,
port: Port::Source(source.port),
}
}
Expand All @@ -72,19 +72,33 @@ pub enum Port {
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct Source {
/// Index of the source operator.
pub index: usize,
pub node: usize,
/// Number of the output port from the operator.
pub port: usize,
}

impl Source {
/// Creates a new source from node and port identifiers.
pub fn new(node: usize, port: usize) -> Self {
Self { node, port }
}
}

/// Names a target of a data stream.
///
/// A target of data is either a child input, or an output to a parent.
/// Conventionally, `index` zero is used for parent output.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct Target {
/// Index of the target operator.
pub index: usize,
pub node: usize,
/// Number of the input port to the operator.
pub port: usize,
}
}

impl Target {
/// Creates a new target from node and port identifiers.
pub fn new(node: usize, port: usize) -> Self {
Self { node, port }
}
}
59 changes: 29 additions & 30 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
//!
//! // Connect nodes in sequence, looping around to the first from the last.
//! builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
//! builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
//! builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
//!
//! // Construct a reachability tracker.
//! let (mut tracker, _) = builder.build();
//!
//! // Introduce a pointstamp at the output of the first node.
//! tracker.update_source(Source { index: 0, port: 0}, 17, 1);
//! tracker.update_source(Source::new(0, 0), 17, 1);
//!
//! // Propagate changes; until this call updates are simply buffered.
//! tracker.propagate_all();
Expand All @@ -52,7 +52,7 @@
//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
//!
//! // Introduce a pointstamp at the output of the first node.
//! tracker.update_source(Source { index: 0, port: 0}, 17, -1);
//! tracker.update_source(Source::new(0, 0), 17, -1);
//!
//! // Propagate changes; until this call updates are simply buffered.
//! tracker.propagate_all();
Expand Down Expand Up @@ -118,14 +118,13 @@ use crate::progress::timestamp::PathSummary;
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
///
/// // Summarize reachability information.
/// let (tracker, _) = builder.build();
/// ```

#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
/// Internal connections within hosted operators.
Expand Down Expand Up @@ -183,10 +182,10 @@ impl<T: Timestamp> Builder<T> {
pub fn add_edge(&mut self, source: Source, target: Target) {

// Assert that the edge is between existing ports.
debug_assert!(source.port < self.shape[source.index].1);
debug_assert!(target.port < self.shape[target.index].0);
debug_assert!(source.port < self.shape[source.node].1);
debug_assert!(target.port < self.shape[target.node].0);

self.edges[source.index][source.port].push(target);
self.edges[source.node][source.port].push(target);
}

/// Compiles the current nodes and edges into immutable path summaries.
Expand Down Expand Up @@ -228,12 +227,12 @@ impl<T: Timestamp> Builder<T> {
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
///
/// assert!(builder.is_acyclic());
///
/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
///
/// assert!(!builder.is_acyclic());
/// ```
Expand All @@ -257,8 +256,8 @@ impl<T: Timestamp> Builder<T> {
/// ]);
///
/// // Connect each output to the opposite input.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 0, port: 1} );
/// builder.add_edge(Source { index: 0, port: 1}, Target { index: 0, port: 0} );
/// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
/// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
///
/// assert!(builder.is_acyclic());
/// ```
Expand Down Expand Up @@ -328,7 +327,6 @@ impl<T: Timestamp> Builder<T> {
/// A `Tracker` tracks, for a fixed graph topology, the implications of
/// pointstamp changes at various node input and output ports. These changes may
/// alter the potential pointstamps that could arrive at downstream input ports.

pub struct Tracker<T:Timestamp> {

/// Internal connections within hosted operators.
Expand All @@ -354,7 +352,6 @@ pub struct Tracker<T:Timestamp> {
/// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
/// rather than their multiplicities. We separately track the frontiers resulting from propagated
/// frontiers, to protect them from transient negativity in inbound target updates.

per_operator: Vec<PerOperator<T>>,

/// Source and target changes are buffered, which allows us to delay processing until propagation,
Expand All @@ -380,16 +377,17 @@ pub struct Tracker<T:Timestamp> {
total_counts: i64,
}

///
/// Target and source information for each operator.
pub struct PerOperator<T: Timestamp> {
///
/// Port information for each target.
pub targets: Vec<PortInformation<T>>,
///
/// Port information for each source.
pub sources: Vec<PortInformation<T>>,
}

impl<T: Timestamp> PerOperator<T> {
fn new(inputs: usize, outputs: usize) -> Self {
/// A new PerOperator bundle from numbers of input and output ports.
pub fn new(inputs: usize, outputs: usize) -> Self {
PerOperator {
targets: vec![PortInformation::new(); inputs],
sources: vec![PortInformation::new(); outputs],
Expand All @@ -409,7 +407,8 @@ pub struct PortInformation<T: Timestamp> {
}

impl<T: Timestamp> PortInformation<T> {
fn new() -> Self {
/// Creates empty port information.
pub fn new() -> Self {
PortInformation {
pointstamps: MutableAntichain::new(),
implications: MutableAntichain::new(),
Expand All @@ -425,7 +424,7 @@ impl<T: Timestamp> PortInformation<T> {
/// are outstanding pointstamp updates that are strictly less than
/// this pointstamp.
#[inline]
fn is_global(&self, time: &T) -> bool {
pub fn is_global(&self, time: &T) -> bool {
let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
let redundant = self.implications.count_for(time) > 1;
!dominated && !redundant
Expand All @@ -438,8 +437,8 @@ impl<T:Timestamp> Tracker<T> {
#[inline]
pub fn update(&mut self, location: Location, time: T, value: i64) {
match location.port {
Port::Target(port) => self.update_target(Target { index: location.node, port }, time, value),
Port::Source(port) => self.update_source(Source { index: location.node, port }, time, value),
Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
};
}

Expand Down Expand Up @@ -536,7 +535,7 @@ impl<T:Timestamp> Tracker<T> {
// witness that frontier.
for ((target, time), diff) in self.target_changes.drain() {

let operator = &mut self.per_operator[target.index].targets[target.port];
let operator = &mut self.per_operator[target.node].targets[target.port];
let changes = operator.pointstamps.update_iter(Some((time, diff)));

for (time, diff) in changes {
Expand All @@ -555,7 +554,7 @@ impl<T:Timestamp> Tracker<T> {

for ((source, time), diff) in self.source_changes.drain() {

let operator = &mut self.per_operator[source.index].sources[source.port];
let operator = &mut self.per_operator[source.node].sources[source.port];
let changes = operator.pointstamps.update_iter(Some((time, diff)));

for (time, diff) in changes {
Expand Down Expand Up @@ -700,7 +699,7 @@ fn summarize_outputs<T: Timestamp>(
.iter()
.flat_map(|x| x.iter())
.flat_map(|x| x.iter())
.filter(|target| target.index == 0);
.filter(|target| target.node == 0);

// The scope may have no outputs, in which case we can do no work.
for output_target in outputs {
Expand Down
10 changes: 5 additions & 5 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ where
/// Allocates a new input to the subgraph and returns the target to that input in the outer graph.
pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
self.input_messages.push(shared_counts);
Target { index: self.index, port: self.input_messages.len() - 1 }
Target::new(self.index, self.input_messages.len() - 1)
}

/// Allocates a new output from the subgraph and returns the source of that output in the outer graph.
pub fn new_output(&mut self) -> Source {
self.output_capabilities.push(MutableAntichain::new());
Source { index: self.index, port: self.output_capabilities.len() - 1 }
Source::new(self.index, self.output_capabilities.len() - 1)
}

/// Introduces a dependence from the source to the target.
Expand Down Expand Up @@ -161,7 +161,7 @@ where
}

for (source, target) in self.edge_stash {
self.children[source.index].edges[source.port].push(target);
self.children[source.node].edges[source.port].push(target);
builder.add_edge(source, target);
}

Expand Down Expand Up @@ -364,7 +364,7 @@ where
/// Move frontier changes from parent into progress statements.
fn accept_frontier(&mut self) {
for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
let source = Source { index: 0, port };
let source = Source::new(0, port);
for (time, value) in changes.drain() {
self.pointstamp_tracker.update_source(
source,
Expand Down Expand Up @@ -699,7 +699,7 @@ impl<T: Timestamp> PerOperatorState<T> {
for (time, delta) in produced.drain() {
for target in &self.edges[output] {
pointstamps.update((Location::from(*target), time.clone()), delta);
temp_active.push(Reverse(target.index));
temp_active.push(Reverse(target.node));
}
}
}
Expand Down
Loading