Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming aggregate cursor now requires 4.1.9.0 or later for continuation deserialization #3246

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorStartContinuation;
import com.apple.foundationdb.record.RecordCursorVisitor;
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
import com.google.common.base.Verify;
Expand All @@ -50,7 +49,6 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
// group aggregator to break incoming records into groups
@Nonnull
private final StreamGrouping<M> streamGrouping;
private final boolean isCreateDefaultOnEmpty;
// Previous record processed by this cursor
@Nullable
private RecordCursorResult<QueryResult> previousResult;
Expand All @@ -59,11 +57,9 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
private RecordCursorResult<QueryResult> previousValidResult;

public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner,
@Nonnull final StreamGrouping<M> streamGrouping,
boolean isCreateDefaultOnEmpty) {
@Nonnull final StreamGrouping<M> streamGrouping) {
this.inner = inner;
this.streamGrouping = streamGrouping;
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
}

@Nonnull
Expand All @@ -77,7 +73,7 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
previousResult = innerResult;
if (!innerResult.hasNext()) {
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) {
if (!isNoRecords()) {
streamGrouping.finalizeGroup();
}
return false;
Expand All @@ -93,11 +89,7 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
}), getExecutor()).thenApply(vignore -> {
if (isNoRecords()) {
// Edge case where there are no records at all
if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) {
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START);
} else {
return RecordCursorResult.exhausted();
}
return RecordCursorResult.exhausted();
}
// Use the last valid result for the continuation as we need non-terminal one here.
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,4 @@ private Object evalGroupingKey(@Nullable final Object currentObject) {
final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext);
}

public boolean isResultOnEmpty() {
return groupingKeyValue == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.apple.foundationdb.record.PlanDeserializer;
import com.apple.foundationdb.record.PlanHashable;
import com.apple.foundationdb.record.PlanSerializationContext;
import com.apple.foundationdb.record.RecordCoreArgumentException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.cursors.aggregate.AggregateCursor;
import com.apple.foundationdb.record.cursors.aggregate.StreamGrouping;
Expand Down Expand Up @@ -96,13 +97,6 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC
private final CorrelationIdentifier aggregateAlias;
@Nonnull
private final Value completeResultValue;
//
// This flag is needed to distinguish if we need to create a default value on-empty or not (i.e.
// RecordQueryDefaultOnEmptyPlan will do that going forward). We will always plan with that flag set to false going
// forward, but we accept and honor this field coming from proto if we are continuing OR if it not there we imply
// true.
// https://github.com/FoundationDB/fdb-record-layer/issues/3107
private final boolean isCreateDefaultOnEmpty;

/**
* Construct a new plan.
Expand All @@ -119,15 +113,13 @@ private RecordQueryStreamingAggregationPlan(@Nonnull final Quantifier.Physical i
@Nonnull final AggregateValue aggregateValue,
@Nonnull final CorrelationIdentifier groupingKeyAlias,
@Nonnull final CorrelationIdentifier aggregateAlias,
@Nonnull final Value completeResultValue,
final boolean isCreateDefaultOnEmpty) {
@Nonnull final Value completeResultValue) {
this.inner = inner;
this.groupingKeyValue = groupingKeyValue;
this.aggregateValue = aggregateValue;
this.groupingKeyAlias = groupingKeyAlias;
this.aggregateAlias = aggregateAlias;
this.completeResultValue = completeResultValue;
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
}

@Nonnull
Expand All @@ -148,7 +140,7 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull FDBRec
(FDBRecordStoreBase<Message>)store,
context,
inner.getAlias());
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty)
return new AggregateCursor<>(innerCursor, streamGrouping)
.skipThenLimit(executeProperties.getSkip(),
executeProperties.getReturnedRowLimit());
}
Expand Down Expand Up @@ -206,8 +198,7 @@ public RecordQueryStreamingAggregationPlan translateCorrelations(@Nonnull final
translatedAggregateValue,
groupingKeyAlias,
aggregateAlias,
completeResultValue,
isCreateDefaultOnEmpty);
completeResultValue);
}

@Nonnull
Expand All @@ -218,8 +209,7 @@ public RecordQueryStreamingAggregationPlan withChild(@Nonnull final Reference ch
aggregateValue,
groupingKeyAlias,
aggregateAlias,
completeResultValue,
isCreateDefaultOnEmpty);
completeResultValue);
}

@Nonnull
Expand Down Expand Up @@ -371,7 +361,7 @@ public PRecordQueryStreamingAggregationPlan toProto(@Nonnull final PlanSerializa
builder.setGroupingKeyAlias(groupingKeyAlias.getId())
.setAggregateAlias(aggregateAlias.getId())
.setCompleteResultValue(completeResultValue.toValueProto(serializationContext))
.setIsCreateDefaultOnEmpty(isCreateDefaultOnEmpty);
.setIsCreateDefaultOnEmpty(false);
return builder.build();
}

Expand All @@ -396,7 +386,10 @@ public static RecordQueryStreamingAggregationPlan fromProto(@Nonnull final PlanS
final CorrelationIdentifier aggregateAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias()));
final Value completeResultValue = Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue()));
final boolean isCreateDefaultOnEmpty = recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true;
return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue, isCreateDefaultOnEmpty);
if (isCreateDefaultOnEmpty) {
throw new RecordCoreArgumentException("cannot create streaming aggregate plan with default value on empty");
}
return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue);
}

@Nonnull
Expand Down Expand Up @@ -428,7 +421,7 @@ public static RecordQueryStreamingAggregationPlan of(@Nonnull final Quantifier.P
final var referencedAggregateValue = ObjectValue.of(aggregateAlias, aggregateValue.getResultType());

return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias,
resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue), false);
resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue));
}

/**
Expand Down