Skip to content

Commit 1ea4170

Browse files
committed
trace: define BatchReader with associated types
The `TraceReader` trait uses associated types to define its `Key`, `Val`, `Time`, `Diff` but the `BatchReader` trait did not, even though they are very similar in nature. Usually the choice between asssociated types or generic parameters on a trait is determined by whether or not a particular type is expected to implement the same trait multiple times. My starting point was that these two trait should at the very least be consistent with respect to their structure and either both use generic parameters or both use associated types. All the uses in this repo (and also that I can imagine being useful) don't really need `BatchReader` to be polymorphic for a particular type and so I chose to change that one to make it consistent with `TraceReader`. The result is quite pleasing as in many cases a lot of generic parameters are erased. In order to keep this PR short I left the `Cursor` trait untouched, but I believe a similar transformation would be beneficial there too, simplifying further many type signatures. Signed-off-by: Petros Angelatos <[email protected]>
1 parent f862b65 commit 1ea4170

21 files changed

+214
-250
lines changed

src/algorithms/graphs/bfs.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ where
3030
G::Timestamp: Lattice+Ord,
3131
N: ExchangeData+Hash,
3232
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
33-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
3433
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
3534
{
3635
// initialize roots as reaching themselves at distance 0
@@ -46,4 +45,4 @@ where
4645
.concat(&nodes)
4746
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
4847
})
49-
}
48+
}

src/algorithms/graphs/bijkstra.rs

