diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java index ad000a659a..658825bdc0 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java @@ -24,11 +24,12 @@ import com.apple.foundationdb.async.AsyncUtil; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.RecordCursorResult; -import com.apple.foundationdb.record.RecordCursorStartContinuation; import com.apple.foundationdb.record.RecordCursorVisitor; import com.apple.foundationdb.record.query.plan.plans.QueryResult; import com.google.common.base.Verify; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import javax.annotation.Nonnull; @@ -54,16 +55,25 @@ public class AggregateCursor implements RecordCursor previousResult; + @Nullable + // when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null + private RecordCursorResult lastResult; // Previous non-empty record processed by this cursor @Nullable private RecordCursorResult previousValidResult; + @Nullable + private RecordCursorProto.PartialAggregationResult partialAggregationResult; + @Nullable + byte[] continuation; public AggregateCursor(@Nonnull RecordCursor inner, @Nonnull final StreamGrouping streamGrouping, - boolean isCreateDefaultOnEmpty) { + boolean isCreateDefaultOnEmpty, + @Nullable byte[] continuation) { this.inner = inner; this.streamGrouping = streamGrouping; this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; + this.continuation = continuation; } @Nonnull @@ -71,14 +81,17 @@ public AggregateCursor(@Nonnull RecordCursor inner, public CompletableFuture> onNext() { if (previousResult != null && !previousResult.hasNext()) { // we are done - return CompletableFuture.completedFuture(RecordCursorResult.exhausted()); + return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(new AggregateCursorContinuation(previousResult.getContinuation()), + previousResult.getNoNextReason())); } return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> { + lastResult = previousResult; previousResult = innerResult; if (!innerResult.hasNext()) { if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) { - streamGrouping.finalizeGroup(); + // the method streamGrouping.finalizeGroup() computes previousCompleteResult and resets the accumulator + partialAggregationResult = streamGrouping.finalizeGroup(); } return false; } else { @@ -91,45 +104,60 @@ public CompletableFuture> onNext() { return (!groupBreak); } }), getExecutor()).thenApply(vignore -> { - if (isNoRecords()) { - // Edge case where there are no records at all - if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) { - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START); + // either innerResult.hasNext() = false; or groupBreak = true + if (Verify.verifyNotNull(previousResult).hasNext()) { + // in this case groupBreak = true, return aggregated result and continuation, partialAggregationResult = null + // previousValidResult = null happens when 1st row of current scan != last row of last scan, results in groupBreak = true and previousValidResult = null + RecordCursorContinuation c = previousValidResult == null ? new AggregateCursorContinuation(continuation, false) : new AggregateCursorContinuation(previousValidResult.getContinuation()); + + /* + * Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups. + * Below is an example that shows how continuation(previousValidResult) moves: + * Initial: previousResult = null, previousValidResult = null + row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0 + row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1 + row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2 + * returns result (groupKey0, continuation = row1), and set previousValidResult = row2 + * + * Now there are 2 scenarios, 1) the current iteration continues; 2) the current iteration stops + * In scenario 1, the iteration continues, it gets to row3: + row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 + * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 + * + * In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), and set initial previousResult = null, previousValidResult = null: + row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2 + * (Note that because a new iteration starts, groupBreak = False for row2.) + row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 + * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 + * + * Both scenarios returns the correct result, and continuation are both set to row3 in the end, row2 is scanned twice if a new iteration starts. + */ + previousValidResult = previousResult; + return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), c); + } else { + // innerResult.hasNext() = false, might stop in the middle of a group + if (Verify.verifyNotNull(previousResult).getNoNextReason() == NoNextReason.SOURCE_EXHAUSTED) { + // exhausted + if (previousValidResult == null && partialAggregationResult == null) { + return RecordCursorResult.exhausted(); + } else { + RecordCursorContinuation c = previousValidResult == null ? new AggregateCursorContinuation(continuation, false) : new AggregateCursorContinuation(previousValidResult.getContinuation()); + previousValidResult = previousResult; + return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), c); + } } else { - return RecordCursorResult.exhausted(); + // stopped in the middle of a group + RecordCursorContinuation currentContinuation = new AggregateCursorContinuation(lastResult.getContinuation(), partialAggregationResult); + previousValidResult = previousResult; + return RecordCursorResult.withoutNextValue(currentContinuation, Verify.verifyNotNull(previousResult).getNoNextReason()); } } - // Use the last valid result for the continuation as we need non-terminal one here. - RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); - /* - * Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups. - * Below is an example that shows how continuation(previousValidResult) moves: - * Initial: previousResult = null, previousValidResult = null - row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0 - row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1 - row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2 - * returns result (groupKey0, continuation = row1), and set previousValidResult = row2 - * - * Now there are 2 scenarios, 1) the current iteration continues; 2) the current iteration stops - * In scenario 1, the iteration continues, it gets to row3: - row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 - * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 - * - * In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), and set initial previousResult = null, previousValidResult = null: - row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2 - * (Note that because a new iteration starts, groupBreak = False for row2.) - row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 - * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 - * - * Both scenarios returns the correct result, and continuation are both set to row3 in the end, row2 is scanned twice if a new iteration starts. - */ - previousValidResult = previousResult; - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); }); } - + + private boolean isNoRecords() { - return ((previousValidResult == null) && (!Verify.verifyNotNull(previousResult).hasNext())); + return ((previousValidResult == null) && (!Verify.verifyNotNull(previousResult).hasNext()) && (streamGrouping.getPartialAggregationResult() == null)); } @Override @@ -155,4 +183,104 @@ public boolean accept(@Nonnull RecordCursorVisitor visitor) { } return visitor.visitLeave(this); } + + public static class AggregateCursorContinuation implements RecordCursorContinuation { + @Nullable + private final ByteString innerContinuation; + + @Nullable + private final RecordCursorProto.PartialAggregationResult partialAggregationResult; + + @Nullable + private RecordCursorProto.AggregateCursorContinuation cachedProto; + + private final boolean isEnd; + + public AggregateCursorContinuation(@Nonnull RecordCursorContinuation other) { + this(other.toBytes(), other.isEnd()); + } + + public AggregateCursorContinuation(@Nonnull RecordCursorContinuation other, @Nullable RecordCursorProto.PartialAggregationResult partialAggregationResult) { + this.isEnd = other.isEnd(); + this.innerContinuation = other.toByteString(); + this.partialAggregationResult = partialAggregationResult; + } + + public AggregateCursorContinuation(@Nullable byte[] innerContinuation, boolean isEnd, @Nullable RecordCursorProto.PartialAggregationResult partialAggregationResult) { + this.isEnd = isEnd; + this.innerContinuation = innerContinuation == null ? null : ByteString.copyFrom(innerContinuation); + this.partialAggregationResult = partialAggregationResult; + } + + public AggregateCursorContinuation(@Nullable byte[] innerContinuation, boolean isEnd) { + this(innerContinuation, isEnd, null); + } + + @Nonnull + @Override + public ByteString toByteString() { + if (isEnd()) { + return ByteString.EMPTY; + } else { + return toProto().toByteString(); + } + } + + @Nullable + @Override + public byte[] toBytes() { + if (isEnd()) { + return null; + } + return toProto().toByteArray(); + } + + @Override + public boolean isEnd() { + return isEnd; + } + + @Nullable + public byte[] getInnerContinuation() { + return innerContinuation == null ? null : innerContinuation.toByteArray(); + } + + @Nullable + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult() { + return partialAggregationResult; + } + + @Nonnull + private RecordCursorProto.AggregateCursorContinuation toProto() { + if (cachedProto == null) { + RecordCursorProto.AggregateCursorContinuation.Builder cachedProtoBuilder = RecordCursorProto.AggregateCursorContinuation.newBuilder(); + if (partialAggregationResult != null) { + cachedProtoBuilder.setPartialAggregationResults(partialAggregationResult); + } + if (innerContinuation != null) { + cachedProtoBuilder.setContinuation(innerContinuation); + } + cachedProto = cachedProtoBuilder.build(); + } + return cachedProto; + } + + public static AggregateCursorContinuation fromRawBytes(@Nullable byte[] rawBytes) { + if (rawBytes == null) { + return new AggregateCursorContinuation(null, true); + } + try { + RecordCursorProto.AggregateCursorContinuation continuationProto = RecordCursorProto.AggregateCursorContinuation.parseFrom(rawBytes); + if (!continuationProto.hasContinuation()) { + return new AggregateCursorContinuation(null, true); + } else if (continuationProto.hasPartialAggregationResults()) { + return new AggregateCursorContinuation(continuationProto.getContinuation().toByteArray(), false, continuationProto.getPartialAggregationResults()); + } else { + return new AggregateCursorContinuation(continuationProto.getContinuation().toByteArray(), false); + } + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + } + } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java index 721a920657..689c96a0e0 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java @@ -22,12 +22,15 @@ import com.apple.foundationdb.record.Bindings; import com.apple.foundationdb.record.EvaluationContext; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.RecordCursorResult; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase; import com.apple.foundationdb.record.query.plan.cascades.CorrelationIdentifier; import com.apple.foundationdb.record.query.plan.cascades.values.Accumulator; import com.apple.foundationdb.record.query.plan.cascades.values.AggregateValue; import com.apple.foundationdb.record.query.plan.cascades.values.Value; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import javax.annotation.Nonnull; @@ -104,10 +107,19 @@ public StreamGrouping(@Nullable final Value groupingKeyValue, @Nonnull final CorrelationIdentifier aggregateAlias, @Nonnull final FDBRecordStoreBase store, @Nonnull final EvaluationContext context, - @Nonnull final CorrelationIdentifier alias) { + @Nonnull final CorrelationIdentifier alias, + @Nullable final RecordCursorProto.PartialAggregationResult partialAggregationResult) { this.groupingKeyValue = groupingKeyValue; this.aggregateValue = aggregateValue; this.accumulator = aggregateValue.createAccumulator(context.getTypeRepository()); + if (partialAggregationResult != null) { + this.accumulator.setInitialState(partialAggregationResult.getAccumulatorStatesList()); + try { + this.currentGroup = DynamicMessage.parseFrom(context.getTypeRepository().newMessageBuilder(groupingKeyValue.getResultType()).getDescriptorForType(), partialAggregationResult.getGroupKey().toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } this.store = store; this.context = context; this.alias = alias; @@ -167,20 +179,22 @@ private boolean isGroupBreak(final Object currentGroup, final Object nextGroup) } } - public void finalizeGroup() { - finalizeGroup(null); + public RecordCursorProto.PartialAggregationResult finalizeGroup() { + return finalizeGroup(null); } - private void finalizeGroup(Object nextGroup) { + private RecordCursorProto.PartialAggregationResult finalizeGroup(Object nextGroup) { final EvaluationContext nestedContext = context.childBuilder() .setBinding(groupingKeyAlias, currentGroup) .setBinding(aggregateAlias, accumulator.finish()) .build(context.getTypeRepository()); previousCompleteResult = completeResultValue.eval(store, nestedContext); + RecordCursorProto.PartialAggregationResult result = currentGroup == null ? null : getPartialAggregationResult((Message) currentGroup); currentGroup = nextGroup; // "Reset" the accumulator by creating a fresh one. accumulator = aggregateValue.createAccumulator(context.getTypeRepository()); + return result; } private void accumulate(@Nullable Object currentObject) { @@ -197,4 +211,13 @@ private Object evalGroupingKey(@Nullable final Object currentObject) { public boolean isResultOnEmpty() { return groupingKeyValue == null; } + + @Nullable + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult(@Nonnull Message groupingKey) { + return accumulator.getPartialAggregationResult(groupingKey); + } + + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult() { + return accumulator.getPartialAggregationResult((Message)currentGroup); + } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java index ec5f7b9d17..57fce4f8ac 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java @@ -342,8 +342,8 @@ public T setHigh(@Nonnull byte[] highBytes, @Nonnull EndpointType highEndpoint) protected int calculatePrefixLength() { int prefixLength = subspace.pack().length; while ((prefixLength < lowBytes.length) && - (prefixLength < highBytes.length) && - (lowBytes[prefixLength] == highBytes[prefixLength])) { + (prefixLength < highBytes.length) && + (lowBytes[prefixLength] == highBytes[prefixLength])) { prefixLength++; } return prefixLength; diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/Accumulator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/Accumulator.java index e75e6d9e69..c9981e7f72 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/Accumulator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/Accumulator.java @@ -20,7 +20,12 @@ package com.apple.foundationdb.record.query.plan.cascades.values; +import com.apple.foundationdb.record.RecordCursorProto; +import com.google.protobuf.Message; + +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.List; /** * An aggregate accumulator. @@ -29,4 +34,9 @@ public interface Accumulator { void accumulate(@Nullable Object currentObject); @Nullable Object finish(); + + @Nullable + RecordCursorProto.PartialAggregationResult getPartialAggregationResult(Message groupingKey); + + void setInitialState(@Nonnull List accumulatorStates); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/CountValue.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/CountValue.java index 4adc95d892..0ed9e05418 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/CountValue.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/CountValue.java @@ -28,6 +28,7 @@ import com.apple.foundationdb.record.PlanHashable; import com.apple.foundationdb.record.PlanSerializationContext; import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.metadata.IndexTypes; import com.apple.foundationdb.record.planprotos.PCountValue; import com.apple.foundationdb.record.planprotos.PCountValue.PPhysicalOperator; @@ -345,6 +346,28 @@ public void accumulate(@Nullable final Object currentObject) { public Object finish() { return physicalOperator.evalPartialToFinal(state); } + + @Nullable + @Override + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult(Message groupingKey) { + if (state == null) { + return null; + } + return RecordCursorProto.PartialAggregationResult.newBuilder() + .setGroupKey(groupingKey.toByteString()) + .addAccumulatorStates(RecordCursorProto.AccumulatorState.newBuilder() + .setPhysicalOperatorName(physicalOperator.name()) + .addState(String.valueOf(state))) + .build(); + } + + @Override + public void setInitialState(@Nonnull List accumulatorStates) { + Verify.verify(this.state == null); + Verify.verify(accumulatorStates.size() == 1); + Verify.verify(physicalOperator.name().equals(accumulatorStates.get(0).getPhysicalOperatorName())); + this.state = Long.parseLong(accumulatorStates.get(0).getState(0)); + } } /** diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/NumericAggregationValue.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/NumericAggregationValue.java index 9c8a0073fc..174df7b9f9 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/NumericAggregationValue.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/NumericAggregationValue.java @@ -28,6 +28,7 @@ import com.apple.foundationdb.record.PlanHashable; import com.apple.foundationdb.record.PlanSerializationContext; import com.apple.foundationdb.record.RecordCoreArgumentException; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.metadata.IndexTypes; import com.apple.foundationdb.record.planprotos.PNumericAggregationValue; import com.apple.foundationdb.record.planprotos.PNumericAggregationValue.PAvg; @@ -62,6 +63,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.BitSet; import java.util.List; @@ -862,5 +864,96 @@ public void accumulate(@Nullable final Object currentObject) { public Object finish() { return physicalOperator.evalPartialToFinal(state); } + + @Nullable + @Override + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult(@Nonnull Message groupingKey) { + if (state == null) { + return null; + } + RecordCursorProto.AccumulatorState.Builder builder = RecordCursorProto.AccumulatorState.newBuilder().setPhysicalOperatorName(physicalOperator.name()); + switch (physicalOperator) { + case SUM_I: + case MAX_I: + case MIN_I: + case SUM_L: + case MAX_L: + case MIN_L: + case SUM_D: + case MAX_D: + case MIN_D: + case SUM_F: + case MAX_F: + case MIN_F: + builder.addState(String.valueOf(state)); + break; + case AVG_I: + case AVG_L: + case AVG_D: + case AVG_F: + Pair pair = (Pair) state; + builder.addState(String.valueOf(pair.getLeft())).addState(String.valueOf(pair.getRight())); + break; + case BITMAP_CONSTRUCT_AGG_I: + case BITMAP_CONSTRUCT_AGG_L: + builder.addState(new String(((BitSet) state).toByteArray(), StandardCharsets.UTF_8)); + break; + default: + break; + + } + return RecordCursorProto.PartialAggregationResult.newBuilder() + .setGroupKey(groupingKey.toByteString()) + .addAccumulatorStates(builder) + .build(); + } + + @Override + public void setInitialState(@Nonnull List accumulatorStates) { + Verify.verify(state == null); + Verify.verify(accumulatorStates.size() == 1); + Verify.verify(physicalOperator.name().equals(accumulatorStates.get(0).getPhysicalOperatorName())); + + switch (physicalOperator) { + case SUM_I: + case MAX_I: + case MIN_I: + state = Integer.parseInt(accumulatorStates.get(0).getState(0)); + break; + case SUM_L: + case MAX_L: + case MIN_L: + state = Long.parseLong(accumulatorStates.get(0).getState(0)); + break; + case SUM_D: + case MAX_D: + case MIN_D: + state = Double.parseDouble(accumulatorStates.get(0).getState(0)); + break; + case SUM_F: + case MAX_F: + case MIN_F: + state = Float.parseFloat(accumulatorStates.get(0).getState(0)); + break; + case AVG_I: + state = Pair.of(Integer.parseInt(accumulatorStates.get(0).getState(0)), Long.parseLong(accumulatorStates.get(0).getState(1))); + break; + case AVG_L: + state = Pair.of(Long.parseLong(accumulatorStates.get(0).getState(0)), Long.parseLong(accumulatorStates.get(0).getState(1))); + break; + case AVG_D: + state = Pair.of(Double.parseDouble(accumulatorStates.get(0).getState(0)), Long.parseLong(accumulatorStates.get(0).getState(1))); + break; + case AVG_F: + state = Pair.of(Float.parseFloat(accumulatorStates.get(0).getState(0)), Long.parseLong(accumulatorStates.get(0).getState(1))); + break; + case BITMAP_CONSTRUCT_AGG_I: + case BITMAP_CONSTRUCT_AGG_L: + state = BitSet.valueOf(accumulatorStates.get(0).getState(0).getBytes(StandardCharsets.UTF_8)); + break; + default: + break; + } + } } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/RecordConstructorValue.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/RecordConstructorValue.java index ea0cafaa0e..bd38ebb18c 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/RecordConstructorValue.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/cascades/values/RecordConstructorValue.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.record.PlanDeserializer; import com.apple.foundationdb.record.PlanHashable; import com.apple.foundationdb.record.PlanSerializationContext; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.planprotos.PRecordConstructorValue; import com.apple.foundationdb.record.planprotos.PValue; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase; @@ -56,6 +57,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -374,6 +376,31 @@ private List buildAccumulators() { } return childAccumulatorsBuilder.build(); } + + @Nullable + @Override + public RecordCursorProto.PartialAggregationResult getPartialAggregationResult(Message groupingKey) { + List accumulatorStates = new ArrayList<>(); + for (Accumulator accumulator: childAccumulators) { + if (accumulator.getPartialAggregationResult(groupingKey) != null) { + accumulatorStates.addAll(accumulator.getPartialAggregationResult(groupingKey).getAccumulatorStatesList()); + } + } + if (accumulatorStates.isEmpty()) { + return null; + } + return RecordCursorProto.PartialAggregationResult.newBuilder() + .setGroupKey(groupingKey.toByteString()) + .addAllAccumulatorStates(accumulatorStates) + .build(); + } + + @Override + public void setInitialState(@Nonnull List accumulatorStates) { + for (int i = 0; i < accumulatorStates.size(); i++) { + childAccumulators.get(i).setInitialState(List.of(accumulatorStates.get(i))); + } + } }; } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java index 17772363f0..7b43323c94 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java @@ -137,7 +137,8 @@ public RecordCursor executePlan(@Nonnull FDBRec @Nonnull EvaluationContext context, @Nullable byte[] continuation, @Nonnull ExecuteProperties executeProperties) { - final var innerCursor = getInnerPlan().executePlan(store, context, continuation, executeProperties.clearSkipAndLimit()); + AggregateCursor.AggregateCursorContinuation aggregateCursorContinuation = AggregateCursor.AggregateCursorContinuation.fromRawBytes(continuation); + final var innerCursor = getInnerPlan().executePlan(store, context, aggregateCursorContinuation.getInnerContinuation(), executeProperties.clearSkipAndLimit()); final var streamGrouping = new StreamGrouping<>(groupingKeyValue, @@ -147,8 +148,11 @@ public RecordCursor executePlan(@Nonnull FDBRec aggregateAlias, (FDBRecordStoreBase)store, context, - inner.getAlias()); - return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty) + inner.getAlias(), + aggregateCursorContinuation.getPartialAggregationResult() + ); + + return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty, aggregateCursorContinuation.getInnerContinuation()) .skipThenLimit(executeProperties.getSkip(), executeProperties.getReturnedRowLimit()); } diff --git a/fdb-record-layer-core/src/main/proto/record_cursor.proto b/fdb-record-layer-core/src/main/proto/record_cursor.proto index c1400c3c8a..8f96f409ab 100644 --- a/fdb-record-layer-core/src/main/proto/record_cursor.proto +++ b/fdb-record-layer-core/src/main/proto/record_cursor.proto @@ -131,4 +131,19 @@ message RecursiveCursorContinuation { optional bool isInitialState = 1; optional planprotos.PTempTable tempTable = 2; optional bytes activeStateContinuation = 3; -} \ No newline at end of file +} + +message AggregateCursorContinuation { + optional bytes continuation = 1; + optional PartialAggregationResult partial_aggregation_results = 2; +} + +message PartialAggregationResult { + optional bytes group_key = 1; + repeated AccumulatorState accumulator_states = 2; +} + +message AccumulatorState { + optional string physical_operator_name = 1; + repeated string state = 2; // 2 for avg +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java index 04d495c2b5..14f6aa1749 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java @@ -20,11 +20,16 @@ package com.apple.foundationdb.record.provider.foundationdb.query; +import com.apple.foundationdb.record.ByteScanLimiterFactory; import com.apple.foundationdb.record.EvaluationContext; import com.apple.foundationdb.record.ExecuteProperties; +import com.apple.foundationdb.record.ExecuteState; import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordCursorContinuation; +import com.apple.foundationdb.record.RecordCursorEndContinuation; import com.apple.foundationdb.record.RecordCursorResult; import com.apple.foundationdb.record.RecordMetaData; +import com.apple.foundationdb.record.RecordScanLimiterFactory; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.query.plan.ScanComparisons; @@ -45,15 +50,18 @@ import com.apple.test.Tags; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -305,6 +313,96 @@ void aggregateNoRecordsNoGroupNoAggregate(final boolean useNestedResult, final i } } + @Test + void partialAggregateSum() { + try (final var context = openContext()) { + openSimpleRecordStore(context, NO_HOOK); + + final var plan = + new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") + .withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value)) + .withGroupCriterion("str_value_indexed") + .build(false); + + // In the testing data, there are 2 groups, each group has 3 rows. + // recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED + // although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START + RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null); + // start the next scan from 4th row, and scans the 4th row (recordScanLimit = 1), return the aggregated result of the first group + RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 1, continuation1.toBytes(), resultOf("0", 3)); + // start the next scan from 5th row, and scans the 5th row (recordScanLimit = 1), return nothing + RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 1, continuation2.toBytes(), null); + // start the next scan from 6th row, and scans the 6th row (recordScanLimit = 2), hit SCAN_LIMIT_REACHED, so return nothing + RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 1, continuation3.toBytes(), null); + // return the aggregated result of the second group + RecordCursorContinuation continuation5 = executePlanWithRecordScanLimit(plan, 1, continuation4.toBytes(), resultOf("1", 12)); + + Assertions.assertEquals(RecordCursorEndContinuation.END, continuation5); + } + } + + @Test + void partialAggregateAvg() { + try (final var context = openContext()) { + openSimpleRecordStore(context, NO_HOOK); + + final var plan = + new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") + .withAggregateValue("num_value_2", value -> new NumericAggregationValue.Avg(NumericAggregationValue.PhysicalOperator.AVG_I, value)) + .withGroupCriterion("str_value_indexed") + .build(false); + + // In the testing data, there are 2 groups, each group has 3 rows. + // recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED + // although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START + RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null); + // start the next scan from 4th row, and scans the 4th row (recordScanLimit = 1), return the aggregated result of the first group + RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 1, continuation1.toBytes(), resultOf("0", 1.0)); + // start the next scan from 5th row, and scans the 5th row (recordScanLimit = 1), return nothing + RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 1, continuation2.toBytes(), null); + // start the next scan from 6th row, and scans the 6th row (recordScanLimit = 2), hit SCAN_LIMIT_REACHED, so return nothing + RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 1, continuation3.toBytes(), null); + // return the aggregated result of the second group + RecordCursorContinuation continuation5 = executePlanWithRecordScanLimit(plan, 1, continuation4.toBytes(), resultOf("1", 4.0)); + + Assertions.assertEquals(RecordCursorEndContinuation.END, continuation5); + } + } + + @Test + void partialAggregateBitmap() { + try (final var context = openContext()) { + openSimpleRecordStore(context, NO_HOOK); + + final var plan = + new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") + .withAggregateValue("num_value_2", value -> new NumericAggregationValue.BitmapConstructAgg(NumericAggregationValue.PhysicalOperator.BITMAP_CONSTRUCT_AGG_I, value)) + .withGroupCriterion("str_value_indexed") + .build(false); + + // In the testing data, there are 2 groups, each group has 3 rows. + // recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED + // although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START + RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null); + // start the next scan from 4th row, and scans the 4th row (recordScanLimit = 1), return the aggregated result of the first group + byte[] first = new byte[1250]; + // first[0] = b'00000111 + first[0] = 7; + RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 1, continuation1.toBytes(), resultOf("0", ByteString.copyFrom(first))); + // start the next scan from 5th row, and scans the 5th row (recordScanLimit = 1), return nothing + RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 1, continuation2.toBytes(), null); + // start the next scan from 6th row, and scans the 6th row (recordScanLimit = 2), hit SCAN_LIMIT_REACHED, so return nothing + RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 1, continuation3.toBytes(), null); + // return the aggregated result of the second group + byte[] second = new byte[1250]; + // second[0] = b'00111000 + second[0] = 56; + RecordCursorContinuation continuation5 = executePlanWithRecordScanLimit(plan, 1, continuation4.toBytes(), resultOf("1", ByteString.copyFrom(second))); + + Assertions.assertEquals(RecordCursorEndContinuation.END, continuation5); + } + } + private static Stream provideArguments() { // (boolean, rowLimit) // setting rowLimit = 0 is equivalent to no limit @@ -334,24 +432,47 @@ private void populateDB(final int numRecords) throws Exception { } @Nonnull - private RecordCursor executePlan(final RecordQueryPlan originalPlan, final int rowLimit, final byte[] continuation) { + private RecordCursor executePlan(final RecordQueryPlan originalPlan, final int rowLimit, final int recordScanLimit, final byte[] continuation) { final RecordQueryPlan plan = verifySerialization(originalPlan); final var types = plan.getDynamicTypes(); final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build(); + ExecuteState executeState; + if (recordScanLimit > 0) { + executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), ByteScanLimiterFactory.tracking()); + } else { + executeState = ExecuteState.NO_LIMITS; + } ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE; - executeProperties = executeProperties.setReturnedRowLimit(rowLimit); - try { - return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties); - } catch (final Throwable t) { - throw Assertions.fail(t); + executeProperties = executeProperties.setReturnedRowLimit(rowLimit).setState(executeState); + return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties); + } + + private RecordCursorContinuation executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, @Nullable List expectedResult) { + List queryResults = new LinkedList<>(); + RecordCursor currentCursor = executePlan(plan, 0, recordScanLimit, continuation); + RecordCursorResult currentCursorResult; + RecordCursorContinuation cursorContinuation; + while (true) { + currentCursorResult = currentCursor.getNext(); + cursorContinuation = currentCursorResult.getContinuation(); + if (!currentCursorResult.hasNext()) { + break; + } + queryResults.add(currentCursorResult.get()); + } + if (expectedResult == null) { + Assertions.assertTrue(queryResults.isEmpty()); + } else { + assertResults(this::assertResultFlattened, queryResults, expectedResult); } + return cursorContinuation; } private List executePlanWithRowLimit(final RecordQueryPlan plan, final int rowLimit) { byte[] continuation = null; List queryResults = new LinkedList<>(); while (true) { - RecordCursor currentCursor = executePlan(plan, rowLimit, continuation); + RecordCursor currentCursor = executePlan(plan, rowLimit, 0, continuation); RecordCursorResult currentCursorResult; while (true) { currentCursorResult = currentCursor.getNext(); @@ -370,7 +491,7 @@ private List executePlanWithRowLimit(final RecordQueryPlan plan, fi private void assertResults(@Nonnull final BiConsumer> checkConsumer, @Nonnull final List actual, @Nonnull final List... expected) { Assertions.assertEquals(expected.length, actual.size()); - for (var i = 0 ; i < actual.size() ; i++) { + for (var i = 0; i < actual.size(); i++) { checkConsumer.accept(actual.get(i), expected[i]); } } @@ -386,7 +507,7 @@ private void assertResultFlattened(final QueryResult actual, final List expec final var resultFields = resultFieldsBuilder.build(); Assertions.assertEquals(resultFields.size(), expected.size()); - for (var i = 0 ; i < resultFields.size() ; i++) { + for (var i = 0; i < resultFields.size(); i++) { Assertions.assertEquals(expected.get(i), resultFields.get(i)); } } @@ -417,7 +538,7 @@ private void assertResultNested(final QueryResult actual, final List expected final var resultFields = resultFieldsBuilder.build(); Assertions.assertEquals(resultFields.size(), expected.size()); - for (var i = 0 ; i < resultFields.size() ; i++) { + for (var i = 0; i < resultFields.size(); i++) { Assertions.assertEquals(expected.get(i), resultFields.get(i)); } } diff --git a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java index ed99e1130b..65deff5b79 100644 --- a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java +++ b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java @@ -83,6 +83,7 @@ public boolean hasNext() { private void fetchNextResult() { if (result != null) { + continuation = ContinuationImpl.fromRecordCursorContinuation(result.getContinuation()); return; } result = recordCursor.getNext(); diff --git a/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/GroupByQueryTests.java b/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/GroupByQueryTests.java index 111e742b35..6ea4815479 100644 --- a/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/GroupByQueryTests.java +++ b/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/GroupByQueryTests.java @@ -20,6 +20,8 @@ package com.apple.foundationdb.relational.recordlayer.query; +import com.apple.foundationdb.relational.api.Continuation; +import com.apple.foundationdb.relational.api.Options; import com.apple.foundationdb.relational.api.RelationalResultSet; import com.apple.foundationdb.relational.recordlayer.EmbeddedRelationalExtension; import com.apple.foundationdb.relational.recordlayer.Utils; @@ -33,6 +35,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.net.URI; +import java.util.Base64; import static com.apple.foundationdb.relational.recordlayer.query.QueryTestUtils.insertT1Record; @@ -46,6 +49,73 @@ public GroupByQueryTests() { Utils.enableCascadesDebugger(); } + @Test + void groupByWithScanLimit() throws Exception { + final String schemaTemplate = + "CREATE TABLE T1(pk bigint, a bigint, b bigint, c bigint, PRIMARY KEY(pk))" + + "CREATE INDEX idx1 as select a, b, c from t1 order by a, b, c"; + try (var ddl = Ddl.builder().database(URI.create("/TEST/QT")).relationalExtension(relationalExtension).schemaTemplate(schemaTemplate).build()) { + try (var conn = ddl.setSchemaAndGetConnection()) { + conn.setOption(Options.Name.EXECUTION_SCANNED_ROWS_LIMIT, 2); + try (var statement = conn.createStatement()) { + insertT1Record(statement, 2, 1, 1, 20); + insertT1Record(statement, 3, 1, 2, 5); + insertT1Record(statement, 4, 1, 2, 15); + insertT1Record(statement, 5, 1, 2, 5); + insertT1Record(statement, 6, 2, 1, 10); + insertT1Record(statement, 7, 2, 1, 40); + insertT1Record(statement, 8, 2, 1, 20); + insertT1Record(statement, 9, 2, 1, 90); + + String query = "SELECT a AS OK, b, MAX(c) FROM T1 GROUP BY a, b"; + Continuation continuation = null; + // scan pk = 2 and pk = 3 and hit SCAN_LIMIT_REACHED + Assertions.assertTrue(statement.execute(query), "Did not return a result set from a select statement!"); + try (final RelationalResultSet resultSet = statement.getResultSet()) { + ResultSetAssert.assertThat(resultSet).hasNextRow() + .isRowExactly(1L, 1L, 20L) + .hasNoNextRow(); + continuation = resultSet.getContinuation(); + } + // scan pk = 5 and pk = 4 rows, hit SCAN_LIMIT_REACHED + String postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'"; + Assertions.assertTrue(statement.execute(query + postfix), "Did not return a result set from a select statement!"); + try (final RelationalResultSet resultSet = statement.getResultSet()) { + ResultSetAssert.assertThat(resultSet) + .hasNoNextRow(); + continuation = resultSet.getContinuation(); + } + // scan pk = 6 and pk = 8 rows, hit SCAN_LIMIT_REACHED + postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'"; + Assertions.assertTrue(statement.execute(query + postfix), "Did not return a result set from a select statement!"); + try (final RelationalResultSet resultSet = statement.getResultSet()) { + ResultSetAssert.assertThat(resultSet).hasNextRow() + .isRowExactly(1L, 2L, 15L) + .hasNoNextRow(); + continuation = resultSet.getContinuation(); + } + // scan pk = 7 and pk = 9 rows, hit SCAN_LIMIT_REACHED + postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'"; + Assertions.assertTrue(statement.execute(query + postfix), "Did not return a result set from a select statement!"); + try (final RelationalResultSet resultSet = statement.getResultSet()) { + ResultSetAssert.assertThat(resultSet).hasNoNextRow(); + continuation = resultSet.getContinuation(); + } + // hit SOURCE_EXHAUSTED + postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'"; + Assertions.assertTrue(statement.execute(query + postfix), "Did not return a result set from a select statement!"); + try (final RelationalResultSet resultSet = statement.getResultSet()) { + ResultSetAssert.assertThat(resultSet).hasNextRow() + .isRowExactly(2L, 1L, 90L) + .hasNoNextRow(); + continuation = resultSet.getContinuation(); + } + Assertions.assertTrue(continuation.atEnd()); + } + } + } + } + @Test void groupByClauseWithPredicateWorks() throws Exception { final String schemaTemplate =