Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use timely::dataflow::operators::{Input, Exchange, Probe};

// use timely::dataflow::operators::capture::EventWriter;
// use timely::dataflow::ScopeParent;
use timely::logging::TimelyEvent;
use timely::logging::{TimelyEvent, TimelyProgressEvent};

fn main() {
// initializes and runs a timely dataflow.
Expand All @@ -21,6 +21,26 @@ fn main() {
data.iter().for_each(|x| println!("LOG1: {:?}", x))
);

// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, _, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
})
);

// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope
Expand Down
9 changes: 7 additions & 2 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::progress::{Source, Target};
use crate::progress::timestamp::Refines;
use crate::order::Product;
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::worker::{AsWorker, Config};

use super::{ScopeParent, Scope};
Expand All @@ -32,6 +33,8 @@ where
pub parent: G,
/// The log writer for this scope.
pub logging: Option<Logger>,
/// The progress log writer for this scope.
pub progress_logging: Option<ProgressLogger>,
}

impl<'a, G, T> Child<'a, G, T>
Expand Down Expand Up @@ -115,12 +118,13 @@ where
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();

let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), name));
let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name));
let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
};
func(&mut builder)
};
Expand All @@ -143,7 +147,8 @@ where
Child {
subgraph: self.subgraph,
parent: self.parent.clone(),
logging: self.logging.clone()
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
}
}
}
70 changes: 60 additions & 10 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub type WorkerIdentifier = usize;
pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
/// Logger for timely dataflow system events.
pub type TimelyLogger = Logger<TimelyEvent>;
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;

use std::time::Duration;
use crate::dataflow::operators::capture::{Event, EventPusher};
Expand Down Expand Up @@ -70,9 +72,63 @@ pub struct ChannelsEvent {
pub target: (usize, usize),
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// A dynamically typed timestamp as it appears in progress log events.
///
/// Encapsulates `Any` and `Debug`: `Debug` lets the timestamp be printed without
/// knowing its concrete type, and `Any` (via `as_any`) lets a consumer downcast it
/// back to the concrete timestamp type when that type is known.
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
/// Upcasts this `ProgressEventTimestamp` to `Any`.
///
/// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed.
///
/// # Example
/// ```rust
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
/// let ts: &dyn timely::logging::ProgressEventTimestampVec = &ts;
/// for (n, p, t, d) in ts.iter() {
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
/// }
/// println!();
/// ```
fn as_any(&self) -> &dyn std::any::Any;

/// Returns the name of the concrete type of this object.
///
/// # Note
///
/// This is intended for diagnostic use. The exact contents and format of the
/// string returned are not specified, other than being a best-effort
/// description of the type. For example, amongst the strings
/// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
/// `"std::option::Option<std::string::String>"`.
fn type_name(&self) -> &'static str;
}
/// Blanket implementation: any `Data + Debug + Any` type can be used as a
/// dynamically typed timestamp in progress logs.
impl<T> ProgressEventTimestamp for T
where
    T: crate::Data + std::fmt::Debug + std::any::Any,
{
    /// Views `self` as `&dyn Any` so callers can `downcast_ref` to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Reports the compiler-provided name of the implementing type.
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}

/// A vector of progress updates in logs.
///
/// This exists to support upcasting of the concrete progress update vectors to
/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
/// use a single allocation for the entire vector (as opposed to a `Box` allocation
/// for each dynamically typed element).
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
/// Iterates over the contents of the vector.
///
/// Each item is a tuple of references whose third element is the timestamp viewed
/// as `&dyn ProgressEventTimestamp`. As populated by the progress broadcaster, the
/// elements are (node, port, timestamp, delta).
///
/// The iterator is boxed because the concrete iterator type depends on the
/// concrete timestamp type, which is erased behind this trait.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
}

/// Exposes a concretely typed update vector through the type-erased
/// `ProgressEventTimestampVec` interface.
impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
    /// Walks the updates in order, presenting each timestamp as a trait object.
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
        // Go through the slice explicitly: calling `self.iter()` here would resolve
        // to this very trait method rather than the slice iterator.
        let updates = self.as_slice().iter();
        let erased = updates.map(|(node, port, time, delta)| {
            (node, port, time as &dyn ProgressEventTimestamp, delta)
        });
        Box::new(erased)
    }
}