-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ where
4646
G::Timestamp: Lattice+Ord,
4747
N: ExchangeData+Hash,
4848
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
49-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
5049
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
5150
{
5251
forward

src/algorithms/graphs/propagate.rs

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ where
6565
R: From<i8>,
6666
L: ExchangeData,
6767
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
68-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
6968
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
7069
F: Fn(&L)->u64+Clone+'static,
7170
{

src/operators/arrange/agent.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ where
9292
pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter<Tr>)
9393
where
9494
Tr: Trace,
95-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
95+
Tr::Batch: Batch,
9696
{
9797
let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
9898
let queues = Rc::new(RefCell::new(Vec::new()));

src/operators/arrange/arrangement.rs

+6-9
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ impl<G: Scope, Tr> Clone for Arranged<G, Tr>
6565
where
6666
G::Timestamp: Lattice+Ord,
6767
Tr: TraceReader<Time=G::Timestamp> + Clone,
68-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
6968
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
7069
{
7170
fn clone(&self) -> Self {
@@ -83,7 +82,6 @@ impl<G: Scope, Tr> Arranged<G, Tr>
8382
where
8483
G::Timestamp: Lattice+Ord,
8584
Tr: TraceReader<Time=G::Timestamp> + Clone,
86-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
8785
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
8886
{
8987
/// Brings an arranged collection into a nested scope.
@@ -425,7 +423,6 @@ impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
425423
where
426424
G::Timestamp: Lattice+Ord,
427425
Tr: TraceReader<Time=G::Timestamp> + Clone,
428-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
429426
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
430427
{
431428
/// Brings an arranged collection out of a nested region.
@@ -462,7 +459,7 @@ where
462459
V: ExchangeData,
463460
R: ExchangeData,
464461
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
465-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
462+
Tr::Batch: Batch,
466463
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
467464
{
468465
self.arrange_named("Arrange")
@@ -479,7 +476,7 @@ where
479476
V: ExchangeData,
480477
R: ExchangeData,
481478
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
482-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
479+
Tr::Batch: Batch,
483480
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
484481
{
485482
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
@@ -495,7 +492,7 @@ where
495492
where
496493
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
497494
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
498-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
495+
Tr::Batch: Batch,
499496
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
500497
;
501498
}
@@ -512,7 +509,7 @@ where
512509
where
513510
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
514511
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
515-
Tr::Batch: Batch<K, V, G::Timestamp, R>,
512+
Tr::Batch: Batch,
516513
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
517514
{
518515
// The `Arrange` operator is tasked with reacting to an advancing input
@@ -547,7 +544,7 @@ where
547544
};
548545

549546
// Where we will deposit received updates, and from which we extract batches.
550-
let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
547+
let mut batcher = <Tr::Batch as Batch>::Batcher::new();
551548

552549
// Capabilities for the lower envelope of updates in `batcher`.
553550
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
@@ -684,7 +681,7 @@ where
684681
where
685682
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
686683
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
687-
Tr::Batch: Batch<K, (), G::Timestamp, R>,
684+
Tr::Batch: Batch,
688685
Tr::Cursor: Cursor<K, (), G::Timestamp, R>,
689686
{
690687
self.map(|k| (k, ()))

src/operators/arrange/upsert.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ where
145145
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
146146
Tr::Val: ExchangeData,
147147
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
148-
Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
148+
Tr::Batch: Batch,
149149
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
150150
{
151151
let mut reader: Option<TraceAgent<Tr>> = None;
@@ -252,7 +252,7 @@ where
252252
// Prepare a cursor to the existing arrangement, and a batch builder for
253253
// new stuff that we add.
254254
let (mut trace_cursor, trace_storage) = reader_local.cursor();
255-
let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,Tr::R>>::Builder::new();
255+
let mut builder = <Tr::Batch as Batch>::Builder::new();
256256
for (key, mut list) in to_process.drain(..) {
257257

258258
// The prior value associated with the key.

src/operators/arrange/writer.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct TraceWriter<Tr>
2323
where
2424
Tr: Trace,
2525
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
26-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
26+
Tr::Batch: Batch,
2727
{
2828
/// Current upper limit.
2929
upper: Antichain<Tr::Time>,
@@ -37,7 +37,7 @@ impl<Tr> TraceWriter<Tr>
3737
where
3838
Tr: Trace,
3939
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
40-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
40+
Tr::Batch: Batch,
4141
{
4242
/// Creates a new `TraceWriter`.
4343
pub fn new(
@@ -96,7 +96,7 @@ where
9696
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
9797
if self.upper != upper {
9898
use trace::Builder;
99-
let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
99+
let builder = <Tr::Batch as Batch>::Builder::new();
100100
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
101101
self.insert(batch, None);
102102
}
@@ -107,7 +107,7 @@ impl<Tr> Drop for TraceWriter<Tr>
107107
where
108108
Tr: Trace,
109109
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
110-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
110+
Tr::Batch: Batch,
111111
{
112112
fn drop(&mut self) {
113113
self.seal(Antichain::new())

src/operators/count.rs

-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ where
7474
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
7575
T1::Key: ExchangeData,
7676
T1::R: ExchangeData+Semigroup,
77-
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
7877
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
7978
{
8079
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {

src/operators/join.rs

-8
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ where
190190
Tr::Key: Data+Hashable,
191191
Tr::Val: Data,
192192
Tr::R: Semigroup,
193-
Tr::Batch: BatchReader<Tr::Key,Tr::Val,G::Timestamp,Tr::R>+'static,
194193
Tr::Cursor: Cursor<Tr::Key,Tr::Val,G::Timestamp,Tr::R>+'static,
195194
{
196195
fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (Tr::Key, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::R as Multiply<R2>>::Output>
@@ -258,7 +257,6 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
258257
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
259258
where
260259
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
261-
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
262260
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
263261
Tr2::Val: Ord+Clone+Debug+'static,
264262
Tr2::R: Semigroup,
@@ -311,7 +309,6 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
311309
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
312310
where
313311
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
314-
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
315312
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
316313
Tr2::Val: Ord+Clone+Debug+'static,
317314
Tr2::R: Semigroup,
@@ -334,7 +331,6 @@ where
334331
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
335332
where
336333
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
337-
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
338334
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
339335
Tr2::Val: Ord+Clone+Debug+'static,
340336
Tr2::R: Semigroup,
@@ -351,7 +347,6 @@ where
351347
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
352348
where
353349
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
354-
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
355350
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
356351
Tr2::Val: Ord+Clone+Debug+'static,
357352
Tr2::R: Semigroup,
@@ -374,14 +369,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
374369
T1::Key: Ord+Debug+'static,
375370
T1::Val: Ord+Clone+Debug+'static,
376371
T1::R: Semigroup,
377-
T1::Batch: BatchReader<T1::Key,T1::Val,G::Timestamp,T1::R>+'static,
378372
T1::Cursor: Cursor<T1::Key,T1::Val,G::Timestamp,T1::R>+'static,
379373
{
380374
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
381375
where
382376
Tr2::Val: Ord+Clone+Debug+'static,
383377
Tr2: TraceReader<Key=T1::Key,Time=G::Timestamp>+Clone+'static,
384-
Tr2::Batch: BatchReader<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
385378
Tr2::Cursor: Cursor<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
386379
Tr2::R: Semigroup,
387380
T1::R: Multiply<Tr2::R>,
@@ -401,7 +394,6 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
401394
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,D,ROut>
402395
where
403396
Tr2: TraceReader<Key=T1::Key, Time=G::Timestamp>+Clone+'static,
404-
Tr2::Batch: BatchReader<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
405397
Tr2::Cursor: Cursor<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
406398
Tr2::Val: Ord+Clone+Debug+'static,
407399
Tr2::R: Semigroup,

src/operators/reduce.rs

+5-9
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ impl<G: Scope, K: Data, V: Data, T1, R: Semigroup> Reduce<G, K, V, R> for Arrang
9090
where
9191
G::Timestamp: Lattice+Ord,
9292
T1: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+Clone+'static,
93-
T1::Batch: BatchReader<K, V, G::Timestamp, R>,
9493
T1::Cursor: Cursor<K, V, G::Timestamp, R>,
9594
{
9695
fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
@@ -179,7 +178,6 @@ impl<G: Scope, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T
179178
where
180179
G::Timestamp: Lattice+Ord,
181180
T1: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R1>+Clone+'static,
182-
T1::Batch: BatchReader<K, (), G::Timestamp, R1>,
183181
T1::Cursor: Cursor<K, (), G::Timestamp, R1>,
184182
{
185183
fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
@@ -236,7 +234,6 @@ impl<G: Scope, K: Data, T1, R: Semigroup> Count<G, K, R> for Arranged<G, T1>
236234
where
237235
G::Timestamp: Lattice+Ord,
238236
T1: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+Clone+'static,
239-
T1::Batch: BatchReader<K, (), G::Timestamp, R>,
240237
T1::Cursor: Cursor<K, (), G::Timestamp, R>,
241238
{
242239
fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
@@ -282,7 +279,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
282279
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
283280
T2::Val: Data,
284281
T2::R: Abelian,
285-
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
282+
T2::Batch: Batch,
286283
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
287284
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
288285
{
@@ -305,7 +302,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
305302
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
306303
T2::Val: Data,
307304
T2::R: Semigroup,
308-
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
305+
T2::Batch: Batch,
309306
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
310307
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
311308
;
@@ -324,7 +321,7 @@ where
324321
T2::Val: Data,
325322
T2::R: Semigroup,
326323
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
327-
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
324+
T2::Batch: Batch,
328325
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
329326
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
330327
{
@@ -337,15 +334,14 @@ impl<G: Scope, K: Data, V: Data, T1, R: Semigroup> ReduceCore<G, K, V, R> for Ar
337334
where
338335
G::Timestamp: Lattice+Ord,
339336
T1: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+Clone+'static,
340-
T1::Batch: BatchReader<K, V, G::Timestamp, R>,
341337
T1::Cursor: Cursor<K, V, G::Timestamp, R>,
342338
{
343339
fn reduce_core<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
344340
where
345341
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
346342
T2::Val: Data,
347343
T2::R: Semigroup,
348-
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
344+
T2::Batch: Batch,
349345
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
350346
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
351347

@@ -488,7 +484,7 @@ where
488484
let mut builders = Vec::new();
489485
for i in 0 .. capabilities.len() {
490486
buffers.push((capabilities[i].time().clone(), Vec::new()));
491-
builders.push(<T2::Batch as Batch<K,T2::Val,G::Timestamp,T2::R>>::Builder::new());
487+
builders.push(<T2::Batch as Batch>::Builder::new());
492488
}
493489

494490
// cursors for navigating input and output traces.

src/operators/threshold.rs

-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ where
108108
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
109109
T1::Key: ExchangeData,
110110
T1::R: ExchangeData+Semigroup,
111-
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
112111
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
113112
{
114113
fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, T1::Key, R2>

src/trace/implementations/merge_batcher.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use lattice::Lattice;
88
use trace::{Batch, Batcher, Builder};
99

1010
/// Creates batches from unordered tuples.
11-
pub struct MergeBatcher<K: Ord, V: Ord, T: Ord, R: Semigroup, B: Batch<K, V, T, R>> {
11+
pub struct MergeBatcher<K: Ord, V: Ord, T: Ord, R: Semigroup, B: Batch<Key=K, Val=V, Time=T, R=R>> {
1212
sorter: MergeSorter<(K, V), T, R>,
1313
lower: Antichain<T>,
1414
frontier: Antichain<T>,
@@ -21,7 +21,7 @@ where
2121
V: Ord+Clone,
2222
T: Lattice+timely::progress::Timestamp+Ord+Clone,
2323
R: Semigroup,
24-
B: Batch<K, V, T, R>,
24+
B: Batch<Key=K, Val=V, Time=T, R=R>,
2525
{
2626
fn new() -> Self {
2727
MergeBatcher {

0 commit comments

Comments
 (0)