From 51255a879597347a025f36b923933886984c9408 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 17 Aug 2019 17:44:44 -0400 Subject: [PATCH] tidying up progress --- timely/src/dataflow/operators/enterleave.rs | 4 +- .../dataflow/operators/generic/builder_raw.rs | 5 +- timely/src/dataflow/operators/input.rs | 3 +- .../src/dataflow/operators/unordered_input.rs | 2 +- timely/src/dataflow/stream.rs | 4 +- timely/src/progress/mod.rs | 28 ++++++--- timely/src/progress/reachability.rs | 59 +++++++++---------- timely/src/progress/subgraph.rs | 10 ++-- timely/src/scheduling/activate.rs | 8 ++- timely/src/worker.rs | 14 ++--- 10 files changed, 76 insertions(+), 61 deletions(-) diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index f0608d1dd..bb5f6d12b 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -95,7 +95,7 @@ impl, D: Data> Enter for S let channel_id = scope.clone().new_identifier(); self.connect_to(input, ingress, channel_id); - Stream::new(Source { index: 0, port: input.port }, registrar, scope.clone()) + Stream::new(Source::new(0, input.port), registrar, scope.clone()) } } @@ -126,7 +126,7 @@ impl<'a, G: Scope, D: Data, T: Timestamp+Refines> Leave for let output = scope.subgraph.borrow_mut().new_output(); let (targets, registrar) = Tee::::new(); let channel_id = scope.clone().new_identifier(); - self.connect_to(Target { index: 0, port: output.port }, EgressNub { targets, phantom: PhantomData }, channel_id); + self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); Stream::new( output, diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index f6bf6495a..e1f9bb60e 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -119,7 +119,7 @@ impl OperatorBuilder { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging); - let target = Target { index: self.index, port: self.shape.inputs }; + let target = Target::new(self.index, self.shape.inputs); stream.connect_to(target, sender, channel_id); self.shape.inputs += 1; @@ -140,7 +140,7 @@ impl OperatorBuilder { pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, Stream) { let (targets, registrar) = Tee::::new(); - let source = Source { index: self.index, port: self.shape.outputs }; + let source = Source::new(self.index, self.shape.outputs); let stream = Stream::new(source, registrar, self.scope.clone()); self.shape.outputs += 1; @@ -236,4 +236,3 @@ where fn notify_me(&self) -> bool { self.shape.notify } } - diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index e73f15e37..fdc4e6d17 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -127,7 +127,7 @@ impl Input for G where ::Timestamp: TotalOrder { copies, }), index); - Stream::new(Source { index, port: 0 }, registrar, self.clone()) + Stream::new(Source::new(index, 0), registrar, self.clone()) } } @@ -169,6 +169,7 @@ impl Operate for Operator { /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. + pub struct Handle { activate: Vec, progress: Vec>>>, diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 10f4c8034..e62fad0a6 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -107,7 +107,7 @@ impl UnorderedInput for G { peers, }), index); - ((helper, cap), Stream::new(Source { index, port: 0 }, registrar, self.clone())) + ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone())) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index e507628c7..619229bfa 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -38,8 +38,8 @@ impl Stream { logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { id: identifier, scope_addr: self.scope.addr(), - source: (self.name.index, self.name.port), - target: (target.index, target.port), + source: (self.name.node, self.name.port), + target: (target.node, target.port), })); self.scope.add_edge(self.name, target); diff --git a/timely/src/progress/mod.rs b/timely/src/progress/mod.rs index 407c2dc5b..912ce456d 100644 --- a/timely/src/progress/mod.rs +++ b/timely/src/progress/mod.rs @@ -18,9 +18,9 @@ pub mod subgraph; #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Abomonation, Serialize, Deserialize)] pub struct Location { /// A scope-local operator identifier. - node: usize, + pub node: usize, /// An operator port identifier.` - port: Port, + pub port: Port, } impl Location { @@ -41,7 +41,7 @@ impl Location { impl From for Location { fn from(target: Target) -> Self { Location { - node: target.index, + node: target.node, port: Port::Target(target.port), } } @@ -50,7 +50,7 @@ impl From for Location { impl From for Location { fn from(source: Source) -> Self { Location { - node: source.index, + node: source.node, port: Port::Source(source.port), } } @@ -72,11 +72,18 @@ pub enum Port { #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)] pub struct Source { /// Index of the source operator. - pub index: usize, + pub node: usize, /// Number of the output port from the operator. pub port: usize, } +impl Source { + /// Creates a new source from node and port identifiers. + pub fn new(node: usize, port: usize) -> Self { + Self { node, port } + } +} + /// Names a target of a data stream. /// /// A target of data is either a child input, or an output to a parent. @@ -84,7 +91,14 @@ pub struct Source { #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)] pub struct Target { /// Index of the target operator. - pub index: usize, + pub node: usize, /// Number of the input port to the operator. pub port: usize, -} \ No newline at end of file +} + +impl Target { + /// Creates a new target from node and port identifiers. + pub fn new(node: usize, port: usize) -> Self { + Self { node, port } + } +} diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index c7b2bf16b..0de79af30 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -22,15 +22,15 @@ //! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); //! //! // Connect nodes in sequence, looping around to the first from the last. -//! builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} ); -//! builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} ); -//! builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} ); +//! builder.add_edge(Source::new(0, 0), Target::new(1, 0)); +//! builder.add_edge(Source::new(1, 0), Target::new(2, 0)); +//! builder.add_edge(Source::new(2, 0), Target::new(0, 0)); //! //! // Construct a reachability tracker. //! let (mut tracker, _) = builder.build(); //! //! // Introduce a pointstamp at the output of the first node. -//! tracker.update_source(Source { index: 0, port: 0}, 17, 1); +//! tracker.update_source(Source::new(0, 0), 17, 1); //! //! // Propagate changes; until this call updates are simply buffered. //! tracker.propagate_all(); @@ -52,7 +52,7 @@ //! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1)); //! //! // Introduce a pointstamp at the output of the first node. -//! tracker.update_source(Source { index: 0, port: 0}, 17, -1); +//! tracker.update_source(Source::new(0, 0), 17, -1); //! //! // Propagate changes; until this call updates are simply buffered. //! tracker.propagate_all(); @@ -118,14 +118,13 @@ use crate::progress::timestamp::PathSummary; /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); /// /// // Connect nodes in sequence, looping around to the first from the last. -/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} ); -/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} ); -/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} ); +/// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); +/// builder.add_edge(Source::new(1, 0), Target::new(2, 0)); +/// builder.add_edge(Source::new(2, 0), Target::new(0, 0)); /// /// // Summarize reachability information. /// let (tracker, _) = builder.build(); /// ``` - #[derive(Clone, Debug)] pub struct Builder { /// Internal connections within hosted operators. @@ -183,10 +182,10 @@ impl Builder { pub fn add_edge(&mut self, source: Source, target: Target) { // Assert that the edge is between existing ports. - debug_assert!(source.port < self.shape[source.index].1); - debug_assert!(target.port < self.shape[target.index].0); + debug_assert!(source.port < self.shape[source.node].1); + debug_assert!(target.port < self.shape[target.node].0); - self.edges[source.index][source.port].push(target); + self.edges[source.node][source.port].push(target); } /// Compiles the current nodes and edges into immutable path summaries. @@ -228,12 +227,12 @@ impl Builder { /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]); /// /// // Connect nodes in sequence, looping around to the first from the last. - /// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} ); - /// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} ); + /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); + /// builder.add_edge(Source::new(1, 0), Target::new(2, 0)); /// /// assert!(builder.is_acyclic()); /// - /// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} ); + /// builder.add_edge(Source::new(2, 0), Target::new(0, 0)); /// /// assert!(!builder.is_acyclic()); /// ``` @@ -257,8 +256,8 @@ impl Builder { /// ]); /// /// // Connect each output to the opposite input. - /// builder.add_edge(Source { index: 0, port: 0}, Target { index: 0, port: 1} ); - /// builder.add_edge(Source { index: 0, port: 1}, Target { index: 0, port: 0} ); + /// builder.add_edge(Source::new(0, 0), Target::new(0, 1)); + /// builder.add_edge(Source::new(0, 1), Target::new(0, 0)); /// /// assert!(builder.is_acyclic()); /// ``` @@ -328,7 +327,6 @@ impl Builder { /// A `Tracker` tracks, for a fixed graph topology, the implications of /// pointstamp changes at various node input and output ports. These changes may /// alter the potential pointstamps that could arrive at downstream input ports. - pub struct Tracker { /// Internal connections within hosted operators. @@ -354,7 +352,6 @@ pub struct Tracker { /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers, /// rather than their multiplicities. We separately track the frontiers resulting from propagated /// frontiers, to protect them from transient negativity in inbound target updates. - per_operator: Vec>, /// Source and target changes are buffered, which allows us to delay processing until propagation, @@ -380,16 +377,17 @@ pub struct Tracker { total_counts: i64, } -/// +/// Target and source information for each operator. pub struct PerOperator { - /// + /// Port information for each target. pub targets: Vec>, - /// + /// Port information for each source. pub sources: Vec>, } impl PerOperator { - fn new(inputs: usize, outputs: usize) -> Self { + /// A new PerOperator bundle from numbers of input and output ports. + pub fn new(inputs: usize, outputs: usize) -> Self { PerOperator { targets: vec![PortInformation::new(); inputs], sources: vec![PortInformation::new(); outputs], @@ -409,7 +407,8 @@ pub struct PortInformation { } impl PortInformation { - fn new() -> Self { + /// Creates empty port information. + pub fn new() -> Self { PortInformation { pointstamps: MutableAntichain::new(), implications: MutableAntichain::new(), @@ -425,7 +424,7 @@ impl PortInformation { /// are outstanding pointstamp updates that are strictly less than /// this pointstamp. #[inline] - fn is_global(&self, time: &T) -> bool { + pub fn is_global(&self, time: &T) -> bool { let dominated = self.implications.frontier().iter().any(|t| t.less_than(time)); let redundant = self.implications.count_for(time) > 1; !dominated && !redundant @@ -438,8 +437,8 @@ impl Tracker { #[inline] pub fn update(&mut self, location: Location, time: T, value: i64) { match location.port { - Port::Target(port) => self.update_target(Target { index: location.node, port }, time, value), - Port::Source(port) => self.update_source(Source { index: location.node, port }, time, value), + Port::Target(port) => self.update_target(Target::new(location.node, port), time, value), + Port::Source(port) => self.update_source(Source::new(location.node, port), time, value), }; } @@ -536,7 +535,7 @@ impl Tracker { // witness that frontier. for ((target, time), diff) in self.target_changes.drain() { - let operator = &mut self.per_operator[target.index].targets[target.port]; + let operator = &mut self.per_operator[target.node].targets[target.port]; let changes = operator.pointstamps.update_iter(Some((time, diff))); for (time, diff) in changes { @@ -555,7 +554,7 @@ impl Tracker { for ((source, time), diff) in self.source_changes.drain() { - let operator = &mut self.per_operator[source.index].sources[source.port]; + let operator = &mut self.per_operator[source.node].sources[source.port]; let changes = operator.pointstamps.update_iter(Some((time, diff))); for (time, diff) in changes { @@ -700,7 +699,7 @@ fn summarize_outputs( .iter() .flat_map(|x| x.iter()) .flat_map(|x| x.iter()) - .filter(|target| target.index == 0); + .filter(|target| target.node == 0); // The scope may have no outputs, in which case we can do no work. for output_target in outputs { diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 29735826b..a0dc69964 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -71,13 +71,13 @@ where /// Allocates a new input to the subgraph and returns the target to that input in the outer graph. pub fn new_input(&mut self, shared_counts: Rc>>) -> Target { self.input_messages.push(shared_counts); - Target { index: self.index, port: self.input_messages.len() - 1 } + Target::new(self.index, self.input_messages.len() - 1) } /// Allocates a new output from the subgraph and returns the source of that output in the outer graph. pub fn new_output(&mut self) -> Source { self.output_capabilities.push(MutableAntichain::new()); - Source { index: self.index, port: self.output_capabilities.len() - 1 } + Source::new(self.index, self.output_capabilities.len() - 1) } /// Introduces a dependence from the source to the target. @@ -161,7 +161,7 @@ where } for (source, target) in self.edge_stash { - self.children[source.index].edges[source.port].push(target); + self.children[source.node].edges[source.port].push(target); builder.add_edge(source, target); } @@ -364,7 +364,7 @@ where /// Move frontier changes from parent into progress statements. fn accept_frontier(&mut self) { for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() { - let source = Source { index: 0, port }; + let source = Source::new(0, port); for (time, value) in changes.drain() { self.pointstamp_tracker.update_source( source, @@ -699,7 +699,7 @@ impl PerOperatorState { for (time, delta) in produced.drain() { for target in &self.edges[output] { pointstamps.update((Location::from(*target), time.clone()), delta); - temp_active.push(Reverse(target.index)); + temp_active.push(Reverse(target.node)); } } } diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 444eb15ea..9d8b529db 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use std::cell::RefCell; /// Allocation-free activation tracker. +#[derive(Default)] pub struct Activations { clean: usize, /// `(offset, length)` @@ -15,6 +16,7 @@ pub struct Activations { impl Activations { /// Creates a new activation tracker. + #[deprecated(since="0.10",note="Type implements Default")] pub fn new() -> Self { Self { clean: 0, @@ -24,12 +26,12 @@ impl Activations { } } - /// Indicates if there no pending activations. + /// Indicates if there are no pending activations. pub fn is_empty(&self) -> bool { self.bounds.is_empty() } - /// Unparks task addressed by `path`. + /// Unparks the task addressed by `path`. pub fn activate(&mut self, path: &[usize]) { self.bounds.push((self.slices.len(), path.len())); self.slices.extend(path); @@ -57,7 +59,7 @@ impl Activations { self.clean = self.bounds.len(); } - /// + /// Maps a function across activated paths. pub fn map_active(&self, logic: impl Fn(&[usize])) { for (offset, length) in self.bounds.iter() { logic(&self.slices[*offset .. (*offset + *length)]); diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 0185268cd..06347d309 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -107,15 +107,15 @@ impl Worker { let index = c.index(); Worker { timer: now.clone(), - paths: Rc::new(RefCell::new(HashMap::new())), + paths: Default::default(), allocator: Rc::new(RefCell::new(c)), - identifiers: Rc::new(RefCell::new(0)), - dataflows: Rc::new(RefCell::new(HashMap::new())), - dataflow_counter: Rc::new(RefCell::new(0)), + identifiers: Default::default(), + dataflows: Default::default(), + dataflow_counter: Default::default(), logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))), - activations: Rc::new(RefCell::new(Activations::new())), - active_dataflows: Vec::new(), - temp_channel_ids: Rc::new(RefCell::new(Vec::new())), + activations: Default::default(), + active_dataflows: Default::default(), + temp_channel_ids: Default::default(), } }