Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pub fn initialize_networking(
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
let sockets = create_sockets(addresses, my_index, noisy)?;
Expand All @@ -57,7 +58,8 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
// Sockets are expected to be blocking,
Expand All @@ -67,7 +69,6 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
}
}

let log_sender = Arc::new(log_sender);
let processes = sockets.len();

let process_allocators = crate::allocator::process::Process::new_vector(threads);
Expand Down
5 changes: 3 additions & 2 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter};


/// Possible configurations for the communication infrastructure.
#[derive(Clone)]
pub enum Config {
/// Use one thread.
Thread,
Expand All @@ -38,7 +39,7 @@ pub enum Config {
/// Verbosely report connection process
report: bool,
/// Closure to create a new logger for a communication thread
log_fn: Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
}
}

Expand Down Expand Up @@ -120,7 +121,7 @@ impl Config {
process,
addresses,
report,
log_fn: Box::new( | _ | None),
log_fn: Arc::new(|_| None),
})
} else if threads > 1 {
if zerocopy {
Expand Down
74 changes: 5 additions & 69 deletions timely/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::worker::Worker;
use crate::{CommunicationConfig, WorkerConfig};

/// Configures the execution of a timely dataflow computation.
#[derive(Clone, Debug)]
pub struct Config {
/// Configuration for the communication infrastructure.
pub communication: CommunicationConfig,
Expand Down Expand Up @@ -218,78 +219,13 @@ where
/// // the extracted data should have data (0..10) thrice at timestamp 0.
/// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
/// ```
pub fn execute<T, F>(
mut config: Config,
func: F
) -> Result<WorkerGuards<T>,String>
pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>,String>
where
T:Send+'static,
F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static {

if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication {

*log_fn = Box::new(|events_setup| {

let mut result = None;
if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") {

use ::std::net::TcpStream;
use crate::logging::BatchLogger;
use crate::dataflow::operators::capture::EventWriter;

eprintln!("enabled COMM logging to {}", addr);

if let Ok(stream) = TcpStream::connect(&addr) {
let writer = EventWriter::new(stream);
let mut logger = BatchLogger::new(writer);
result = Some(crate::logging_core::Logger::new(
::std::time::Instant::now(),
::std::time::Duration::default(),
events_setup,
move |time, data| logger.publish_batch(time, data)
));
}
else {
panic!("Could not connect to communication log address: {:?}", addr);
}
}
result
});
}

F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
{
let (allocators, other) = config.communication.try_build()?;

let worker_config = config.worker;
initialize_from(allocators, other, move |allocator| {

let mut worker = Worker::new(worker_config.clone(), allocator);

// If an environment variable is set, use it as the default timely logging.
if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") {

use ::std::net::TcpStream;
use crate::logging::{BatchLogger, TimelyEvent};
use crate::dataflow::operators::capture::EventWriter;

if let Ok(stream) = TcpStream::connect(&addr) {
let writer = EventWriter::new(stream);
let mut logger = BatchLogger::new(writer);
worker.log_register()
.insert::<TimelyEvent,_>("timely", move |time, data|
logger.publish_batch(time, data)
);
}
else {
panic!("Could not connect logging stream to: {:?}", addr);
}
}

let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
}
result
})
execute_from(allocators, other, config.worker, func)
}

/// Executes a timely dataflow from supplied arguments and per-communicator logic.
Expand Down