Skip to content

Commit 8123b20

Browse files
Further tidying up of submitted PRs (#367)
* remove transmute and fix rust warning while `()` is a ZST the potentially dangling reference is still undefined behaviour. Making it a static is a trivial fix. Signed-off-by: Petros Angelatos <[email protected]> * trace: define `BatchReader` with associated types The `TraceReader` trait uses associated types to define its `Key`, `Val`, `Time`, `Diff` but the `BatchReader` trait did not, even though they are very similar in nature. Usually the choice between asssociated types or generic parameters on a trait is determined by whether or not a particular type is expected to implement the same trait multiple times. My starting point was that these two trait should at the very least be consistent with respect to their structure and either both use generic parameters or both use associated types. All the uses in this repo (and also that I can imagine being useful) don't really need `BatchReader` to be polymorphic for a particular type and so I chose to change that one to make it consistent with `TraceReader`. The result is quite pleasing as in many cases a lot of generic parameters are erased. In order to keep this PR short I left the `Cursor` trait untouched, but I believe a similar transformation would be beneficial there too, simplifying further many type signatures. Signed-off-by: Petros Angelatos <[email protected]> * trace: redefine Cursor with associated types Signed-off-by: Petros Angelatos <[email protected]> * remove transmute and fix rust warning while `()` is a ZST the potentially dangling reference is still undefined behaviour. Making it a static is a trivial fix. Signed-off-by: Petros Angelatos <[email protected]> * simplify generics of batch related traits * simplify generics of join's deferred struct * merge CursorDebug trait methods into Cursor * various code style changes Co-authored-by: Frank McSherry <[email protected]>
1 parent 337f847 commit 8123b20

32 files changed

+605
-600
lines changed

dogsdogsdogs/src/operators/count.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
44
use differential_dataflow::difference::{Monoid, Multiply};
55
use differential_dataflow::lattice::Lattice;
66
use differential_dataflow::operators::arrange::Arranged;
7-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
7+
use differential_dataflow::trace::TraceReader;
88

99
/// Reports a number of extensions to a stream of prefixes.
1010
///
@@ -23,8 +23,6 @@ where
2323
G::Timestamp: Lattice,
2424
Tr: TraceReader<Val=(), Time=G::Timestamp, R=isize>+Clone+'static,
2525
Tr::Key: Ord+Hashable+Default,
26-
Tr::Batch: BatchReader<Tr::Key, (), Tr::Time, Tr::R>,
27-
Tr::Cursor: Cursor<Tr::Key, (), Tr::Time, Tr::R>,
2826
R: Monoid+Multiply<Output = R>+ExchangeData,
2927
F: Fn(&P)->Tr::Key+Clone+'static,
3028
P: ExchangeData,

dogsdogsdogs/src/operators/half_join.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
4242
use differential_dataflow::difference::{Monoid, Semigroup};
4343
use differential_dataflow::lattice::Lattice;
4444
use differential_dataflow::operators::arrange::Arranged;
45-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
45+
use differential_dataflow::trace::{Cursor, TraceReader};
4646
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
4747

4848
/// A binary equijoin that responds to updates on only its first input.
@@ -81,8 +81,6 @@ where
8181
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
8282
Tr::Key: Ord+Hashable+ExchangeData,
8383
Tr::Val: Clone,
84-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
85-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
8684
Tr::R: Monoid+ExchangeData,
8785
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
8886
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
@@ -137,8 +135,6 @@ where
137135
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
138136
Tr::Key: Ord+Hashable+ExchangeData,
139137
Tr::Val: Clone,
140-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
141-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
142138
Tr::R: Monoid+ExchangeData,
143139
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
144140
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,

dogsdogsdogs/src/operators/lookup_map.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
1010
use differential_dataflow::difference::{Semigroup, Monoid};
1111
use differential_dataflow::lattice::Lattice;
1212
use differential_dataflow::operators::arrange::Arranged;
13-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
13+
use differential_dataflow::trace::{Cursor, TraceReader};
1414

1515
/// Proposes extensions to a stream of prefixes.
1616
///
@@ -32,8 +32,6 @@ where
3232
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
3333
Tr::Key: Ord+Hashable,
3434
Tr::Val: Clone,
35-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
36-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
3735
Tr::R: Monoid+ExchangeData,
3836
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
3937
D: ExchangeData,

dogsdogsdogs/src/operators/propose.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
44
use differential_dataflow::difference::{Monoid, Multiply};
55
use differential_dataflow::lattice::Lattice;
66
use differential_dataflow::operators::arrange::Arranged;
7-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
7+
use differential_dataflow::trace::TraceReader;
88

99
/// Proposes extensions to a prefix stream.
1010
///
@@ -25,8 +25,6 @@ where
2525
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
2626
Tr::Key: Ord+Hashable+Default,
2727
Tr::Val: Clone,
28-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
29-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
3028
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
3129
F: Fn(&P)->Tr::Key+Clone+'static,
3230
P: ExchangeData,
@@ -58,8 +56,6 @@ where
5856
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
5957
Tr::Key: Ord+Hashable+Default,
6058
Tr::Val: Clone,
61-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
62-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
6359
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
6460
F: Fn(&P)->Tr::Key+Clone+'static,
6561
P: ExchangeData,

dogsdogsdogs/src/operators/validate.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection};
66
use differential_dataflow::difference::{Monoid, Multiply};
77
use differential_dataflow::lattice::Lattice;
88
use differential_dataflow::operators::arrange::Arranged;
9-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
9+
use differential_dataflow::trace::TraceReader;
1010

