@@ -53,24 +53,24 @@ LevelDbDocumentOverlayCache::LevelDbDocumentOverlayCache(
5353}
5454
5555absl::optional<Overlay> LevelDbDocumentOverlayCache::GetOverlay (
56- const DocumentKey& key ) const {
57- const std::string leveldb_key_prefix =
58- LevelDbDocumentOverlayKey::KeyPrefix (user_id_, key );
56+ const DocumentKey& document_key ) const {
57+ const std::string key_prefix =
58+ LevelDbDocumentOverlayKey::KeyPrefix (user_id_, document_key );
5959
6060 auto it = db_->current_transaction ()->NewIterator ();
61- it->Seek (leveldb_key_prefix );
61+ it->Seek (key_prefix );
6262
63- if (!(it->Valid () && absl::StartsWith (it->key (), leveldb_key_prefix ))) {
63+ if (!(it->Valid () && absl::StartsWith (it->key (), key_prefix ))) {
6464 return absl::nullopt ;
6565 }
6666
67- LevelDbDocumentOverlayKey decoded_key ;
68- HARD_ASSERT (decoded_key .Decode (it->key ()));
69- if (decoded_key .document_key () != key ) {
67+ LevelDbDocumentOverlayKey key ;
68+ HARD_ASSERT (key .Decode (it->key ()));
69+ if (key .document_key () != document_key ) {
7070 return absl::nullopt ;
7171 }
7272
73- return ParseOverlay (decoded_key , it->value ());
73+ return ParseOverlay (key , it->value ());
7474}
7575
7676void LevelDbDocumentOverlayCache::SaveOverlays (
@@ -81,16 +81,8 @@ void LevelDbDocumentOverlayCache::SaveOverlays(
8181}
8282
8383void LevelDbDocumentOverlayCache::RemoveOverlaysForBatchId (int batch_id) {
84- // TODO(dconeybe) Implement an index so that this query can be performed
85- // without requiring a full table scan.
86-
87- ForEachOverlay ([&](absl::string_view encoded_key,
88- const LevelDbDocumentOverlayKey& decoded_key,
89- absl::string_view) {
90- if (decoded_key.largest_batch_id () == batch_id) {
91- db_->current_transaction ()->Delete (encoded_key);
92- }
93- });
84+ ForEachKeyWithLargestBatchId (
85+ batch_id, [&](LevelDbDocumentOverlayKey&& key) { DeleteOverlay (key); });
9486}
9587
9688DocumentOverlayCache::OverlayByDocumentKeyMap
@@ -103,20 +95,18 @@ LevelDbDocumentOverlayCache::GetOverlays(const ResourcePath& collection,
10395
10496 const size_t immediate_children_path_length{collection.size () + 1 };
10597
106- ForEachOverlay ([&](absl::string_view,
107- const LevelDbDocumentOverlayKey& decoded_key,
98+ ForEachOverlay ([&](LevelDbDocumentOverlayKey&& key,
10899 absl::string_view encoded_mutation) {
109- const DocumentKey key = decoded_key.document_key ();
110- if (!collection.IsPrefixOf (key.path ())) {
100+ if (!collection.IsPrefixOf (key.document_key ().path ())) {
111101 return ;
112102 }
113103 // Documents from sub-collections
114- if (key.path ().size () != immediate_children_path_length) {
104+ if (key.document_key (). path ().size () != immediate_children_path_length) {
115105 return ;
116106 }
117107
118- if (decoded_key .largest_batch_id () > since_batch_id) {
119- result[key] = ParseOverlay (decoded_key , encoded_mutation);
108+ if (key .largest_batch_id () > since_batch_id) {
109+ result[key. document_key () ] = ParseOverlay (key , encoded_mutation);
120110 }
121111 });
122112
@@ -135,17 +125,16 @@ LevelDbDocumentOverlayCache::GetOverlays(const std::string& collection_group,
135125 // by largest_batch_id, the loop below can iterate over it ordered by
136126 // largest_batch_id.
137127 std::map<int , std::unordered_set<Overlay, OverlayHash>> overlays_by_batch_id;
138- ForEachOverlay ([&](absl::string_view,
139- const LevelDbDocumentOverlayKey& decoded_key,
140- absl::string_view encoded_mutation) {
141- if (decoded_key.largest_batch_id () <= since_batch_id) {
142- return ;
143- }
144- if (decoded_key.document_key ().HasCollectionId (collection_group)) {
145- overlays_by_batch_id[decoded_key.largest_batch_id ()].emplace (
146- ParseOverlay (decoded_key, encoded_mutation));
147- }
148- });
128+ ForEachOverlay (
129+ [&](LevelDbDocumentOverlayKey&& key, absl::string_view encoded_mutation) {
130+ if (key.largest_batch_id () <= since_batch_id) {
131+ return ;
132+ }
133+ if (key.document_key ().HasCollectionId (collection_group)) {
134+ overlays_by_batch_id[key.largest_batch_id ()].emplace (
135+ ParseOverlay (key, encoded_mutation));
136+ }
137+ });
149138
150139 // Trim down the overlays loaded above to respect the given `count`, and
151140 // return them.
@@ -157,8 +146,8 @@ LevelDbDocumentOverlayCache::GetOverlays(const std::string& collection_group,
157146 OverlayByDocumentKeyMap result;
158147 for (auto & overlays_by_batch_id_entry : overlays_by_batch_id) {
159148 for (auto & overlay : overlays_by_batch_id_entry.second ) {
160- DocumentKey key = overlay.key ();
161- result[key ] = std::move (overlay);
149+ DocumentKey document_key = overlay.key ();
150+ result[document_key ] = std::move (overlay);
162151 }
163152 if (result.size () >= count) {
164153 break ;
@@ -168,6 +157,27 @@ LevelDbDocumentOverlayCache::GetOverlays(const std::string& collection_group,
168157 return result;
169158}
170159
160+ int LevelDbDocumentOverlayCache::GetOverlayCount () const {
161+ return CountEntriesWithKeyPrefix (
162+ LevelDbDocumentOverlayKey::KeyPrefix (user_id_));
163+ }
164+
165+ int LevelDbDocumentOverlayCache::GetLargestBatchIdIndexEntryCount () const {
166+ return CountEntriesWithKeyPrefix (
167+ LevelDbDocumentOverlayLargestBatchIdIndexKey::KeyPrefix (user_id_));
168+ }
169+
170+ int LevelDbDocumentOverlayCache::CountEntriesWithKeyPrefix (
171+ const std::string& key_prefix) const {
172+ int count = 0 ;
173+ auto it = db_->current_transaction ()->NewIterator ();
174+ for (it->Seek (key_prefix);
175+ it->Valid () && absl::StartsWith (it->key (), key_prefix); it->Next ()) {
176+ ++count;
177+ }
178+ return count;
179+ }
180+
171181Overlay LevelDbDocumentOverlayCache::ParseOverlay (
172182 const LevelDbDocumentOverlayKey& key,
173183 absl::string_view encoded_mutation) const {
@@ -181,42 +191,70 @@ Overlay LevelDbDocumentOverlayCache::ParseOverlay(
181191}
182192
183193void LevelDbDocumentOverlayCache::SaveOverlay (int largest_batch_id,
184- const DocumentKey& key ,
194+ const DocumentKey& document_key ,
185195 const Mutation& mutation) {
186- DeleteOverlay (key);
187- const std::string leveldb_key =
188- LevelDbDocumentOverlayKey::Key (user_id_, key, largest_batch_id);
189- auto serialized_mutation = serializer_->EncodeMutation (mutation);
190- db_->current_transaction ()->Put (leveldb_key, serialized_mutation);
196+ // Remove the existing overlay and any index entries pointing to it.
197+ DeleteOverlay (document_key);
198+
199+ const LevelDbDocumentOverlayKey key (user_id_, document_key, largest_batch_id);
200+
201+ // Add the overlay to the database and index entries pointing to it.
202+ auto * transaction = db_->current_transaction ();
203+ transaction->Put (key.Encode (), serializer_->EncodeMutation (mutation));
204+ transaction->Put (LevelDbDocumentOverlayLargestBatchIdIndexKey::Key (key), " " );
191205}
192206
193- void LevelDbDocumentOverlayCache::DeleteOverlay (const model::DocumentKey& key) {
194- const std::string leveldb_key_prefix =
195- LevelDbDocumentOverlayKey::KeyPrefix (user_id_, key);
207+ void LevelDbDocumentOverlayCache::DeleteOverlay (
208+ const model::DocumentKey& document_key) {
209+ const std::string key_prefix =
210+ LevelDbDocumentOverlayKey::KeyPrefix (user_id_, document_key);
196211 auto it = db_->current_transaction ()->NewIterator ();
197- for (it->Seek (leveldb_key_prefix);
198- it->Valid () && absl::StartsWith (it->key (), leveldb_key_prefix);
199- it->Next ()) {
200- LevelDbDocumentOverlayKey decoded_key;
201- HARD_ASSERT (decoded_key.Decode (it->key ()));
202- if (decoded_key.document_key () == key) {
203- db_->current_transaction ()->Delete (it->key ());
204- }
212+ it->Seek (key_prefix);
213+
214+ if (!(it->Valid () && absl::StartsWith (it->key (), key_prefix))) {
215+ return ;
205216 }
217+
218+ LevelDbDocumentOverlayKey key;
219+ HARD_ASSERT (key.Decode (it->key ()));
220+ if (key.document_key () == document_key) {
221+ DeleteOverlay (key);
222+ }
223+ }
224+
225+ void LevelDbDocumentOverlayCache::DeleteOverlay (
226+ const LevelDbDocumentOverlayKey& key) {
227+ auto * transaction = db_->current_transaction ();
228+ transaction->Delete (key.Encode ());
229+ transaction->Delete (LevelDbDocumentOverlayLargestBatchIdIndexKey::Key (key));
206230}
207231
208232void LevelDbDocumentOverlayCache::ForEachOverlay (
209- std::function<void (absl::string_view encoded_key,
210- const LevelDbDocumentOverlayKey& decoded_key,
211- absl::string_view encoded_mutation)> callback) const {
233+ std::function<void ((LevelDbDocumentOverlayKey && key,
234+ absl::string_view encoded_mutation))> callback) const {
212235 auto it = db_->current_transaction ()->NewIterator ();
213236 const std::string user_key = LevelDbDocumentOverlayKey::KeyPrefix (user_id_);
214237
215238 for (it->Seek (user_key); it->Valid () && absl::StartsWith (it->key (), user_key);
216239 it->Next ()) {
217- LevelDbDocumentOverlayKey decoded_key;
218- HARD_ASSERT (decoded_key.Decode (it->key ()));
219- callback (it->key (), decoded_key, it->value ());
240+ LevelDbDocumentOverlayKey key;
241+ HARD_ASSERT (key.Decode (it->key ()));
242+ callback (std::move (key), it->value ());
243+ }
244+ }
245+
246+ void LevelDbDocumentOverlayCache::ForEachKeyWithLargestBatchId (
247+ int largest_batch_id,
248+ std::function<void (LevelDbDocumentOverlayKey&& key)> callback) const {
249+ const std::string key_prefix =
250+ LevelDbDocumentOverlayLargestBatchIdIndexKey::KeyPrefix (user_id_,
251+ largest_batch_id);
252+ auto it = db_->current_transaction ()->NewIterator ();
253+ for (it->Seek (key_prefix);
254+ it->Valid () && absl::StartsWith (it->key (), key_prefix); it->Next ()) {
255+ LevelDbDocumentOverlayLargestBatchIdIndexKey key;
256+ HARD_ASSERT (key.Decode (it->key ()));
257+ callback (std::move (key).ToLevelDbDocumentOverlayKey ());
220258 }
221259}
222260
0 commit comments