#[derive(Debug)]
/// Send or receive of progress information.
pub struct ProgressEvent {
pub struct TimelyProgressEvent {
/// `true` if the event is a send, and `false` if it is a receive.
pub is_send: bool,
/// Source worker index.
Expand All @@ -84,9 +140,9 @@ pub struct ProgressEvent {
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
pub addr: Vec<usize>,
/// List of message updates, containing Target descriptor, dynamically typed timestamp, and delta.
pub messages: Vec<(usize, usize, String, i64)>,
pub messages: Box<dyn ProgressEventTimestampVec>,
/// List of capability updates, containing Source descriptor, dynamically typed timestamp, and delta.
pub internal: Vec<(usize, usize, String, i64)>,
pub internal: Box<dyn ProgressEventTimestampVec>,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
Expand Down Expand Up @@ -225,8 +281,6 @@ pub enum TimelyEvent {
Operates(OperatesEvent),
/// Channel creation.
Channels(ChannelsEvent),
/// Progress message send or receive.
Progress(ProgressEvent),
/// Progress propagation (reasoning).
PushProgress(PushProgressEvent),
/// Message send or receive.
Expand Down Expand Up @@ -259,10 +313,6 @@ impl From<ChannelsEvent> for TimelyEvent {
fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
}

impl From<ProgressEvent> for TimelyEvent {
fn from(v: ProgressEvent) -> TimelyEvent { TimelyEvent::Progress(v) }
}

impl From<PushProgressEvent> for TimelyEvent {
fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
}
Expand Down
88 changes: 64 additions & 24 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Broadcasts progress information among workers.

use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::Location;
use crate::progress::{Location, Port};
use crate::communication::{Message, Push, Pull};
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;

/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
pub type ProgressVec<T> = Vec<((Location, T), i64)>;
Expand All @@ -25,12 +26,12 @@ pub struct Progcaster<T:Timestamp> {
/// Communication channel identifier
channel_identifier: usize,

logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
}

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
Expand All @@ -48,7 +49,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
counter: 0,
addr,
channel_identifier,
logging,
progress_logging,
}
}

Expand All @@ -58,16 +59,35 @@ impl<T:Timestamp+Send> Progcaster<T> {
changes.compact();
if !changes.is_empty() {

self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent {
is_send: true,
source: self.source,
channel: self.channel_identifier,
seq_no: self.counter,
addr: self.addr.clone(),
// TODO: fill with additional data
messages: Vec::new(),
internal: Vec::new(),
}));
self.progress_logging.as_ref().map(|l| {

// Pre-allocate enough space; we transfer ownership, so there is no
// opportunity to re-use allocations (w/o changing the logging
// interface to accept references).
let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));

for ((location, time), diff) in changes.iter() {
match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
}
}

l.log(crate::logging::TimelyProgressEvent {
is_send: true,
source: self.source,
channel: self.channel_identifier,
seq_no: self.counter,
addr: self.addr.clone(),
messages,
internal,
});
});

for pusher in self.pushers.iter_mut() {

Expand Down Expand Up @@ -108,16 +128,36 @@ impl<T:Timestamp+Send> Progcaster<T> {

let addr = &mut self.addr;
let channel = self.channel_identifier;
self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent {
is_send: false,
source: source,
seq_no: counter,
channel,
addr: addr.clone(),
// TODO: fill with additional data
messages: Vec::new(),
internal: Vec::new(),
}));

// See comments above about the relatively high cost of this logging, and our
// options for improving it if performance limits users who want other logging.
self.progress_logging.as_ref().map(|l| {

let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));

for ((location, time), diff) in recv_changes.iter() {

match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
}
}

l.log(crate::logging::TimelyProgressEvent {
is_send: false,
source: source,
seq_no: counter,
channel,
addr: addr.clone(),
messages: messages,
internal: internal,
});
});

// We clone rather than drain to avoid deserialization.
for &(ref update, delta) in recv_changes.iter() {
Expand Down
21 changes: 21 additions & 0 deletions timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,27 @@ impl<T:Ord> ChangeBatch<T> {
}
}

/// Number of compacted updates.
///
/// This method requires mutable access to `self` because it may need to compact the
/// representation to determine the number of actual updates.
///
/// Returns the number of entries remaining in the batch after compaction.
///
/// # Examples
///
///```
/// use timely::progress::ChangeBatch;
///
/// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
/// batch.update(17, -1);
/// batch.update(14, -1);
/// assert_eq!(batch.len(), 1);
///```
#[inline]
pub fn len(&mut self) -> usize {
// Compact first so the count reflects consolidated updates, not raw entries.
self.compact();
self.updates.len()
}

/// Drains `self` into `other`.
///
/// This method has a similar effect to calling `other.extend(self.drain())`, but has the
Expand Down
8 changes: 7 additions & 1 deletion timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::BinaryHeap;
use std::cmp::Reverse;

use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;

use crate::scheduling::Schedule;
use crate::scheduling::activate::Activations;
Expand Down Expand Up @@ -63,6 +64,9 @@ where

/// Logging handle
logging: Option<Logger>,

/// Progress logging handle
progress_logging: Option<ProgressLogger>,
}

impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
Expand Down Expand Up @@ -95,6 +99,7 @@ where
index: usize,
mut path: Vec<usize>,
logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
name: &str,
)
-> SubgraphBuilder<TOuter, TInner>
Expand All @@ -114,6 +119,7 @@ where
input_messages: Vec::new(),
output_capabilities: Vec::new(),
logging,
progress_logging,
}
}

Expand Down Expand Up @@ -169,7 +175,7 @@ where

let (tracker, scope_summary) = builder.build();

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone());
let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());

let mut incomplete = vec![true; self.children.len()];
incomplete[0] = false;
Expand Down
Loading