diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 886bcb590..e7e857a00 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -517,29 +517,62 @@ where
         Tr::Batch: Batch<K, V, G::Timestamp, R>,
         Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
     {
-        // The `Arrange` operator is tasked with reacting to an advancing input
-        // frontier by producing the sequence of batches whose lower and upper
-        // bounds are those frontiers, containing updates at times greater or
-        // equal to lower and not greater or equal to upper.
-        //
-        // The operator uses its batch type's `Batcher`, which accepts update
-        // triples and responds to requests to "seal" batches (presented as new
-        // upper frontiers).
-        //
-        // Each sealed batch is presented to the trace, and if at all possible
-        // transmitted along the outgoing channel. Empty batches may not have
-        // a corresponding capability, as they are only retained for actual data
-        // held by the batcher, which may prevents the operator from sending an
-        // empty batch.
-
-        let mut reader: Option<TraceAgent<Tr>> = None;
-
-        // fabricate a data-parallel operator using the `unary_notify` pattern.
+        self.arrange_general::<_, Tr, Tr, _, _>(pact, name, |_capability, _trace_agent| {
+            |batch, _capability| (batch, Vec::new())
+        })
+    }
+}
+
+
+impl<G, K, V, R> Collection<G, (K, V), R>
+where
+    G: Scope,
+    G::Timestamp: Lattice+Ord,
+    K: ExchangeData+Hashable,
+    V: ExchangeData,
+    R: Semigroup+ExchangeData,
+{
+    /// Arranges a differential dataflow collection with custom user logic.
+    ///
+    /// This method generalizes `arrange` in that the output type may differ
+    /// from the input type, and the user is allowed to perform logic as the
+    /// input batch is formed, before it is accepted into the output trace.
+    /// This allows users to perform non-standard update logic, more like a
+    /// state machine than the simple insertion that `arrange` applies.
+    ///
+    /// The standard `arrange` operator provides a `logic_builder` that ignores
+    /// the supplied capability and trace handle, and produces a `logic` closure
+    /// that on each call just returns the supplied batch.
+    ///
+    /// More generally, the harness operator will repeatedly peel off capabilities
+    /// that are not beyond the frontier, mint the batch of input updates that
+    /// corresponds to that interval, and present it to the created closure.
+    /// The closure is expected to return an output batch (for immediate release)
+    /// and a list of capabilities the harness should re-introduce, to ensure
+    /// that the logic is given the opportunity to produce output at those times
+    /// in the future. These capabilities are necessary to send outputs, but they
+    /// also act as a scheduling mechanism, ensuring that the logic is invoked
+    /// when those times arrive rather than the harness quietly minting an empty
+    /// output batch.
+    pub fn arrange_general<P, TrIn, TrOut, F, L>(&self, pact: P, name: &str, logic_builder: F) -> Arranged<G, TraceAgent<TrOut>>
+    where
+        P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
+        TrIn: Trace+TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+'static,
+        TrIn::Batch: Batch<K, V, G::Timestamp, R>,
+        TrIn::Cursor: Cursor<K, V, G::Timestamp, R>,
+        TrOut: Trace+TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+'static,
+        TrOut::Batch: Batch<K, V, G::Timestamp, R>,
+        TrOut::Cursor: Cursor<K, V, G::Timestamp, R>,
+        F: FnOnce(Capability<G::Timestamp>, TraceAgent<TrOut>) -> L,
+        L: FnMut(TrIn::Batch, &Capability<G::Timestamp>) -> (TrOut::Batch, Vec<Capability<G::Timestamp>>)+'static,
+    {
+        // To be populated by the closure once it gets operator information.
+        let mut reader: Option<TraceAgent<TrOut>> = None;
+
         let stream = {

             let reader = &mut reader;
-
-            self.inner.unary_frontier(pact, name, move |_capability, info| {
+            self.inner.unary_frontier(pact, name, move |capability, info| {

                 // Acquire a logger for arrange events.
                 let logger = {
@@ -549,14 +582,13 @@ where
                 };

                 // Where we will deposit received updates, and from which we extract batches.
-                let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
+                let mut batcher = <TrIn::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();

                 // Capabilities for the lower envelope of updates in `batcher`.
                 let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-
+
                 // Buffer for receiving input records.
                 let mut buffer = Vec::new();
-
                 let (activator, effort) =
                 if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
                     (Some(self.scope().activator_for(&info.address[..])), Some(effort))
@@ -565,8 +597,10 @@ where
                     (None, None)
                 };

-                let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
+                let empty_trace = TrOut::new(info.clone(), logger.clone(), activator);
                 let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
+                // Hand the user logic a handle to the output trace, to use as appropriate.
+                let mut logic = logic_builder(capability, reader_local.clone());

                 *reader = Some(reader_local);

@@ -597,6 +631,8 @@ where
                     // to the new frontier).
                     let progress = input_frontier.iter().any(|t2| !input.frontier().less_equal(t2));

+                    let mut new_capabilities = Vec::new();
+
                     if progress {

                         // There are two cases to handle with some care:
@@ -633,15 +669,23 @@ where
                                    }

                                    // Extract updates not in advance of `upper`.
-                                    let batch = batcher.seal(upper.clone());
-
-                                    writer.insert(batch.clone(), Some(capability.time().clone()));
+                                    let input_batch = batcher.seal(upper.clone());
+                                    let (output_batch, new_caps) = logic(input_batch, &capability);
+                                    writer.insert(output_batch.clone(), Some(capability.time().clone()));

                                    // send the batch to downstream consumers, empty or not.
-                                    output.session(&capabilities.elements()[index]).give(batch);
+                                    output.session(&capability).give(output_batch);
+
+                                    // All new capabilities must be in the future of the supplied capability,
+                                    // and not in the interval we just closed out.
+                                    assert!(new_caps.iter().all(|c| upper.less_equal(c.time()) && capabilities.elements()[index].time().less_equal(c.time())));
+
+                                    new_capabilities.extend(new_caps);
                                 }
                             }

+                            capabilities.extend(new_capabilities.drain(..));
+
                             // Having extracted and sent batches between each capability and the input frontier,
                             // we should downgrade all capabilities to match the batcher's lower update frontier.
                             // This may involve discarding capabilities, which is fine as any new updates arrive
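For orientation, the sketch below shows how a caller might use the proposed `arrange_general`. It is a hypothetical example, not part of the patch: it assumes the signature reconstructed above with type parameters ordered `<P, TrIn, TrOut, F, L>`, uses the existing `OrdValSpine` trace and the `Pipeline` pact (adequate for a single worker; a multi-worker deployment would exchange updates by key hash, as `arrange` does), and supplies the same identity logic that the patched `arrange_core` installs.

```rust
// Hypothetical usage sketch for the proposed `arrange_general`; it assumes the
// type-parameter order <P, TrIn, TrOut, F, L> and a single-worker `Pipeline` pact.
use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use timely::dataflow::channels::pact::Pipeline;

// Input and output trace types; here both are the standard value spine.
type Spine = OrdValSpine<String, String, usize, isize>;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<usize, _, _>(|scope| {
            let (mut input, collection) = scope.new_collection::<(String, String), isize>();

            // Identity logic: each sealed input batch is released unchanged as the
            // output batch, and no capabilities are retained for future output.
            let _arranged = collection.arrange_general::<_, Spine, Spine, _, _>(
                Pipeline,
                "ArrangeIdentity",
                |_capability, _trace| |batch, _capability| (batch, Vec::new()),
            );

            input.insert(("key".to_string(), "val".to_string()));
        });
    })
    .expect("timely computation did not complete");
}
```

A non-trivial `logic_builder` would instead hold on to the supplied trace handle and return capabilities for times at which it still intends to produce output; the harness re-introduces those capabilities and re-invokes the logic when those times are reached, as described in the method documentation above.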