From f36920425bedf5eeedb38d2db27d83293f18b361 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 13 Jun 2024 22:17:09 -0400 Subject: [PATCH 1/2] Clone for Config and other improvements * Clone for timely and communication config. Forces the cluster log function to be wrapped in an Arc, which is what happens anyways, so a no-op change. * Use `execute_from` in `execute` to deduplicate code. * Document that `execute` will to weird things with the network based on undocumented env symbols. Signed-off-by: Moritz Hoffmann --- .../src/allocator/zero_copy/initialize.rs | 7 +++-- communication/src/initialize.rs | 5 ++-- timely/src/execute.rs | 29 ++++++++++--------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index be1494fa4..87ad49571 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -39,7 +39,8 @@ pub fn initialize_networking( my_index: usize, threads: usize, noisy: bool, - log_sender: BoxOption>+Send+Sync>) + log_sender: ArcOption>+Send+Sync>>, +) -> ::std::io::Result<(Vec>, CommsGuard)> { let sockets = create_sockets(addresses, my_index, noisy)?; @@ -57,7 +58,8 @@ pub fn initialize_networking_from_sockets( mut sockets: Vec>, my_index: usize, threads: usize, - log_sender: BoxOption>+Send+Sync>) + log_sender: ArcOption>+Send+Sync>>, +) -> ::std::io::Result<(Vec>, CommsGuard)> { // Sockets are expected to be blocking, @@ -67,7 +69,6 @@ pub fn initialize_networking_from_sockets( } } - let log_sender = Arc::new(log_sender); let processes = sockets.len(); let process_allocators = crate::allocator::process::Process::new_vector(threads); diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 3c76624fc..31e8db4ea 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter}; /// Possible configurations for the communication infrastructure. +#[derive(Clone)] pub enum Config { /// Use one thread. Thread, @@ -38,7 +39,7 @@ pub enum Config { /// Verbosely report connection process report: bool, /// Closure to create a new logger for a communication thread - log_fn: Box Option> + Send + Sync>, + log_fn: Arc Option> + Send + Sync>>, } } @@ -120,7 +121,7 @@ impl Config { process, addresses, report, - log_fn: Box::new( | _ | None), + log_fn: Arc::new(Box::new( | _ | None)), }) } else if threads > 1 { if zerocopy { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 9a8340dbf..65370a73a 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -6,6 +6,7 @@ use crate::worker::Worker; use crate::{CommunicationConfig, WorkerConfig}; /// Configures the execution of a timely dataflow computation. +#[derive(Clone, Debug)] pub struct Config { /// Configuration for the communication infrastructure. pub communication: CommunicationConfig, @@ -218,6 +219,17 @@ where /// // the extracted data should have data (0..10) thrice at timestamp 0. /// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::>()); /// ``` +/// +/// ### Communication logging +/// +/// For multi-process (cluster) configurations, this functions installs a custom log +/// function, replacing any previous function. The function connects to the host in +/// the environment symbol `TIMELY_COMM_LOG_ADDR`. +/// +/// ### Timely logging +/// +/// If the environment symbol `TIMELY_WORKER_LOG_ADDR` is set, each worker will try to +/// connect to this address to send its worker logs. pub fn execute( mut config: Config, func: F @@ -228,7 +240,7 @@ where if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication { - *log_fn = Box::new(|events_setup| { + *log_fn = std::sync::Arc::new(Box::new(|events_setup| { let mut result = None; if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") { @@ -254,16 +266,12 @@ where } } result - }); + })); } let (allocators, other) = config.communication.try_build()?; - let worker_config = config.worker; - initialize_from(allocators, other, move |allocator| { - - let mut worker = Worker::new(worker_config.clone(), allocator); - + execute_from(allocators, other, config.worker, move |worker| { // If an environment variable is set, use it as the default timely logging. if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") { @@ -283,12 +291,7 @@ where panic!("Could not connect logging stream to: {:?}", addr); } } - - let result = func(&mut worker); - while worker.has_dataflows() { - worker.step_or_park(None); - } - result + func(worker) }) } From c1d668bd9f50d775a35a9a3e4b7e4c243f547505 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 13 Jun 2024 22:48:50 -0400 Subject: [PATCH 2/2] Address comments Signed-off-by: Moritz Hoffmann --- .../src/allocator/zero_copy/initialize.rs | 4 +- communication/src/initialize.rs | 4 +- timely/src/execute.rs | 75 +------------------ 3 files changed, 8 insertions(+), 75 deletions(-) diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 87ad49571..54808bab2 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -39,7 +39,7 @@ pub fn initialize_networking( my_index: usize, threads: usize, noisy: bool, - log_sender: ArcOption>+Send+Sync>>, + log_sender: ArcOption>+Send+Sync>, ) -> ::std::io::Result<(Vec>, CommsGuard)> { @@ -58,7 +58,7 @@ pub fn initialize_networking_from_sockets( mut sockets: Vec>, my_index: usize, threads: usize, - log_sender: ArcOption>+Send+Sync>>, + log_sender: ArcOption>+Send+Sync>, ) -> ::std::io::Result<(Vec>, CommsGuard)> { diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 31e8db4ea..f236b9c02 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -39,7 +39,7 @@ pub enum Config { /// Verbosely report connection process report: bool, /// Closure to create a new logger for a communication thread - log_fn: Arc Option> + Send + Sync>>, + log_fn: Arc Option> + Send + Sync>, } } @@ -121,7 +121,7 @@ impl Config { process, addresses, report, - log_fn: Arc::new(Box::new( | _ | None)), + log_fn: Arc::new(|_| None), }) } else if threads > 1 { if zerocopy { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 65370a73a..8e173b518 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -219,80 +219,13 @@ where /// // the extracted data should have data (0..10) thrice at timestamp 0. /// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::>()); /// ``` -/// -/// ### Communication logging -/// -/// For multi-process (cluster) configurations, this functions installs a custom log -/// function, replacing any previous function. The function connects to the host in -/// the environment symbol `TIMELY_COMM_LOG_ADDR`. -/// -/// ### Timely logging -/// -/// If the environment symbol `TIMELY_WORKER_LOG_ADDR` is set, each worker will try to -/// connect to this address to send its worker logs. -pub fn execute( - mut config: Config, - func: F -) -> Result,String> +pub fn execute(config: Config, func: F) -> Result,String> where T:Send+'static, - F: Fn(&mut Worker)->T+Send+Sync+'static { - - if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication { - - *log_fn = std::sync::Arc::new(Box::new(|events_setup| { - - let mut result = None; - if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") { - - use ::std::net::TcpStream; - use crate::logging::BatchLogger; - use crate::dataflow::operators::capture::EventWriter; - - eprintln!("enabled COMM logging to {}", addr); - - if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); - let mut logger = BatchLogger::new(writer); - result = Some(crate::logging_core::Logger::new( - ::std::time::Instant::now(), - ::std::time::Duration::default(), - events_setup, - move |time, data| logger.publish_batch(time, data) - )); - } - else { - panic!("Could not connect to communication log address: {:?}", addr); - } - } - result - })); - } - + F: Fn(&mut Worker)->T+Send+Sync+'static, +{ let (allocators, other) = config.communication.try_build()?; - - execute_from(allocators, other, config.worker, move |worker| { - // If an environment variable is set, use it as the default timely logging. - if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") { - - use ::std::net::TcpStream; - use crate::logging::{BatchLogger, TimelyEvent}; - use crate::dataflow::operators::capture::EventWriter; - - if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); - let mut logger = BatchLogger::new(writer); - worker.log_register() - .insert::("timely", move |time, data| - logger.publish_batch(time, data) - ); - } - else { - panic!("Could not connect logging stream to: {:?}", addr); - } - } - func(worker) - }) + execute_from(allocators, other, config.worker, func) } /// Executes a timely dataflow from supplied arguments and per-communicator logic.