Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Update to latest Timely (TimelyDataflow#519)
Browse files Browse the repository at this point in the history
* Update to latest Timely

Bring in changes related to abomination and reference-counted addresses.

Signed-off-by: Moritz Hoffmann <[email protected]>

* Serde for batch

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Sep 9, 2024
1 parent 7998e65 commit 9376560
Show file tree
Hide file tree
Showing 25 changed files with 76 additions and 200 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ graph_map = "0.1"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
abomonation = "0.7"
abomonation_derive = "0.5"
fnv="1.0.2"
timely = {workspace = true}

Expand Down
2 changes: 0 additions & 2 deletions dogsdogsdogs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ license = "MIT"
edition = "2021"

[dependencies]
abomonation = "0.7"
abomonation_derive = "0.5"
timely = { workspace = true }
differential-dataflow = { path = "../", default-features = false }
serde = "1"
Expand Down
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/altneu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
//! element of the second lattice, if neither first element equals
//! the join.
use abomonation_derive::Abomonation;
use serde_derive::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct AltNeu<T> {
pub time: T,
pub neu: bool, // alt < neu in timestamp comparisons.
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ where
stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);
let activator = stream.scope().activator_for(info.address);

move |input1, input2, output| {

Expand Down
2 changes: 1 addition & 1 deletion examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn main() {
// create a source operator which will produce random edges and delete them.
timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| {

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
Expand Down
3 changes: 1 addition & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use abomonation_derive::Abomonation;
use rand::{Rng, SeedableRng, StdRng};
use serde::{Deserialize, Serialize};

Expand All @@ -13,7 +12,7 @@ use differential_dataflow::lattice::Lattice;
type Node = u32;
type Edge = (Node, Node);

#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
pub struct MinSum {
value: u32,
}
Expand Down
6 changes: 2 additions & 4 deletions examples/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn main() {
mod pair {

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
Expand Down Expand Up @@ -203,7 +203,6 @@ mod pair {
}

use std::fmt::{Formatter, Error, Debug};
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Debug implementation to avoid seeing fully qualified path names.
Expand All @@ -221,11 +220,10 @@ mod pair {
/// from the rest of the library other than the traits it needs to implement. With this
/// type and its implementations, you can use it as a timestamp type.
mod vector {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct Vector<T> {
pub vector: Vec<T>,
}
Expand Down
2 changes: 0 additions & 2 deletions experiments/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ edition = "2021"
[dependencies]
core_affinity = "0.5.9"
rand="0.3.13"
abomonation = "0.7"
abomonation_derive = "0.5"
#timely = "0.7"
timely = { workspace = true }
differential-dataflow = { path = "../" }
Expand Down
6 changes: 2 additions & 4 deletions experiments/src/bin/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ fn main() {
mod pair {

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
Expand Down Expand Up @@ -231,7 +231,6 @@ mod pair {
}

use std::fmt::{Formatter, Error, Debug};
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Debug implementation to avoid seeing fully qualified path names.
Expand All @@ -249,11 +248,10 @@ mod pair {
/// from the rest of the library other than the traits it needs to implement. With this
/// type and its implementations, you can use it as a timestamp type.
mod vector {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct Vector<T> {
pub vector: Vec<T>,
}
Expand Down
2 changes: 1 addition & 1 deletion server/dataflows/random_graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
let mut trace =
source(dataflow, "RandomGraph", |cap, info| {

let activator = dataflow.activator_for(&info.address[..]);
let activator = dataflow.activator_for(info.address);
let mut hist = hdrhist::HDRHist::new();

let probe2 = probe.clone();
Expand Down
19 changes: 9 additions & 10 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
//! this file.
use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
/// A batch of updates that are certain to occur.
///
Expand All @@ -32,7 +31,7 @@ pub enum Message<D, T, R> {
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
/// The lower bound of times contained in this statement.
pub lower: Vec<T>,
Expand Down Expand Up @@ -310,9 +309,9 @@ pub mod source {
// Step 1: The MESSAGES operator.
let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
let address = messages_op.operator_info().address;
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
let activator = scope.sync_activator_for(address.to_vec());
let activator2 = scope.activator_for(Rc::clone(&address));
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
Expand Down Expand Up @@ -582,13 +581,13 @@ pub mod sink {
// We can simply record all updates, under the presumption that the have been consolidated
// and so any record we see is in fact guaranteed to happen.
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(stream, Pipeline);
let (mut updates_out, updates) = builder.new_output();

builder.build_reschedule(
move |_capability| {
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
let mut send_queue = std::collections::VecDeque::new();
move |_frontiers| {
let mut output = updates_out.activate();
Expand Down Expand Up @@ -636,15 +635,15 @@ pub mod sink {

// We use a lower-level builder here to get access to the operator address, for rescheduling.
let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

// We now record the numbers of updates at each timestamp between lower and upper bounds.
// Track the advancing frontier, to know when to produce utterances.
let mut frontier = Antichain::from_elem(T::minimum());
// Track accumulated counts for timestamps.
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
// Stash for serialized data yet to send.
let mut send_queue = std::collections::VecDeque::new();
let mut retain = Vec::new();
Expand Down
3 changes: 1 addition & 2 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ wrapping_implementation!(std::num::Wrapping<isize>);

pub use self::present::Present;
mod present {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A zero-sized difference that indicates the presence of a record.
Expand All @@ -168,7 +167,7 @@ mod present {
/// The primary feature of this type is that it has zero size, which reduces the overhead
/// of differential dataflow's representations for settings where collections either do
/// not change, or for which records are only added (for example, derived facts in Datalog).
#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
pub struct Present;

impl<T: Clone> super::Multiply<T> for Present {
Expand Down
9 changes: 2 additions & 7 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
//! default coordinates (which is effectively just *setting* the coordinate).
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStamp<T> {
/// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
vector: Vec<T>,
Expand Down Expand Up @@ -118,9 +115,7 @@ impl<T: Timestamp> Refines<()> for PointStamp<T> {
use timely::progress::PathSummary;

/// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
/// Number of leading coordinates to retain.
///
Expand Down
16 changes: 8 additions & 8 deletions src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Loggers and logging events for differential dataflow.
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Logger for differential dataflow events.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
Expand All @@ -19,7 +19,7 @@ where
}

/// Possible different differential events.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub enum DifferentialEvent {
/// Batch creation.
Batch(BatchEvent),
Expand All @@ -36,7 +36,7 @@ pub enum DifferentialEvent {
}

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatchEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -48,7 +48,7 @@ impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { D


/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -65,7 +65,7 @@ pub struct BatcherEvent {
impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct DropEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -76,7 +76,7 @@ pub struct DropEvent {
impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -93,7 +93,7 @@ pub struct MergeEvent {
impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }

/// A merge failed to complete in time.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeShortfall {
/// Operator identifer.
pub operator: usize,
Expand All @@ -106,7 +106,7 @@ pub struct MergeShortfall {
impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceShare {
/// Operator identifier.
pub operator: usize,
Expand Down
8 changes: 4 additions & 4 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down Expand Up @@ -439,10 +439,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let activator = Some(scope.activator_for(&info.address[..]));
let activator = Some(scope.activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(&info.address[..]));
let activator = Some(stream.scope().activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ where
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = arranged1.stream.scope().activations().clone();
let activator = Activator::new(&info.address[..], activations);
let activator = Activator::new(info.address, activations);

// Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
// These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
Expand Down
2 changes: 1 addition & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ where
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..]));
let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
Loading

0 comments on commit 9376560

Please sign in to comment.