@baswanth09
Collaborator

@baswanth09 baswanth09 commented Oct 23, 2025

Track In-Flight Keys

When we schedule a mutation task for a key on the thread pool, we track mutation_queue_size_ but not which keys are actually in the queue. We need to know the in-flight keys so that, when we evaluate predicates on the main thread at the end of the FT.SEARCH flow, we can tell whether all relevant keys have been indexed. These checks will be added in follow-up PRs.

Member

@allenss-amazon allenss-amazon left a comment

This PR assumes that an individual key can only be in the ingestion pipeline once. That's not true. It's quite possible to be currently ingesting a key (i.e., inside of SyncProcessMutation) and have that same key get scheduled again for mutation. In that scenario, this code will fail. I believe that you'll need to keep a count of in-flight entries for each key.
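
A minimal sketch of the per-key counting idea, for illustration only. The class name InFlightKeyCounter, its method names, and the use of std::string keys with a std::mutex are assumptions made for this example and are not taken from the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical helper (not part of the PR): counts in-flight mutations per key
// so that the same key can be scheduled again while it is still being ingested.
class InFlightKeyCounter {
 public:
  // Call when a mutation for `key` is scheduled.
  void OnScheduled(const std::string &key) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++counts_[key];
  }

  // Call when one mutation for `key` has been fully processed.
  void OnCompleted(const std::string &key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = counts_.find(key);
    assert(it != counts_.end() && it->second > 0);
    if (--it->second == 0) {
      counts_.erase(it);  // last in-flight entry for this key is done
    }
  }

  // True while at least one mutation for `key` is still in the pipeline.
  bool IsInFlight(const std::string &key) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return counts_.find(key) != counts_.end();
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, std::size_t> counts_;
};
```

With a plain set, the first completion would clear a key that still has a second mutation pending; the counter only clears it once every scheduled mutation has completed.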

@BCathcart
Collaborator

I wasn't aware of tracked_mutated_records_ which at first glance could indicate we don't have multiple in-flight mutations for the same key and already have a way to check what keys are in-flight. In the common case, if the key is already in tracked_mutated_records_, then that entry is simply updated. The key is only removed from the tracked_mutated_records_ once it has been fully indexed here. However, multi-exec handling looks to complicate things so I'd look into how that multi queue works @baswanth09.

@baswanth09
Collaborator Author

Yes. InternedStringMap<DocumentMutation> tracked_mutated_records_ can be used as the source of truth to check whether a key is in-flight.
In IndexSchema::TrackMutatedRecord, if the key is not new, we simply update the existing entry with the latest values.

for (auto &mutated_attribute : mutated_attributes) {
  itr->second.attributes.value()[mutated_attribute.first] =
      std::move(mutated_attribute.second);
}

For multi-exec as well, keys are added to tracked_mutated_records_:

HSET inside MULTI/EXEC
 → ProcessMutation (inside_multi_exec=true)
   → EnqueueMultiMutation
     → multi_mutations.keys.push(key)
     → IF queue.size >= thread_pool_size:
         → vmsdk::RunByMain → ProcessMultiQueue (early flush)
   → TrackMutatedRecord (adds to tracked_mutated_records_)
   → return
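
The flow above could be modelled roughly as follows. This is a simplified sketch under stated assumptions: MultiExecSketch and its members are hypothetical stand-ins, the early flush is called directly instead of going through vmsdk::RunByMain, and removal of keys from the tracked set after indexing is not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <set>
#include <string>

// Hypothetical stand-in for the multi-exec path; names mirror the flow above,
// but the bodies are assumptions for illustration.
struct MultiExecSketch {
  explicit MultiExecSketch(std::size_t pool_size) : thread_pool_size(pool_size) {
    assert(thread_pool_size > 0);
  }

  std::size_t thread_pool_size;
  std::deque<std::string> multi_queue;  // stand-in for multi_mutations.keys
  std::set<std::string> tracked;        // stand-in for tracked_mutated_records_
  std::size_t flushes = 0;

  // Stand-in for ProcessMultiQueue: drains the queued keys.
  void ProcessMultiQueue() {
    multi_queue.clear();
    ++flushes;
  }

  // Stand-in for ProcessMutation with inside_multi_exec=true.
  void ProcessMutationInsideMulti(const std::string &key) {
    multi_queue.push_back(key);  // EnqueueMultiMutation
    if (multi_queue.size() >= thread_pool_size) {
      ProcessMultiQueue();  // early flush
    }
    tracked.insert(key);  // TrackMutatedRecord runs whether or not we flushed
  }
};
```

The point of the sketch is that the key ends up in the tracked set on every path, regardless of whether the early flush ran.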

So I have just added a utility function that uses tracked_mutated_records_ to check whether a key is in-flight.
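
A rough, self-contained sketch of what such a check could look like. TrackedRecordsSketch, MarkIndexed, and the std::string / std::map stand-ins for InternedStringPtr and DocumentMutation are assumptions for this example; only the names TrackMutatedRecord, IsKeyInflight, tracked_mutated_records_ and mutated_records_mutex_ come from the discussion.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-in for the attribute payload of a DocumentMutation (assumption).
using Attributes = std::map<std::string, std::string>;

class TrackedRecordsSketch {
 public:
  // Inserts the key if it is new, otherwise merges the latest attribute values
  // into the existing entry. Returns true if the key was newly tracked.
  bool TrackMutatedRecord(const std::string &key, Attributes mutated_attributes) {
    assert(!key.empty());
    std::lock_guard<std::mutex> lock(mutated_records_mutex_);
    auto result = tracked_mutated_records_.emplace(key, Attributes{});
    auto itr = result.first;
    for (auto &mutated_attribute : mutated_attributes) {
      itr->second[mutated_attribute.first] = std::move(mutated_attribute.second);
    }
    return result.second;
  }

  // Hypothetical hook: drop the key once it has been fully indexed.
  void MarkIndexed(const std::string &key) {
    std::lock_guard<std::mutex> lock(mutated_records_mutex_);
    tracked_mutated_records_.erase(key);
  }

  // The utility check: a key is in-flight while it is still tracked.
  // The caller must not already hold mutated_records_mutex_.
  bool IsKeyInflight(const std::string &key) const {
    std::lock_guard<std::mutex> lock(mutated_records_mutex_);
    return tracked_mutated_records_.count(key) > 0;
  }

 private:
  mutable std::mutex mutated_records_mutex_;
  std::unordered_map<std::string, Attributes> tracked_mutated_records_;
};
```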

@yairgott
Collaborator

We need to know the in-flight keys so that, when we evaluate predicates on the main thread at the end of the FT.SEARCH flow, we can tell whether all relevant keys have been indexed.

Today, on a keyspace notification, the client is blocked, so any prior mutations by the same client have already been ingested at query time. Can you elaborate on why we need to know whether mutations from other clients on the relevant keys have been processed?

@baswanth09 baswanth09 requested a review from BCathcart October 29, 2025 18:17
@baswanth09
Collaborator Author

baswanth09 commented Oct 29, 2025

Today, on a keyspace notification, the client is blocked, so any prior mutations by the same client have already been ingested at query time. Can you elaborate on why we need to know whether mutations from other clients on the relevant keys have been processed?

Basically, we want to block the FT.SEARCH response on conflicting in-flight keys to guarantee correctness and point-in-time consistency.
In the current code, once the keys of the FT.SEARCH result set are found, we fetch those keys' data from the keyspace to return to the customer. The search that finds the keys is performed by background threads on the indexes, which don't reflect the current state of the keyspace if there are in-flight mutations. So at the end of a search, we come back to the main thread and this path is hit: ProcessNeighborsForReply -> GetContent -> VerifyFilter. GetContent fetches the key data from the keyspace, and then VerifyFilter is called to ensure that the key's current value still satisfies the search criteria (i.e., the predicate). It's easy to evaluate the fetched data directly for Numeric and Tag, but for Text it gets expensive.
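
To illustrate why the Numeric and Tag cases are cheap, here is a small sketch of re-checking fetched field values directly. The function names are hypothetical and this is not the actual VerifyFilter code; the sketch also ignores tag trimming and case handling.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <sstream>
#include <string>
#include <unordered_set>

// Hypothetical re-check of a numeric predicate against a freshly fetched value.
// Returns false if the value is not a number or falls outside [lo, hi].
bool MatchesNumericRange(const std::string &raw, double lo, double hi) {
  assert(lo <= hi);
  try {
    std::size_t pos = 0;
    double value = std::stod(raw, &pos);
    if (pos != raw.size()) {
      return false;  // trailing garbage, treat as non-numeric
    }
    return value >= lo && value <= hi;
  } catch (const std::exception &) {
    return false;  // not parseable as a number
  }
}

// Hypothetical re-check of a tag predicate: true if any tag in the
// separator-delimited field value is one of the wanted tags.
bool MatchesAnyTag(const std::string &raw,
                   const std::unordered_set<std::string> &wanted,
                   char separator = ',') {
  std::istringstream in(raw);
  std::string tag;
  while (std::getline(in, tag, separator)) {
    if (wanted.count(tag) > 0) {
      return true;
    }
  }
  return false;
}
```

Both checks are a single pass over one field value. A text predicate has no equally direct check, which is what motivates waiting for background ingestion instead.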

Instead of ingesting the key's text data into a temporary text index, evaluating the predicate against it, and then throwing the index away, we can keep the client blocked and wait for the natural background ingestion to continue. This means we need to compare the keys in the response to the in-flight keys and keep the client blocked if there is any overlap. The next time ProcessNeighborsForReply is invoked, we re-check the in-flight keys, and if there is no conflict this time, we evaluate the predicate against the per-key text indexes that are populated by the background ingestion, since they now accurately reflect the current keyspace. The one caveat of this approach is a livelock in which there is always a conflict with in-flight keys and the search eventually times out. We decided that we are okay living with this since it should be rare.
cc: @BCathcart
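
The wait-and-re-check decision described above could be sketched as below. ReplyDecision and DecideReply are hypothetical names, and the in-flight probe is passed in as a callback so the example stays self-contained; how the real code re-enters ProcessNeighborsForReply and enforces the timeout is not shown in this thread.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <vector>

enum class ReplyDecision { kReplyNow, kKeepBlocked, kTimedOut };

// Hypothetical decision step, run each time the reply path is re-entered:
// reply only when none of the result keys is still in-flight.
ReplyDecision DecideReply(
    const std::vector<std::string> &result_keys,
    const std::function<bool(const std::string &)> &is_key_in_flight,
    std::chrono::steady_clock::time_point now,
    std::chrono::steady_clock::time_point deadline) {
  assert(is_key_in_flight && "probe must be callable");
  bool conflict = false;
  for (const std::string &key : result_keys) {
    if (is_key_in_flight(key)) {
      conflict = true;
      break;
    }
  }
  if (!conflict) {
    return ReplyDecision::kReplyNow;  // indexes reflect the keyspace for these keys
  }
  // Conflict: keep waiting unless the search deadline has passed (the livelock case).
  return now >= deadline ? ReplyDecision::kTimedOut : ReplyDecision::kKeepBlocked;
}
```

In this sketch the conflict check comes before the deadline check, so a search that becomes conflict-free exactly at the deadline still replies; whether the real flow should behave that way is a design choice left open here.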

virtual void OnLoadingEnded(ValkeyModuleCtx *ctx);

inline const Stats &GetStats() const { return stats_; }
bool IsKeyInflight(const InternedStringPtr &key) const {
Member

Please add ABSL_LOCKS_EXCLUDED(mutated_records_mutex_). Otherwise I'm g2g.

@baswanth09
Collaborator Author

Added the annotation ABSL_LOCKS_EXCLUDED(mutated_records_mutex_).

@yairgott
Collaborator

yairgott commented Nov 5, 2025

Instead of ingesting the key's text data into a temporary text index, evaluating the predicate against it, and then throwing the index away, we can keep the client blocked and wait for the natural background ingestion to continue. This means we need to compare the keys in the response to the in-flight keys and keep the client blocked if there is any overlap. The next time ProcessNeighborsForReply is invoked, we re-check the in-flight keys, and if there is no conflict this time, we evaluate the predicate against the per-key text indexes that are populated by the background ingestion, since they now accurately reflect the current keyspace. The one caveat of this approach is a livelock in which there is always a conflict with in-flight keys and the search eventually times out. We decided that we are okay living with this since it should be rare.

Because FT.SEARCH runs on a worker thread and we don't want to block the main thread while executing queries, inconsistency between the index and the keyspace store is inevitable. In mixed read/write workloads, there are always in-flight mutations in the queue. The only way to eliminate this constraint is to limit the fields returned by the query to just the indexed fields. Am I missing something?

4 participants