diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index be1494fa4..54808bab2 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -39,7 +39,8 @@ pub fn initialize_networking( my_index: usize, threads: usize, noisy: bool, - log_sender: BoxOption>+Send+Sync>) + log_sender: ArcOption>+Send+Sync>, +) -> ::std::io::Result<(Vec>, CommsGuard)> { let sockets = create_sockets(addresses, my_index, noisy)?; @@ -57,7 +58,8 @@ pub fn initialize_networking_from_sockets( mut sockets: Vec>, my_index: usize, threads: usize, - log_sender: BoxOption>+Send+Sync>) + log_sender: ArcOption>+Send+Sync>, +) -> ::std::io::Result<(Vec>, CommsGuard)> { // Sockets are expected to be blocking, @@ -67,7 +69,6 @@ pub fn initialize_networking_from_sockets( } } - let log_sender = Arc::new(log_sender); let processes = sockets.len(); let process_allocators = crate::allocator::process::Process::new_vector(threads); diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 3c76624fc..f236b9c02 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter}; /// Possible configurations for the communication infrastructure. +#[derive(Clone)] pub enum Config { /// Use one thread. Thread, @@ -38,7 +39,7 @@ pub enum Config { /// Verbosely report connection process report: bool, /// Closure to create a new logger for a communication thread - log_fn: Box Option> + Send + Sync>, + log_fn: Arc Option> + Send + Sync>, } } @@ -120,7 +121,7 @@ impl Config { process, addresses, report, - log_fn: Box::new( | _ | None), + log_fn: Arc::new(|_| None), }) } else if threads > 1 { if zerocopy { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 9a8340dbf..8e173b518 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -6,6 +6,7 @@ use crate::worker::Worker; use crate::{CommunicationConfig, WorkerConfig}; /// Configures the execution of a timely dataflow computation. +#[derive(Clone, Debug)] pub struct Config { /// Configuration for the communication infrastructure. pub communication: CommunicationConfig, @@ -218,78 +219,13 @@ where /// // the extracted data should have data (0..10) thrice at timestamp 0. /// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::>()); /// ``` -pub fn execute( - mut config: Config, - func: F -) -> Result,String> +pub fn execute(config: Config, func: F) -> Result,String> where T:Send+'static, - F: Fn(&mut Worker)->T+Send+Sync+'static { - - if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication { - - *log_fn = Box::new(|events_setup| { - - let mut result = None; - if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") { - - use ::std::net::TcpStream; - use crate::logging::BatchLogger; - use crate::dataflow::operators::capture::EventWriter; - - eprintln!("enabled COMM logging to {}", addr); - - if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); - let mut logger = BatchLogger::new(writer); - result = Some(crate::logging_core::Logger::new( - ::std::time::Instant::now(), - ::std::time::Duration::default(), - events_setup, - move |time, data| logger.publish_batch(time, data) - )); - } - else { - panic!("Could not connect to communication log address: {:?}", addr); - } - } - result - }); - } - + F: Fn(&mut Worker)->T+Send+Sync+'static, +{ let (allocators, other) = config.communication.try_build()?; - - let worker_config = config.worker; - initialize_from(allocators, other, move |allocator| { - - let mut worker = Worker::new(worker_config.clone(), allocator); - - // If an environment variable is set, use it as the default timely logging. - if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") { - - use ::std::net::TcpStream; - use crate::logging::{BatchLogger, TimelyEvent}; - use crate::dataflow::operators::capture::EventWriter; - - if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); - let mut logger = BatchLogger::new(writer); - worker.log_register() - .insert::("timely", move |time, data| - logger.publish_batch(time, data) - ); - } - else { - panic!("Could not connect logging stream to: {:?}", addr); - } - } - - let result = func(&mut worker); - while worker.has_dataflows() { - worker.step_or_park(None); - } - result - }) + execute_from(allocators, other, config.worker, func) } /// Executes a timely dataflow from supplied arguments and per-communicator logic.