1111
/// Proposes extensions to a stream of prefixes.
1212
///
@@ -24,8 +24,6 @@ where
2424
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
2525
K: Ord+Hash+Clone+Default,
2626
V: ExchangeData+Hash+Default,
27-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
28-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
2927
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
3028
F: Fn(&P)->K+Clone+'static,
3129
P: ExchangeData,

examples/cursors.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use timely::progress::frontier::AntichainRef;
4343
use differential_dataflow::input::Input;
4444
use differential_dataflow::operators::arrange::ArrangeByKey;
4545
use differential_dataflow::operators::*;
46-
use differential_dataflow::trace::cursor::CursorDebug;
46+
use differential_dataflow::trace::cursor::Cursor;
4747
use differential_dataflow::trace::TraceReader;
4848

4949
type Node = u32;

src/algorithms/graphs/bfs.rs

-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ where
3030
G::Timestamp: Lattice+Ord,
3131
N: ExchangeData+Hash,
3232
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
33-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
34-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
3533
{
3634
// initialize roots as reaching themselves at distance 0
3735
let nodes = roots.map(|x| (x, 0));

src/algorithms/graphs/bijkstra.rs

-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ where
4646
G::Timestamp: Lattice+Ord,
4747
N: ExchangeData+Hash,
4848
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
49-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
50-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
5149
{
5250
forward
5351
.stream

src/algorithms/graphs/propagate.rs

-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ where
6565
R: From<i8>,
6666
L: ExchangeData,
6767
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
68-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
69-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
7068
F: Fn(&L)->u64+Clone+'static,
7169
{
7270
// Morally the code performs the following iterative computation. However, in the interest of a simplified

src/operators/arrange/agent.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ where
7777
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
7878
self.physical_compaction.borrow()
7979
}
80-
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
80+
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
8181
self.trace.borrow_mut().trace.cursor_through(frontier)
8282
}
8383
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
@@ -92,7 +92,7 @@ where
9292
pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter<Tr>)
9393
where
9494
Tr: Trace,
95-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
95+
Tr::Batch: Batch,
9696
{
9797
let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
9898
let queues = Rc::new(RefCell::new(Vec::new()));

src/operators/arrange/arrangement.rs

+7-18
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ impl<G: Scope, Tr> Clone for Arranged<G, Tr>
6565
where
6666
G::Timestamp: Lattice+Ord,
6767
Tr: TraceReader<Time=G::Timestamp> + Clone,
68-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
69-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
7068
{
7169
fn clone(&self) -> Self {
7270
Arranged {
@@ -83,8 +81,6 @@ impl<G: Scope, Tr> Arranged<G, Tr>
8381
where
8482
G::Timestamp: Lattice+Ord,
8583
Tr: TraceReader<Time=G::Timestamp> + Clone,
86-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
87-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
8884
{
8985
/// Brings an arranged collection into a nested scope.
9086
///
@@ -405,7 +401,7 @@ where
405401

406402
// Determine new frontier on queries that may be issued.
407403
// TODO: This code looks very suspect; explain better or fix.
408-
let frontier = std::array::IntoIter::new([
404+
let frontier = IntoIterator::into_iter([
409405
capability.as_ref().map(|c| c.time().clone()),
410406
input1.frontier().frontier().get(0).cloned(),
411407
]).filter_map(|t| t).min();
@@ -425,8 +421,6 @@ impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
425421
where
426422
G::Timestamp: Lattice+Ord,
427423
Tr: TraceReader<Time=G::Timestamp> + Clone,
428-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
429-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
430424
{
431425
/// Brings an arranged collection out of a nested region.
432426
///
@@ -462,8 +456,7 @@ where
462456
V: ExchangeData,
463457
R: ExchangeData,
464458
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
465-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
466-
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
459+
Tr::Batch: Batch,
467460
{
468461
self.arrange_named("Arrange")
469462
}
@@ -479,8 +472,7 @@ where
479472
V: ExchangeData,
480473
R: ExchangeData,
481474
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
482-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
483-
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
475+
Tr::Batch: Batch,
484476
{
485477
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
486478
self.arrange_core(exchange, name)
@@ -495,8 +487,7 @@ where
495487
where
496488
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
497489
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
498-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
499-
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
490+
Tr::Batch: Batch,
500491
;
501492
}
502493

@@ -512,8 +503,7 @@ where
512503
where
513504
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
514505
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
515-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
516-
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
506+
Tr::Batch: Batch,
517507
{
518508
// The `Arrange` operator is tasked with reacting to an advancing input
519509
// frontier by producing the sequence of batches whose lower and upper
@@ -547,7 +537,7 @@ where
547537
};
548538

549539
// Where we will deposit received updates, and from which we extract batches.
550-
let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
540+
let mut batcher = <Tr::Batch as Batch>::Batcher::new();
551541

552542
// Capabilities for the lower envelope of updates in `batcher`.
553543
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
@@ -684,8 +674,7 @@ where
684674
where
685675
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
686676
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
687-
Tr::Batch: Batch<K, (), G::Timestamp, R>,
688-
Tr::Cursor: Cursor<K, (), G::Timestamp, R>,
677+
Tr::Batch: Batch,
689678
{
690679
self.map(|k| (k, ()))
691680
.arrange_core(pact, name)

src/operators/arrange/upsert.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ where
145145
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
146146
Tr::Val: ExchangeData,
147147
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
148-
Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
149-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
148+
Tr::Batch: Batch,
150149
{
151150
let mut reader: Option<TraceAgent<Tr>> = None;
152151

@@ -252,7 +251,7 @@ where
252251
// Prepare a cursor to the existing arrangement, and a batch builder for
253252
// new stuff that we add.
254253
let (mut trace_cursor, trace_storage) = reader_local.cursor();
255-
let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,Tr::R>>::Builder::new();
254+
let mut builder = <Tr::Batch as Batch>::Builder::new();
256255
for (key, mut list) in to_process.drain(..) {
257256

258257
// The prior value associated with the key.

src/operators/arrange/writer.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct TraceWriter<Tr>
2323
where
2424
Tr: Trace,
2525
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
26-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
26+
Tr::Batch: Batch,
2727
{
2828
/// Current upper limit.
2929
upper: Antichain<Tr::Time>,
@@ -37,7 +37,7 @@ impl<Tr> TraceWriter<Tr>
3737
where
3838
Tr: Trace,
3939
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
40-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
40+
Tr::Batch: Batch,
4141
{
4242
/// Creates a new `TraceWriter`.
4343
pub fn new(
@@ -96,7 +96,7 @@ where
9696
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
9797
if self.upper != upper {
9898
use trace::Builder;
99-
let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
99+
let builder = <Tr::Batch as Batch>::Builder::new();
100100
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
101101
self.insert(batch, None);
102102
}
@@ -107,7 +107,7 @@ impl<Tr> Drop for TraceWriter<Tr>
107107
where
108108
Tr: Trace,
109109
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
110-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
110+
Tr::Batch: Batch,
111111
{
112112
fn drop(&mut self) {
113113
self.seal(Antichain::new())

src/operators/count.rs

-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ where
7474
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
7575
T1::Key: ExchangeData,
7676
T1::R: ExchangeData+Semigroup,
77-
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
78-
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
7977
{
8078
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {
8179

0 commit comments

Comments
 (0)