diff --git a/src/Constants.cpp b/src/Constants.cpp index ce826ea..c659a48 100644 --- a/src/Constants.cpp +++ b/src/Constants.cpp @@ -48,6 +48,26 @@ const char* METADATA_VERSION_FIELD = "metadata version"; const char* NS_FIELD = "ns"; const char* QUERY_FIELD = "query"; const char* CURRENTLY_PROCESSING_DOC_FIELD = "currently processing document"; +const char* GET_MORE_CMD_BATCH_SIZE_FIELD = "batchSize"; +const char* GET_MORE_CMD_CURSOR_ID_FIELD = "getMore"; +const char* GET_MORE_CMD_CURSOR_COLLECTION_FIELD = "collection"; +const char* FIND_CMD_FIND_FIELD = "find"; +const char* FIND_CMD_FILTER_FIELD = "filter"; +const char* FIND_CMD_SORT_FIELD = "sort"; +const char* FIND_CMD_PROJECTION_FIELD = "projection"; +const char* FIND_CMD_HINT_FIELD = "hint"; +const char* FIND_CMD_SKIP_FIELD = "skip"; +const char* FIND_CMD_LIMIT_FIELD = "limit"; +const char* FIND_CMD_BATCH_SIZE_FIELD = "batchSize"; +const char* FIND_CMD_MAX_TIME_MS_FIELD = "maxTimeMS"; +const char* FIND_CMD_REPLY_CURSOR_FIELD = "cursor"; +const char* FIND_CMD_REPLY_CURSOR_ID_FIELD = "id"; +const char* FIND_CMD_REPLY_CURSOR_NS_FIELD = "ns"; +const char* FIND_CMD_REPLY_CURSOR_FIRST_BATCH_FIELD = "firstBatch"; +const char* FIND_CMD_REPLY_CURSOR_NEXT_BATCH_FIELD = "nextBatch"; +const char* KILL_CURSORS_CMD_KILL_CURSORS_FIELD = "killCursors"; +const char* KILL_CURSORS_CMD_CURSORS_FIELD = "cursors"; +const char* EXPLAIN_CMD_FIELD = "explain"; const std::string RENAME = "$rename"; const std::string SET = "$set"; diff --git a/src/Constants.h b/src/Constants.h index e0c87c7..3643dd8 100644 --- a/src/Constants.h +++ b/src/Constants.h @@ -58,6 +58,26 @@ extern const char* METADATA_VERSION_FIELD; extern const char* NS_FIELD; extern const char* QUERY_FIELD; extern const char* CURRENTLY_PROCESSING_DOC_FIELD; +extern const char* GET_MORE_CMD_BATCH_SIZE_FIELD; +extern const char* GET_MORE_CMD_CURSOR_ID_FIELD; +extern const char* GET_MORE_CMD_CURSOR_COLLECTION_FIELD; +extern const char* FIND_CMD_FIND_FIELD; +extern const char* FIND_CMD_FILTER_FIELD; +extern const char* FIND_CMD_SORT_FIELD; +extern const char* FIND_CMD_PROJECTION_FIELD; +extern const char* FIND_CMD_HINT_FIELD; +extern const char* FIND_CMD_SKIP_FIELD; +extern const char* FIND_CMD_LIMIT_FIELD; +extern const char* FIND_CMD_BATCH_SIZE_FIELD; +extern const char* FIND_CMD_MAX_TIME_MS_FIELD; +extern const char* FIND_CMD_REPLY_CURSOR_FIELD; +extern const char* FIND_CMD_REPLY_CURSOR_ID_FIELD; +extern const char* FIND_CMD_REPLY_CURSOR_NS_FIELD; +extern const char* FIND_CMD_REPLY_CURSOR_FIRST_BATCH_FIELD; +extern const char* FIND_CMD_REPLY_CURSOR_NEXT_BATCH_FIELD; +extern const char* KILL_CURSORS_CMD_KILL_CURSORS_FIELD; +extern const char* KILL_CURSORS_CMD_CURSORS_FIELD; +extern const char* EXPLAIN_CMD_FIELD; // Mongo Operators extern const std::string RENAME; diff --git a/src/Cursor.h b/src/Cursor.h index 0e4910a..33632fc 100644 --- a/src/Cursor.h +++ b/src/Cursor.h @@ -31,11 +31,12 @@ struct Cursor : ReferenceCounted, NonCopyable { Reference checkpoint; int64_t id; int32_t returned; + int32_t limit; std::map>* siblings; time_t expiry; - Cursor(FutureStream> docs, Reference checkpoint) - : docs(docs), checkpoint(checkpoint), returned(0) { + Cursor(FutureStream> docs, Reference checkpoint, int32_t limit = 0) + : docs(docs), checkpoint(checkpoint), returned(0), limit(limit) { id = g_random->randomInt64(INT64_MIN, INT64_MAX); expiry = time(nullptr) + DOCLAYER_KNOBS->CURSOR_EXPIRY; siblings = nullptr; diff --git a/src/Ext.h b/src/Ext.h index 75a27d0..ea7aecd 100644 --- a/src/Ext.h +++ b/src/Ext.h @@ -26,9 +26,9 @@ #include "bson.h" #include "flow/flow.h" -#define EXT_SERVER_VERSION "3.0.0" -#define EXT_SERVER_VERSION_ARRAY BSON_ARRAY(3 << 0 << 0) +#define EXT_SERVER_VERSION "3.2.0" +#define EXT_SERVER_VERSION_ARRAY BSON_ARRAY(3 << 2 << 0) #define EXT_MIN_WIRE_VERSION 0 -#define EXT_MAX_WIRE_VERSION 3 +#define EXT_MAX_WIRE_VERSION 4 #endif /* _EXT_H_ */ diff --git a/src/ExtCmd.actor.cpp b/src/ExtCmd.actor.cpp index cca4c52..4fa760c 100644 --- a/src/ExtCmd.actor.cpp +++ b/src/ExtCmd.actor.cpp @@ -919,6 +919,246 @@ struct AvailableQueryOptionsCmd { }; REGISTER_CMD(AvailableQueryOptionsCmd, "availablequeryoptions"); +ACTOR static Future> doGetMore(Reference reply, + Reference ec, + Namespace ns, + int64_t cursorID, + int32_t batchSize = 101) { + state Reference cursor = ec->cursors[cursorID]; + + if (cursor) { + try { + int32_t returned = + wait(addDocumentsFromCursor(cursor, reply, ns, batchSize, AddDocsFromCursorCaller::GET_MORE_CMD)); + reply->replyHeader.startingFrom = cursor->returned - returned; + reply->addResponseFlag(8 /*0b1000*/); + cursor->refresh(); + } catch (Error& e) { + reply->setError(e); + } + } else { + reply->addDocument(BSON("ok" << 0 << "errmsg" + << "CursorNotFound" + << "code" << 43)); + reply->addResponseFlag(0 /*0b0000*/); // set flag to 0 indicating cursor does not exist + } + return reply; +} + +struct GetMoreCmd { + static const char* name; + static Future> call(Reference nmc, + Reference query, + Reference reply) { + if (!query->query.getField(DocLayerConstants::GET_MORE_CMD_CURSOR_ID_FIELD).isNumber() || + !query->query.getField(DocLayerConstants::GET_MORE_CMD_CURSOR_COLLECTION_FIELD).isString()) { + TraceEvent(SevWarn, "WireBadGetMoreCmd").detail("query", query->query.toString()).suppressFor(1.0); + throw wire_protocol_mismatch(); + } + + Namespace ns = std::make_pair( + query->ns.first, query->query.getField(DocLayerConstants::GET_MORE_CMD_CURSOR_COLLECTION_FIELD).str()); + + bson::BSONElement batchSizeField = query->query.getField(DocLayerConstants::GET_MORE_CMD_BATCH_SIZE_FIELD); + int64_t cursorID = query->query.getField(DocLayerConstants::GET_MORE_CMD_CURSOR_ID_FIELD).numberLong(); + return doGetMore(reply, nmc, query->ns, cursorID, batchSizeField.isNumber() ? batchSizeField.numberInt() : 101); + } +}; + +REGISTER_CMD(GetMoreCmd, "getmore"); + +ACTOR static Future> doExplain( + Reference ec, + Reference reply, + Namespace ns, + bson::BSONObj filter, + bson::BSONObj projection, + Optional ordering = Optional(), + int32_t limit = 0, // hard limit on how many a cursor can return before got closed + int32_t skip = 0, + int32_t batchSize = 101, // how many docs to return in first batch + Optional maxTimeMS = Optional(), // TODO make use of maxTimeMS + Optional hint = Optional() // TODO make use of hint +) { + try { + state Reference dtr = ec->getOperationTransaction(); + state Reference cx = wait(ec->mm->getUnboundCollectionContext(dtr, ns, true)); + state Reference cursor; + + state Reference plan = planQuery(cx, filter); + if (!ordering.present() && skip > 0) + plan = ref(new SkipPlan(skip, plan)); + plan = planProjection(plan, projection, ordering); + plan = ec->wrapOperationPlan(plan, true, cx); + if (ordering.present()) { + plan = ref(new SortPlan(plan, ordering.get())); + if (skip > 0) + plan = ref(new SkipPlan(skip, plan)); + } + reply->addDocument(BSON("ok" << 1 << "explanation" << plan->describe())); + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + reply->setError(e.what(), e.code()); + } + } + return reply; +} + +struct ExplainCmd { + static const char* name; + static Future> call(Reference nmc, + Reference msg, + Reference reply) { + bson::BSONElement explainField = msg->query.getField(DocLayerConstants::EXPLAIN_CMD_FIELD); + if (!explainField.isABSONObj() || + !explainField.embeddedObject().getField(DocLayerConstants::FIND_CMD_FIND_FIELD).isString()) { + throw wire_protocol_mismatch(); + } + + bson::BSONObj explainObj = explainField.embeddedObject(); + + Namespace ns = std::make_pair(msg->ns.first, explainObj.getStringField(DocLayerConstants::FIND_CMD_FIND_FIELD)); + + bson::BSONElement filterField = explainObj.getField(DocLayerConstants::FIND_CMD_FILTER_FIELD); + bson::BSONElement projectionField = explainObj.getField(DocLayerConstants::FIND_CMD_PROJECTION_FIELD); + bson::BSONElement orderingField = explainObj.getField(DocLayerConstants::FIND_CMD_SORT_FIELD); + bson::BSONElement limitField = explainObj.getField(DocLayerConstants::FIND_CMD_LIMIT_FIELD); + bson::BSONElement skipField = explainObj.getField(DocLayerConstants::FIND_CMD_SKIP_FIELD); + bson::BSONElement batchSizeField = explainObj.getField(DocLayerConstants::FIND_CMD_BATCH_SIZE_FIELD); + bson::BSONElement maxTimeMSField = explainObj.getField(DocLayerConstants::FIND_CMD_MAX_TIME_MS_FIELD); + bson::BSONElement hintField = explainObj.getField(DocLayerConstants::FIND_CMD_HINT_FIELD); + + return doExplain( + nmc, reply, ns, filterField.isABSONObj() ? filterField.embeddedObject() : bson::BSONObj(), + projectionField.isABSONObj() ? projectionField.embeddedObject() : bson::BSONObj(), + orderingField.isABSONObj() ? Optional(orderingField.embeddedObject()) + : Optional(), + limitField.isNumber() ? limitField.numberInt() : 0, skipField.isNumber() ? skipField.numberInt() : 0, + batchSizeField.isNumber() ? batchSizeField.numberInt() : 101, + maxTimeMSField.isNumber() ? Optional(maxTimeMSField.numberLong()) : Optional(), + hintField.isABSONObj() ? Optional(hintField.embeddedObject()) : Optional()); + } +}; + +REGISTER_CMD(ExplainCmd, "explain"); + +ACTOR static Future> doFind( + Reference ec, + Reference reply, + Namespace ns, + bson::BSONObj filter, + bson::BSONObj projection, + Optional ordering = Optional(), + int32_t limit = 0, // hard limit on how many a cursor can return before got closed + int32_t skip = 0, + int32_t batchSize = 101, // how many docs to return in first batch + Optional maxTimeMS = Optional(), // TODO make use of maxTimeMS + Optional hint = Optional() // TODO make use of hint +) { + try { + state Reference dtr = ec->getOperationTransaction(); + state Reference cx = wait(ec->mm->getUnboundCollectionContext(dtr, ns, true)); + state Reference cursor; + + state Reference plan = planQuery(cx, filter); + if (!ordering.present() && skip > 0) + plan = ref(new SkipPlan(skip, plan)); + plan = planProjection(plan, projection, ordering); + plan = ec->wrapOperationPlan(plan, true, cx); + if (ordering.present()) { + plan = ref(new SortPlan(plan, ordering.get())); + if (skip > 0) + plan = ref(new SkipPlan(skip, plan)); + } + // TODO remove the $explain operator handling here since it should have been deprecated in this command from 3.2 + if (filter.hasField("$explain")) { + reply->addDocument(BSON("explanation" << plan->describe())); + return reply; + } + + Reference outerCheckpoint(new PlanCheckpoint); + + // Add a new cursor to the server's cursor collection + cursor = Cursor::add(ec->cursors, Reference(new Cursor(plan->execute(outerCheckpoint.getPtr(), dtr), + outerCheckpoint, limit))); + int32_t returned = + wait(addDocumentsFromCursor(cursor, reply, ns, batchSize, AddDocsFromCursorCaller::FIND_CMD)); + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + reply->setError(e.what(), e.code()); + } + } + return reply; +} + +struct FindCmd { + static const char* name; + static Future> call(Reference ec, + Reference msg, + Reference reply) { + bson::BSONElement findField = msg->query.getField(DocLayerConstants::FIND_CMD_FIND_FIELD); + if (!findField.isString()) { + throw wire_protocol_mismatch(); + } + + Namespace ns = std::make_pair(msg->ns.first, findField.str()); + + bson::BSONElement filterField = msg->query.getField(DocLayerConstants::FIND_CMD_FILTER_FIELD); + bson::BSONElement projectionField = msg->query.getField(DocLayerConstants::FIND_CMD_PROJECTION_FIELD); + bson::BSONElement orderingField = msg->query.getField(DocLayerConstants::FIND_CMD_SORT_FIELD); + bson::BSONElement limitField = msg->query.getField(DocLayerConstants::FIND_CMD_LIMIT_FIELD); + bson::BSONElement skipField = msg->query.getField(DocLayerConstants::FIND_CMD_SKIP_FIELD); + bson::BSONElement batchSizeField = msg->query.getField(DocLayerConstants::FIND_CMD_BATCH_SIZE_FIELD); + bson::BSONElement maxTimeMSField = msg->query.getField(DocLayerConstants::FIND_CMD_MAX_TIME_MS_FIELD); + bson::BSONElement hintField = msg->query.getField(DocLayerConstants::FIND_CMD_HINT_FIELD); + + return doFind( + ec, reply, ns, filterField.isABSONObj() ? filterField.embeddedObject() : bson::BSONObj(), + projectionField.isABSONObj() ? projectionField.embeddedObject() : bson::BSONObj(), + orderingField.isABSONObj() ? Optional(orderingField.embeddedObject()) + : Optional(), + limitField.isNumber() ? limitField.numberInt() : 0, skipField.isNumber() ? skipField.numberInt() : 0, + batchSizeField.isNumber() ? batchSizeField.numberInt() : 101, + maxTimeMSField.isNumber() ? Optional(maxTimeMSField.numberLong()) : Optional(), + hintField.isABSONObj() ? Optional(hintField.embeddedObject()) : Optional()); + } +}; + +REGISTER_CMD(FindCmd, "find"); + +static Future> doKillCursors(Reference ec, + Reference reply, + bson::BSONArray cursorIds) { + for (bson::BSONObj::iterator i = cursorIds.begin(); i.more();) { + bson::BSONElement cursorId = i.next(); + if (!cursorId.isNumber()) { + throw wire_protocol_mismatch(); + } + Cursor::pluck(ec->cursors[cursorId.numberLong()]); + } + reply->addDocument(BSON("ok" << 1)); + return reply; +} + +struct KillCursorsCmd { + static const char* name; + static Future> call(Reference ec, + Reference msg, + Reference reply) { + bson::BSONElement killCursors = msg->query.getField(DocLayerConstants::KILL_CURSORS_CMD_KILL_CURSORS_FIELD); + if (!killCursors.isString()) { + throw wire_protocol_mismatch(); + } + bson::BSONElement cursorsField = msg->query.getField(DocLayerConstants::KILL_CURSORS_CMD_CURSORS_FIELD); + if (cursorsField.type() != bson::BSONType::Array) { + throw wire_protocol_mismatch(); + } + return doKillCursors(ec, reply, bson::BSONArray(cursorsField.Obj())); + } +}; + +REGISTER_CMD(KillCursorsCmd, "killcursors"); + struct GetMemoryUsageCmd { static const char* name; static Future> call(Reference nmc, diff --git a/src/ExtMsg.actor.cpp b/src/ExtMsg.actor.cpp index baeaf90..41ad1e2 100644 --- a/src/ExtMsg.actor.cpp +++ b/src/ExtMsg.actor.cpp @@ -188,12 +188,6 @@ Reference planQuery(Reference cx, bson::BSONObj return plan; } -Reference planProjection(Reference plan, - bson::BSONObj const& selector, - Optional const& ordering) { - return Reference(new ProjectionPlan(parseProjection(selector), plan, ordering)); -} - ExtMsgQuery::ExtMsgQuery(ExtMsgHeader* header, const uint8_t* body) : header(header) { const uint8_t* ptr = body; const uint8_t* eom = (uint8_t*)header + header->messageLength; @@ -332,15 +326,20 @@ ACTOR static Future> listCollections(Reference addDocumentsFromCursor(Reference cursor, - Reference reply, - int32_t numberToReturn) { +ACTOR Future addDocumentsFromCursor(Reference cursor, + Reference reply, + Namespace ns, + int32_t numberToReturn, + AddDocsFromCursorCaller from) { state int32_t returned = 0; state int32_t returnedSize = 0; state bool stop = false; + state bool cursorKilled = false; state int32_t remaining = std::abs(numberToReturn); + state bson::BSONArrayBuilder repliedDocs; + while (!numberToReturn || remaining) { try { if ((returned <= DOCLAYER_KNOBS->MAX_RETURNABLE_DOCUMENTS || @@ -353,12 +352,19 @@ ACTOR static Future addDocumentsFromCursor(Reference cursor, // that doc is wrapping a BsonContext, which means toDataValue() is synchronous. bson::BSONObj obj = doc->toDataValue().get().getPackedObject().getOwned(); cursor->checkpoint->getDocumentFinishedLock()->release(); - reply->addDocument(obj); + if (from == AddDocsFromCursorCaller::QUERY) { + reply->addDocument(obj); + } else { + repliedDocs << obj; + } remaining--; returned++; - cursor->returned++; returnedSize += obj.objsize(); + cursor->returned++; + if (cursor->returned == cursor->limit) { + throw end_of_stream(); + } } else { throw success(); } @@ -371,16 +377,40 @@ ACTOR static Future addDocumentsFromCursor(Reference cursor, stop = false; break; } - TraceEvent(SevError, "BD_runQuery2").error(e); + TraceEvent(SevError, "BD_addDocumentsFromCursor").error(e); throw; } } // Reply with cursorID if requested or remove the cursor - if (numberToReturn >= 0 && !stop) + if (numberToReturn >= 0 && !stop) { reply->replyHeader.cursorID = cursor->id; - else + } else { + cursorKilled = true; Cursor::pluck(cursor); + } + + if (from == AddDocsFromCursorCaller::FIND_CMD) { + reply->addDocument( + // clang-format off + BSON("ok" << 1 << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_FIELD << BSON( + DocLayerConstants::FIND_CMD_REPLY_CURSOR_ID_FIELD << (long long)(cursorKilled ? 0 : cursor->id) << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_NS_FIELD << ns.first + "." + ns.second << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_FIRST_BATCH_FIELD << repliedDocs.arr() + ))); + // clang-format on + } else if (from == AddDocsFromCursorCaller::GET_MORE_CMD) { + reply->addDocument( + // clang-format off + BSON("ok" << 1 << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_FIELD << BSON( + DocLayerConstants::FIND_CMD_REPLY_CURSOR_ID_FIELD << (long long)(cursorKilled ? 0 : cursor->id) << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_NS_FIELD << ns.first + "." + ns.second << + DocLayerConstants::FIND_CMD_REPLY_CURSOR_NEXT_BATCH_FIELD << repliedDocs.arr() + ))); + // clang-format on + } return returned; } @@ -461,7 +491,7 @@ ACTOR static Future runQuery(Reference ec, if (toReturn == 1) toReturn = -1; - int32_t returned = wait(addDocumentsFromCursor(cursor, reply, toReturn)); + int32_t returned = wait(addDocumentsFromCursor(cursor, reply, msg->ns, toReturn)); reply->addResponseFlag(8 /*0b1000*/); // If no replies sent yet OR results were placed in reply, send them. @@ -1176,7 +1206,7 @@ ACTOR static Future doGetMoreRun(Reference getMore, Referen if (cursor) { try { - int32_t returned = wait(addDocumentsFromCursor(cursor, reply, getMore->numberToReturn)); + int32_t returned = wait(addDocumentsFromCursor(cursor, reply, getMore->ns, getMore->numberToReturn)); reply->replyHeader.startingFrom = cursor->returned - returned; reply->addResponseFlag(8 /*0b1000*/); cursor->refresh(); @@ -1191,6 +1221,7 @@ ACTOR static Future doGetMoreRun(Reference getMore, Referen return Void(); } +// OP_GET_MORE is deprecated after 3.2 in favor of the new command 'getMore' Future ExtMsgGetMore::run(Reference ec) { return doGetMoreRun(Reference::addRef(this), ec); } diff --git a/src/ExtMsg.actor.h b/src/ExtMsg.actor.h index 87dd6f9..1713bd7 100644 --- a/src/ExtMsg.actor.h +++ b/src/ExtMsg.actor.h @@ -255,6 +255,13 @@ ACTOR Future doUpdateCmd(Namespace ns, std::vector* updateCmds, Reference ec); +enum class AddDocsFromCursorCaller { QUERY, FIND_CMD, GET_MORE_CMD }; +ACTOR Future addDocumentsFromCursor(Reference cursor, + Reference reply, + Namespace ns, + int32_t numberToReturn, + AddDocsFromCursorCaller from = AddDocsFromCursorCaller::QUERY); + // FIXME: these don't really belong here either Reference operatorUpdate(bson::BSONObj const& msgUpdate); Reference replaceUpdate(bson::BSONObj const& replaceWith); diff --git a/src/ExtUtil.actor.cpp b/src/ExtUtil.actor.cpp index cc45e41..b7174aa 100644 --- a/src/ExtUtil.actor.cpp +++ b/src/ExtUtil.actor.cpp @@ -829,93 +829,3 @@ ACTOR Future> getIndexesTransactionally(Reference parseProjection(bson::BSONObj const& fieldSelector) { - std::set parsedFields; - bool includeID = true; - bool hasIncludeValue = false; - - Reference root(new Projection()); - - for (auto itr = fieldSelector.begin(); itr.more(); ++itr) { - bson::BSONElement el = *itr; - std::string fieldName = el.fieldName(); - if (fieldName.length() >= 2 && fieldName.compare(fieldName.length() - 2, 2, ".$") == 0) { - // TODO: $ operator - throw invalid_projection(); - } else if (el.type() == bson::Object) { - // TODO: $slice operator - // TODO: $elemMatch operator - // TODO: $meta operator - throw invalid_projection(); - } else { - bool elIncluded = el.trueValue(); - if (fieldName == DocLayerConstants::ID_FIELD) { - // Specifying _id:1 makes the projection an inclusive projection - if (elIncluded) { - if (hasIncludeValue && root->included) { - throw invalid_projection(); - } - - root->included = false; // Our specification is inclusive, so unspecified fields should be excluded - hasIncludeValue = true; - } - - includeID = elIncluded; - } else { - if (!hasIncludeValue) { - root->included = - !elIncluded; // If el was inclusive, then we should exclude unspecified fields (and vice versa) - hasIncludeValue = true; - } - - // inclusive and exclusive modes cannot be mixed - else if (root->included == elIncluded) { - throw invalid_projection(); - } - - // The whole _id field is either included or excluded as a unit regardless of whether its sub-fields are - // included in the projection - if (fieldName.size() >= 4 && fieldName.compare(0, 4, "_id.") == 0) { - continue; - } - - // Split the field into its consituent parts and insert the parts into the Projection tree - if (parsedFields.insert(fieldName).second) { - size_t pos = -1; - size_t prev; - Reference* current = &root; - do { - prev = pos + 1; - pos = fieldName.find('.', prev); - current = &(*current)->fields[fieldName.substr(prev, std::max(pos - prev, (size_t)1))]; - if (!*current) { - *current = Reference(new Projection()); - } - - // The last field in the fieldName spec should be included iff the element was inclusive. All - // others should be the opposite. - if (pos == std::string::npos) { - (*current)->included = elIncluded; - } else { - (*current)->included = !elIncluded; - } - - } while (pos != prev && pos < fieldName.length() - 1); - } - } - } - } - - if (!hasIncludeValue) { - root->included = true; - } - - if (includeID != root->included) { - root->fields[DocLayerConstants::ID_FIELD] = Reference(new Projection(includeID)); - } - - Projection::filterUnneededReads(root); - return root; -} diff --git a/src/QLPlan.actor.cpp b/src/QLPlan.actor.cpp index 6f66347..9f87c53 100644 --- a/src/QLPlan.actor.cpp +++ b/src/QLPlan.actor.cpp @@ -1504,7 +1504,7 @@ ACTOR static Future doSort(PlanCheckpoint* outerCheckpoint, if (e.code() == error_code_end_of_stream) { break; } - TraceEvent(SevError, "BD_runQuery2").error(e); + TraceEvent(SevError, "BD_doSort_collecting").error(e); throw; } } @@ -1523,7 +1523,7 @@ ACTOR static Future doSort(PlanCheckpoint* outerCheckpoint, ref(new BsonContext(returnProjections[i].getObjectField("doc").getOwned(), false)), -1, Key()))); } } catch (Error& e) { - TraceEvent(SevError, "BD_runQuery2").error(e); + TraceEvent(SevError, "BD_doSort_returning").error(e); throw; } innerCheckpoint->stop(); @@ -1957,3 +1957,9 @@ std::string PlanCheckpoint::toString() { } return s; } + +Reference planProjection(Reference plan, + bson::BSONObj const& selector, + Optional const& ordering) { + return Reference(new ProjectionPlan(parseProjection(selector), plan, ordering)); +} diff --git a/src/QLPlan.actor.h b/src/QLPlan.actor.h index 6b522ab..8cbdf49 100644 --- a/src/QLPlan.actor.h +++ b/src/QLPlan.actor.h @@ -707,5 +707,8 @@ ACTOR Future>> executeUntilCom Reference deletePlan(Reference subPlan, Reference cx, int64_t limit); Reference flushChanges(Reference subPlan); +Reference planProjection(Reference plan, + bson::BSONObj const& selector, + Optional const& ordering); #endif /* _QL_PLAN_ACTOR_H_ */ \ No newline at end of file diff --git a/src/QLProjection.actor.cpp b/src/QLProjection.actor.cpp index 90dfec6..cb5a5ef 100644 --- a/src/QLProjection.actor.cpp +++ b/src/QLProjection.actor.cpp @@ -19,6 +19,7 @@ */ #include "QLProjection.actor.h" +#include "DocumentError.h" #include "flow/actorcompiler.h" // This must be the last #include. ACTOR Future projectDocument_impl(Reference doc, Reference projection) { @@ -308,4 +309,94 @@ bson::BSONObj BOBObj::build() { } else { return bob->obj(); } -} \ No newline at end of file +} + +// Parse a Projection tree from a BSON projection specification +Reference parseProjection(bson::BSONObj const& fieldSelector) { + std::set parsedFields; + bool includeID = true; + bool hasIncludeValue = false; + + Reference root(new Projection()); + + for (auto itr = fieldSelector.begin(); itr.more(); ++itr) { + bson::BSONElement el = *itr; + std::string fieldName = el.fieldName(); + if (fieldName.length() >= 2 && fieldName.compare(fieldName.length() - 2, 2, ".$") == 0) { + // TODO: $ operator + throw invalid_projection(); + } else if (el.type() == bson::Object) { + // TODO: $slice operator + // TODO: $elemMatch operator + // TODO: $meta operator + throw invalid_projection(); + } else { + bool elIncluded = el.trueValue(); + if (fieldName == DocLayerConstants::ID_FIELD) { + // Specifying _id:1 makes the projection an inclusive projection + if (elIncluded) { + if (hasIncludeValue && root->included) { + throw invalid_projection(); + } + + root->included = false; // Our specification is inclusive, so unspecified fields should be excluded + hasIncludeValue = true; + } + + includeID = elIncluded; + } else { + if (!hasIncludeValue) { + root->included = + !elIncluded; // If el was inclusive, then we should exclude unspecified fields (and vice versa) + hasIncludeValue = true; + } + + // inclusive and exclusive modes cannot be mixed + else if (root->included == elIncluded) { + throw invalid_projection(); + } + + // The whole _id field is either included or excluded as a unit regardless of whether its sub-fields are + // included in the projection + if (fieldName.size() >= 4 && fieldName.compare(0, 4, "_id.") == 0) { + continue; + } + + // Split the field into its consituent parts and insert the parts into the Projection tree + if (parsedFields.insert(fieldName).second) { + size_t pos = -1; + size_t prev; + Reference* current = &root; + do { + prev = pos + 1; + pos = fieldName.find(".", prev); + current = &(*current)->fields[fieldName.substr(prev, std::max(pos - prev, (size_t)1))]; + if (!*current) { + *current = Reference(new Projection()); + } + + // The last field in the fieldName spec should be included iff the element was inclusive. All + // others should be the opposite. + if (pos == fieldName.npos) { + (*current)->included = elIncluded; + } else { + (*current)->included = !elIncluded; + } + + } while (pos != prev && pos < fieldName.length() - 1); + } + } + } + } + + if (!hasIncludeValue) { + root->included = true; + } + + if (includeID != root->included) { + root->fields[DocLayerConstants::ID_FIELD] = Reference(new Projection(includeID)); + } + + Projection::filterUnneededReads(root); + return root; +}