From 5312203e55355d5328d93e50b13d07daeafecec5 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 18 Jan 2021 08:54:57 -0500 Subject: [PATCH 1/3] Actually log progress updates --- timely/src/progress/broadcast.rs | 112 ++++++++++++++++++++++------ timely/src/progress/change_batch.rs | 21 ++++++ 2 files changed, 112 insertions(+), 21 deletions(-) diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 81c5e71a9..e0ba23347 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,7 +1,7 @@ //! Broadcasts progress information among workers. use crate::progress::{ChangeBatch, Timestamp}; -use crate::progress::Location; +use crate::progress::{Location, Port}; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; @@ -58,16 +58,55 @@ impl Progcaster { changes.compact(); if !changes.is_empty() { - self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - is_send: true, - source: self.source, - channel: self.channel_identifier, - seq_no: self.counter, - addr: self.addr.clone(), - // TODO: fill with additional data - messages: Vec::new(), - internal: Vec::new(), - })); + // self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { + // is_send: true, + // source: self.source, + // channel: self.channel_identifier, + // seq_no: self.counter, + // addr: self.addr.clone(), + // // TODO: fill with additional data + // messages: Vec::new(), + // internal: Vec::new(), + // })); + + + // This logging is relatively more expensive than other logging, as we + // have formatting and string allocations on the main path. We do have + // local type information about the timestamp, and we could log *that* + // explicitly, but the consumer would have to know what to look for and + // interpret appropriately. That's a big ask, so let's start with this, + // and as folks need more performant logging think about allowing users + // to select the more efficient variant. + self.logging.as_ref().map(|l| { + + // Pre-allocate enough space; we transfer ownership, so there is not + // an apportunity to re-use allocations (w/o changing the logging + // interface to accept references). + let mut messages = Vec::with_capacity(changes.len()); + let mut internal = Vec::with_capacity(changes.len()); + + // TODO: Reconsider `String` type or perhaps re-use allocation. + for ((location, time), diff) in changes.iter() { + match location.port { + Port::Target(port) => { + messages.push((location.node, port, format!("{:?}", time), *diff)) + }, + Port::Source(port) => { + internal.push((location.node, port, format!("{:?}", time), *diff)) + } + } + } + + l.log(crate::logging::ProgressEvent { + is_send: true, + source: self.source, + channel: self.channel_identifier, + seq_no: self.counter, + addr: self.addr.clone(), + messages, + internal, + }); + }); for pusher in self.pushers.iter_mut() { @@ -108,16 +147,47 @@ impl Progcaster { let addr = &mut self.addr; let channel = self.channel_identifier; - self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - is_send: false, - source: source, - seq_no: counter, - channel, - addr: addr.clone(), - // TODO: fill with additional data - messages: Vec::new(), - internal: Vec::new(), - })); + + + // self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { + // is_send: false, + // source: source, + // seq_no: counter, + // channel, + // addr: addr.clone(), + // // TODO: fill with additional data + // messages: Vec::new(), + // internal: Vec::new(), + // })); + + self.logging.as_ref().map(|l| { + + let mut messages = Vec::with_capacity(changes.len()); + let mut internal = Vec::with_capacity(changes.len()); + + // TODO: Reconsider `String` type or perhaps re-use allocation. + for ((location, time), diff) in recv_changes.iter() { + + match location.port { + Port::Target(port) => { + messages.push((location.node, port, format!("{:?}", time), *diff)) + }, + Port::Source(port) => { + internal.push((location.node, port, format!("{:?}", time), *diff)) + } + } + } + + l.log(crate::logging::ProgressEvent { + is_send: false, + source: source, + seq_no: counter, + channel, + addr: addr.clone(), + messages, + internal, + }); + }); // We clone rather than drain to avoid deserialization. for &(ref update, delta) in recv_changes.iter() { diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 348a098c0..09f4cb5ec 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -201,6 +201,27 @@ impl ChangeBatch { } } + /// Number of compacted updates. + /// + /// This method requires mutable access to `self` because it may need to compact the + /// representation to determine the number of actual updates. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let mut batch = ChangeBatch::::new_from(17, 1); + /// batch.update(17, -1); + /// batch.update(14, -1); + /// assert_eq!(batch.len(), 1); + ///``` + #[inline] + pub fn len(&mut self) -> usize { + self.compact(); + self.updates.len() + } + /// Drains `self` into `other`. /// /// This method has similar a effect to calling `other.extend(self.drain())`, but has the From 9afa93491d8e2b8969334832b2bddd3da2bde2ed Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 18 Jan 2021 09:03:45 -0500 Subject: [PATCH 2/3] remove commented code, improve comments --- timely/src/progress/broadcast.rs | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index e0ba23347..4e435f74d 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -58,18 +58,6 @@ impl Progcaster { changes.compact(); if !changes.is_empty() { - // self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - // is_send: true, - // source: self.source, - // channel: self.channel_identifier, - // seq_no: self.counter, - // addr: self.addr.clone(), - // // TODO: fill with additional data - // messages: Vec::new(), - // internal: Vec::new(), - // })); - - // This logging is relatively more expensive than other logging, as we // have formatting and string allocations on the main path. We do have // local type information about the timestamp, and we could log *that* @@ -148,18 +136,8 @@ impl Progcaster { let addr = &mut self.addr; let channel = self.channel_identifier; - - // self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - // is_send: false, - // source: source, - // seq_no: counter, - // channel, - // addr: addr.clone(), - // // TODO: fill with additional data - // messages: Vec::new(), - // internal: Vec::new(), - // })); - + // See comments above about the relatively high cost of this logging, and our + // options for improving it if performance limits users who want other logging. self.logging.as_ref().map(|l| { let mut messages = Vec::with_capacity(changes.len()); From 009a178dd5fe34e609f7bd6dc96feff891250c1b Mon Sep 17 00:00:00 2001 From: Andrea Lattuada Date: Fri, 22 Jan 2021 02:44:21 +0100 Subject: [PATCH 3/3] Separate the progress logging stream, use dyn trait instead of String for timestamps (#353) * Separate the progress logging stream, use dyn trait instead of String for timestamps * Remove the serialization machinery from progress logging, provide dynamic type information instead * Add example for progress logging * Always box logging progress vectors on construction * Explain why we need the `ProgressEventTimestampVec` trait --- timely/examples/logging-send.rs | 22 ++++++++- timely/src/dataflow/scopes/child.rs | 9 +++- timely/src/logging.rs | 70 ++++++++++++++++++++++++----- timely/src/progress/broadcast.rs | 44 ++++++++---------- timely/src/progress/subgraph.rs | 8 +++- timely/src/worker.rs | 4 +- 6 files changed, 116 insertions(+), 41 deletions(-) diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 93e6a8867..23228f4e8 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -5,7 +5,7 @@ use timely::dataflow::operators::{Input, Exchange, Probe}; // use timely::dataflow::operators::capture::EventWriter; // use timely::dataflow::ScopeParent; -use timely::logging::TimelyEvent; +use timely::logging::{TimelyEvent, TimelyProgressEvent}; fn main() { // initializes and runs a timely dataflow. @@ -21,6 +21,26 @@ fn main() { data.iter().for_each(|x| println!("LOG1: {:?}", x)) ); + // Register timely progress logging. + // Less generally useful: intended for debugging advanced custom operators or timely + // internals. + worker.log_register().insert::("timely/progress", |_time, data| + data.iter().for_each(|x| { + println!("PROGRESS: {:?}", x); + let (_, _, ev) = x; + print!("PROGRESS: TYPED MESSAGES: "); + for (n, p, t, d) in ev.messages.iter() { + print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + } + println!(); + print!("PROGRESS: TYPED INTERNAL: "); + for (n, p, t, d) in ev.internal.iter() { + print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + } + println!(); + }) + ); + // create a new input, exchange data, and inspect its output worker.dataflow(|scope| { scope diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 824490376..98f255e07 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -12,6 +12,7 @@ use crate::progress::{Source, Target}; use crate::progress::timestamp::Refines; use crate::order::Product; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::worker::{AsWorker, Config}; use super::{ScopeParent, Scope}; @@ -32,6 +33,8 @@ where pub parent: G, /// The log writer for this scope. pub logging: Option, + /// The progress log writer for this scope. + pub progress_logging: Option, } impl<'a, G, T> Child<'a, G, T> @@ -115,12 +118,13 @@ where let index = self.subgraph.borrow_mut().allocate_child_id(); let path = self.subgraph.borrow().path.clone(); - let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), name)); + let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name)); let result = { let mut builder = Child { subgraph: &subscope, parent: self.clone(), logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), }; func(&mut builder) }; @@ -143,7 +147,8 @@ where Child { subgraph: self.subgraph, parent: self.parent.clone(), - logging: self.logging.clone() + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), } } } diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ac0316304..0be7f98d3 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -6,6 +6,8 @@ pub type WorkerIdentifier = usize; pub type Logger = crate::logging_core::Logger; /// Logger for timely dataflow system events. pub type TimelyLogger = Logger; +/// Logger for timely dataflow progress events (the "timely/progress" log stream). +pub type TimelyProgressLogger = Logger; use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; @@ -70,9 +72,63 @@ pub struct ChannelsEvent { pub target: (usize, usize), } -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +/// Encapsulates Any and Debug for dynamically typed timestamps in logs +pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { + /// Upcasts this `ProgressEventTimestamp` to `Any`. + /// + /// NOTE: This is required until https://github.com/rust-lang/rfcs/issues/2765 is fixed + /// + /// # Example + /// ```rust + /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; + /// let ts: &timely::logging::ProgressEventTimestampVec = &ts; + /// for (n, p, t, d) in ts.iter() { + /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); + /// } + /// println!(); + /// ``` + fn as_any(&self) -> &dyn std::any::Any; + + /// Returns the name of the concrete type of this object. + /// + /// # Note + /// + /// This is intended for diagnostic use. The exact contents and format of the + /// string returned are not specified, other than being a best-effort + /// description of the type. For example, amongst the strings + /// that `type_name::>()` might return are `"Option"` and + /// `"std::option::Option"`. + fn type_name(&self) -> &'static str; +} +impl ProgressEventTimestamp for T { + fn as_any(&self) -> &dyn std::any::Any { self } + + fn type_name(&self) -> &'static str { std::any::type_name::() } +} + +/// A vector of progress updates in logs +/// +/// This exists to support upcasting of the concrecte progress update vectors to +/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to +/// use a single allocation for the entire vector (as opposed to a `Box` allocation +/// for each dynamically typed element). +pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { + /// Iterate over the contents of the vector + fn iter<'a>(&'a self) -> Box+'a>; +} + +impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { + fn iter<'a>(&'a self) -> Box+'a> { + Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { + let t: &dyn ProgressEventTimestamp = t; + (n, p, t, d) + })) + } +} + +#[derive(Debug)] /// Send or receive of progress information. -pub struct ProgressEvent { +pub struct TimelyProgressEvent { /// `true` if the event is a send, and `false` if it is a receive. pub is_send: bool, /// Source worker index. @@ -84,9 +140,9 @@ pub struct ProgressEvent { /// Sequence of nested scope identifiers indicating the path from the root to this instance. pub addr: Vec, /// List of message updates, containing Target descriptor, timestamp as string, and delta. - pub messages: Vec<(usize, usize, String, i64)>, + pub messages: Box, /// List of capability updates, containing Source descriptor, timestamp as string, and delta. - pub internal: Vec<(usize, usize, String, i64)>, + pub internal: Box, } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] @@ -225,8 +281,6 @@ pub enum TimelyEvent { Operates(OperatesEvent), /// Channel creation. Channels(ChannelsEvent), - /// Progress message send or receive. - Progress(ProgressEvent), /// Progress propagation (reasoning). PushProgress(PushProgressEvent), /// Message send or receive. @@ -259,10 +313,6 @@ impl From for TimelyEvent { fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) } } -impl From for TimelyEvent { - fn from(v: ProgressEvent) -> TimelyEvent { TimelyEvent::Progress(v) } -} - impl From for TimelyEvent { fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 4e435f74d..5eead8b92 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -4,6 +4,7 @@ use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::{Location, Port}; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; /// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)` pub type ProgressVec = Vec<((Location, T), i64)>; @@ -25,12 +26,12 @@ pub struct Progcaster { /// Communication channel identifier channel_identifier: usize, - logging: Option, + progress_logging: Option, } impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, path: &Vec, mut logging: Option) -> Progcaster { + pub fn new(worker: &mut A, path: &Vec, mut logging: Option, progress_logging: Option) -> Progcaster { let channel_identifier = worker.new_identifier(); let (pushers, puller) = worker.allocate(channel_identifier, &path[..]); @@ -48,7 +49,7 @@ impl Progcaster { counter: 0, addr, channel_identifier, - logging, + progress_logging, } } @@ -58,34 +59,26 @@ impl Progcaster { changes.compact(); if !changes.is_empty() { - // This logging is relatively more expensive than other logging, as we - // have formatting and string allocations on the main path. We do have - // local type information about the timestamp, and we could log *that* - // explicitly, but the consumer would have to know what to look for and - // interpret appropriately. That's a big ask, so let's start with this, - // and as folks need more performant logging think about allowing users - // to select the more efficient variant. - self.logging.as_ref().map(|l| { + self.progress_logging.as_ref().map(|l| { // Pre-allocate enough space; we transfer ownership, so there is not // an apportunity to re-use allocations (w/o changing the logging // interface to accept references). - let mut messages = Vec::with_capacity(changes.len()); - let mut internal = Vec::with_capacity(changes.len()); + let mut messages = Box::new(Vec::with_capacity(changes.len())); + let mut internal = Box::new(Vec::with_capacity(changes.len())); - // TODO: Reconsider `String` type or perhaps re-use allocation. for ((location, time), diff) in changes.iter() { match location.port { Port::Target(port) => { - messages.push((location.node, port, format!("{:?}", time), *diff)) + messages.push((location.node, port, time.clone(), *diff)) }, Port::Source(port) => { - internal.push((location.node, port, format!("{:?}", time), *diff)) + internal.push((location.node, port, time.clone(), *diff)) } } } - l.log(crate::logging::ProgressEvent { + l.log(crate::logging::TimelyProgressEvent { is_send: true, source: self.source, channel: self.channel_identifier, @@ -138,32 +131,31 @@ impl Progcaster { // See comments above about the relatively high cost of this logging, and our // options for improving it if performance limits users who want other logging. - self.logging.as_ref().map(|l| { + self.progress_logging.as_ref().map(|l| { - let mut messages = Vec::with_capacity(changes.len()); - let mut internal = Vec::with_capacity(changes.len()); + let mut messages = Box::new(Vec::with_capacity(changes.len())); + let mut internal = Box::new(Vec::with_capacity(changes.len())); - // TODO: Reconsider `String` type or perhaps re-use allocation. for ((location, time), diff) in recv_changes.iter() { match location.port { Port::Target(port) => { - messages.push((location.node, port, format!("{:?}", time), *diff)) + messages.push((location.node, port, time.clone(), *diff)) }, Port::Source(port) => { - internal.push((location.node, port, format!("{:?}", time), *diff)) + internal.push((location.node, port, time.clone(), *diff)) } } } - l.log(crate::logging::ProgressEvent { + l.log(crate::logging::TimelyProgressEvent { is_send: false, source: source, seq_no: counter, channel, addr: addr.clone(), - messages, - internal, + messages: messages, + internal: internal, }); }); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index bf571d2da..2babf5d27 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -11,6 +11,7 @@ use std::collections::BinaryHeap; use std::cmp::Reverse; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; @@ -63,6 +64,9 @@ where /// Logging handle logging: Option, + + /// Progress logging handle + progress_logging: Option, } impl SubgraphBuilder @@ -95,6 +99,7 @@ where index: usize, mut path: Vec, logging: Option, + progress_logging: Option, name: &str, ) -> SubgraphBuilder @@ -114,6 +119,7 @@ where input_messages: Vec::new(), output_capabilities: Vec::new(), logging, + progress_logging, } } @@ -169,7 +175,7 @@ where let (tracker, scope_summary) = builder.build(); - let progcaster = Progcaster::new(worker, &self.path, self.logging.clone()); + let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone()); let mut incomplete = vec![true; self.children.len()]; incomplete[0] = false; diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 93e80c596..62196b970 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -570,7 +570,8 @@ impl Worker { let dataflow_index = self.allocate_dataflow_index(); let identifier = self.new_identifier(); - let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), name); + let progress_logging = self.logging.borrow_mut().get("timely/progress"); + let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name); let subscope = RefCell::new(subscope); let result = { @@ -578,6 +579,7 @@ impl Worker { subgraph: &subscope, parent: self.clone(), logging: logging.clone(), + progress_logging: progress_logging.clone(), }; func(&mut resources, &mut builder) };