Skip to content

Further tidying up of submitted PRs #367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 30, 2022
Merged
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::TraceReader;

/// Reports a number of extensions to a stream of prefixes.
///
Expand All @@ -23,8 +23,6 @@ where
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Batch: BatchReader<Tr::Key, (), Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, (), Tr::Time, Tr::R>,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
P: ExchangeData,
Expand Down
6 changes: 1 addition & 5 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};

/// A binary equijoin that responds to updates on only its first input.
Expand Down Expand Up @@ -81,8 +81,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
Expand Down Expand Up @@ -137,8 +135,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};

/// Proposes extensions to a stream of prefixes.
///
Expand All @@ -32,8 +32,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
D: ExchangeData,
Expand Down
6 changes: 1 addition & 5 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::TraceReader;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -25,8 +25,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
P: ExchangeData,
Expand Down Expand Up @@ -58,8 +56,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
P: ExchangeData,
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::TraceReader;

/// Proposes extensions to a stream of prefixes.
///
Expand All @@ -24,8 +24,6 @@ where
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
Expand Down
2 changes: 1 addition & 1 deletion examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use timely::progress::frontier::AntichainRef;
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::*;
use differential_dataflow::trace::cursor::CursorDebug;
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::TraceReader;

type Node = u32;
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ where
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ where
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
{
forward
.stream
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ where
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
Expand All @@ -92,7 +92,7 @@ where
pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter<Tr>)
where
Tr: Trace,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
let queues = Rc::new(RefCell::new(Vec::new()));
Expand Down
25 changes: 7 additions & 18 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ impl<G: Scope, Tr> Clone for Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
fn clone(&self) -> Self {
Arranged {
Expand All @@ -83,8 +81,6 @@ impl<G: Scope, Tr> Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
/// Brings an arranged collection into a nested scope.
///
Expand Down Expand Up @@ -405,7 +401,7 @@ where

// Determine new frontier on queries that may be issued.
// TODO: This code looks very suspect; explain better or fix.
let frontier = std::array::IntoIter::new([
let frontier = IntoIterator::into_iter([
capability.as_ref().map(|c| c.time().clone()),
input1.frontier().frontier().get(0).cloned(),
]).filter_map(|t| t).min();
Expand All @@ -425,8 +421,6 @@ impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
/// Brings an arranged collection out of a nested region.
///
Expand Down Expand Up @@ -462,8 +456,7 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
{
self.arrange_named("Arrange")
}
Expand All @@ -479,8 +472,7 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -495,8 +487,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
;
}

Expand All @@ -512,8 +503,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -547,7 +537,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
let mut batcher = <Tr::Batch as Batch>::Batcher::new();

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down Expand Up @@ -684,8 +674,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr::Batch: Batch<K, (), G::Timestamp, R>,
Tr::Cursor: Cursor<K, (), G::Timestamp, R>,
Tr::Batch: Batch,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down
5 changes: 2 additions & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ where
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
Tr::Batch: Batch,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -252,7 +251,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,Tr::R>>::Builder::new();
let mut builder = <Tr::Batch as Batch>::Builder::new();
for (key, mut list) in to_process.drain(..) {

// The prior value associated with the key.
Expand Down
8 changes: 4 additions & 4 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
/// Current upper limit.
upper: Antichain<Tr::Time>,
Expand All @@ -37,7 +37,7 @@ impl<Tr> TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
/// Creates a new `TraceWriter`.
pub fn new(
Expand Down Expand Up @@ -96,7 +96,7 @@ where
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use trace::Builder;
let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
let builder = <Tr::Batch as Batch>::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
}
Expand All @@ -107,7 +107,7 @@ impl<Tr> Drop for TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
fn drop(&mut self) {
self.seal(Antichain::new())
Expand Down
2 changes: 0 additions & 2 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ where
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
T1::Key: ExchangeData,
T1::R: ExchangeData+Semigroup,
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {

Expand Down
Loading