Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The address associated with each operator, a `[usize]` used to start with the id

The `Worker` and the `Subgraph` operator no longer schedules all of their child dataflows and scopes by default. Instead, they track "active" children and schedule only those. Operators become active by receiving a message, a progress update, or by explicit activation. Some operators, source as `source`, have no inputs and will require explicit activation to run more than once. Operators that yield before completing all of their work (good for you!) should explicitly re-activate themselves to ensure they are re-scheduled even if they receive no further messages or progress updates. Documentation examples for the `source` method demonstrate this.

The `dataflow_using` method has been generalized to support arbitrary dataflow names, loggers, and additional resources the dataflow should keep alive. Its name has been chaged to `dataflow_core`.
The `dataflow_using` method has been generalized to support arbitrary dataflow names, loggers, and additional resources the dataflow should keep alive. Its name has been changed to `dataflow_core`.

You can now construct `feedback` operators with a `Default::default()` path summary, which has the ability to not increment timestamps. Instead of panicking, Timely's reachability module will inform you if a non-incrementing cycle is detected, at which point you should probably double check your code. It is not 100% known what the system will do in this case (e.g., the progress tracker may enter a non-terminating loop; this is on you, not us ;)).

Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/bytes_slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bytes::arc::Bytes;
pub struct BytesSlab {
buffer: Bytes, // current working buffer.
in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
stash: Vec<Bytes>, // reclaimed and resuable buffers.
stash: Vec<Bytes>, // reclaimed and reusable buffers.
shift: usize, // current buffer allocation size.
valid: usize, // buffer[..valid] are valid bytes.
}
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
// At the start of each iteration, `self.buffer[..self.length]` represents valid
// data, and the remaining capacity is available for reading from the reader.
//
// Once the buffer fills, we need to copy uncomplete messages to a new shared
// Once the buffer fills, we need to copy incomplete messages to a new shared
// allocation and place the existing Bytes into `self.in_progress`, so that it
// can be recovered once all readers have read what they need to.
let mut active = true;
Expand Down
2 changes: 1 addition & 1 deletion communication/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<T: Data> Message<T> {
impl<T> ::std::ops::Deref for Message<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
// TODO: In principle we have aready decoded, but let's go again
// TODO: In principle we have already decoded, but let's go again
match &self.payload {
MessageContents::Owned(typed) => { typed },
MessageContents::Arc(typed) => { typed },
Expand Down
2 changes: 1 addition & 1 deletion container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub trait ContainerBuilder: Default + 'static {
pub struct CapacityContainerBuilder<C>{
/// Container that we're writing to.
current: C,
/// Emtpy allocation.
/// Empty allocation.
empty: Option<C>,
/// Completed containers pending to be sent.
pending: VecDeque<C>,
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/loopdemo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
let mut input = InputHandle::new();
let mut probe = ProbeHandle::new();

// Create a dataflow that discards input data (just syncronizes).
// Create a dataflow that discards input data (just synchronizes).
worker.dataflow(|scope| {

let stream = scope.input_from(&mut input);
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/openloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {
let mut input = InputHandle::new();
let mut probe = ProbeHandle::new();

// Create a dataflow that discards input data (just syncronizes).
// Create a dataflow that discards input data (just synchronizes).
worker.dataflow(|scope| {
scope
.input_from(&mut input) // read input.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
self.progress_logging.as_ref().map(|l| {

// Pre-allocate enough space; we transfer ownership, so there is not
// an apportunity to re-use allocations (w/o changing the logging
// an opportunity to re-use allocations (w/o changing the logging
// interface to accept references).
let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ where
/// Drains the set of updates.
///
/// This operation first compacts the set of updates so that the drained results
/// have at most one occurence of each item.
/// have at most one occurrence of each item.
///
/// # Examples
///
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ where
let inputs = self.input_messages.len();
let outputs = self.output_capabilities.len();

// Create empty child zero represenative.
// Create empty child zero representative.
self.children[0] = PerOperatorState::empty(outputs, inputs);

let mut builder = reachability::Builder::new();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/synchronization/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<T: ExchangeData> Sequencer<T> {
// a source that attempts to pull from `recv` and produce commands for everyone
source(dataflow, "SequenceInput", move |capability, info| {

// intialize activator, now that we have the address
// initialize activator, now that we have the address
activator_source
.borrow_mut()
.replace(CatchupActivator {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub trait AsWorker : Scheduler {
/// Constructs a pipeline channel from the worker to itself.
///
/// By default this method uses the native channel allocation mechanism, but the expectation is
/// that this behavior will be overriden to be more efficient.
/// that this behavior will be overridden to be more efficient.
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);

/// Allocates a new worker-unique identifier.
Expand Down