diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java index ad000a659a..b9a885e310 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java @@ -25,7 +25,6 @@ import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorResult; -import com.apple.foundationdb.record.RecordCursorStartContinuation; import com.apple.foundationdb.record.RecordCursorVisitor; import com.apple.foundationdb.record.query.plan.plans.QueryResult; import com.google.common.base.Verify; @@ -50,7 +49,6 @@ public class AggregateCursor implements RecordCursor streamGrouping; - private final boolean isCreateDefaultOnEmpty; // Previous record processed by this cursor @Nullable private RecordCursorResult previousResult; @@ -59,11 +57,9 @@ public class AggregateCursor implements RecordCursor previousValidResult; public AggregateCursor(@Nonnull RecordCursor inner, - @Nonnull final StreamGrouping streamGrouping, - boolean isCreateDefaultOnEmpty) { + @Nonnull final StreamGrouping streamGrouping) { this.inner = inner; this.streamGrouping = streamGrouping; - this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; } @Nonnull @@ -77,7 +73,7 @@ public CompletableFuture> onNext() { return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> { previousResult = innerResult; if (!innerResult.hasNext()) { - if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) { + if (!isNoRecords()) { streamGrouping.finalizeGroup(); } return false; @@ -93,11 +89,7 @@ public CompletableFuture> onNext() { }), getExecutor()).thenApply(vignore -> { if (isNoRecords()) { // Edge case where there are no records at all - if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) { - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START); - } else { - return RecordCursorResult.exhausted(); - } + return RecordCursorResult.exhausted(); } // Use the last valid result for the continuation as we need non-terminal one here. RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java index 721a920657..805841c437 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java @@ -193,8 +193,4 @@ private Object evalGroupingKey(@Nullable final Object currentObject) { final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject); return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext); } - - public boolean isResultOnEmpty() { - return groupingKeyValue == null; - } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java index 17772363f0..3398ff2e9a 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.record.PlanDeserializer; import com.apple.foundationdb.record.PlanHashable; import com.apple.foundationdb.record.PlanSerializationContext; +import com.apple.foundationdb.record.RecordCoreArgumentException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.cursors.aggregate.AggregateCursor; import com.apple.foundationdb.record.cursors.aggregate.StreamGrouping; @@ -96,13 +97,6 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC private final CorrelationIdentifier aggregateAlias; @Nonnull private final Value completeResultValue; - // - // This flag is needed to distinguish if we need to create a default value on-empty or not (i.e. - // RecordQueryDefaultOnEmptyPlan will do that going forward). We will always plan with that flag set to false going - // forward, but we accept and honor this field coming from proto if we are continuing OR if it not there we imply - // true. - // https://github.com/FoundationDB/fdb-record-layer/issues/3107 - private final boolean isCreateDefaultOnEmpty; /** * Construct a new plan. @@ -119,15 +113,13 @@ private RecordQueryStreamingAggregationPlan(@Nonnull final Quantifier.Physical i @Nonnull final AggregateValue aggregateValue, @Nonnull final CorrelationIdentifier groupingKeyAlias, @Nonnull final CorrelationIdentifier aggregateAlias, - @Nonnull final Value completeResultValue, - final boolean isCreateDefaultOnEmpty) { + @Nonnull final Value completeResultValue) { this.inner = inner; this.groupingKeyValue = groupingKeyValue; this.aggregateValue = aggregateValue; this.groupingKeyAlias = groupingKeyAlias; this.aggregateAlias = aggregateAlias; this.completeResultValue = completeResultValue; - this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; } @Nonnull @@ -148,7 +140,7 @@ public RecordCursor executePlan(@Nonnull FDBRec (FDBRecordStoreBase)store, context, inner.getAlias()); - return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty) + return new AggregateCursor<>(innerCursor, streamGrouping) .skipThenLimit(executeProperties.getSkip(), executeProperties.getReturnedRowLimit()); } @@ -206,8 +198,7 @@ public RecordQueryStreamingAggregationPlan translateCorrelations(@Nonnull final translatedAggregateValue, groupingKeyAlias, aggregateAlias, - completeResultValue, - isCreateDefaultOnEmpty); + completeResultValue); } @Nonnull @@ -218,8 +209,7 @@ public RecordQueryStreamingAggregationPlan withChild(@Nonnull final Reference ch aggregateValue, groupingKeyAlias, aggregateAlias, - completeResultValue, - isCreateDefaultOnEmpty); + completeResultValue); } @Nonnull @@ -371,7 +361,7 @@ public PRecordQueryStreamingAggregationPlan toProto(@Nonnull final PlanSerializa builder.setGroupingKeyAlias(groupingKeyAlias.getId()) .setAggregateAlias(aggregateAlias.getId()) .setCompleteResultValue(completeResultValue.toValueProto(serializationContext)) - .setIsCreateDefaultOnEmpty(isCreateDefaultOnEmpty); + .setIsCreateDefaultOnEmpty(false); return builder.build(); } @@ -396,7 +386,10 @@ public static RecordQueryStreamingAggregationPlan fromProto(@Nonnull final PlanS final CorrelationIdentifier aggregateAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias())); final Value completeResultValue = Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue())); final boolean isCreateDefaultOnEmpty = recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true; - return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue, isCreateDefaultOnEmpty); + if (isCreateDefaultOnEmpty) { + throw new RecordCoreArgumentException("cannot create streaming aggregate plan with default value on empty"); + } + return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue); } @Nonnull @@ -428,7 +421,7 @@ public static RecordQueryStreamingAggregationPlan of(@Nonnull final Quantifier.P final var referencedAggregateValue = ObjectValue.of(aggregateAlias, aggregateValue.getResultType()); return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, - resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue), false); + resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue)); } /**