feat(tdigest): Implement TDIGEST.CDF command #3163
base: unstable
Pull Request Overview
This PR implements the TDIGEST.CDF command to calculate cumulative distribution function values for given inputs in a T-Digest data structure. The implementation follows Redis documentation and adds the necessary command parsing, execution logic, and result handling.
- Adds new CDF method to TDigest class that computes cumulative distribution function values
- Implements CommandTDigestCDF command class with proper parsing and execution
- Registers the new tdigest.cdf command with appropriate attributes
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description | 
|---|---|
| src/types/redis_tdigest.h | Adds TDigestCDFResult struct and CDF method declaration | 
| src/types/redis_tdigest.cc | Implements CDF calculation logic using centroids | 
| src/commands/cmd_tdigest.cc | Adds CommandTDigestCDF class and registers the new command | 
    double eq_count = 0;
    double smaller_count = 0;
    for (; iter_begin->Valid(); iter_begin->Next()) {
      auto current_centroid = iter_begin->GetCentroid();
      if (val > current_centroid->mean) {
        smaller_count++;
      } else if (val == current_centroid->mean) {
        eq_count++;
      }
    }
    double cdf_val = (smaller_count / total_weight) + ((eq_count / 2) / total_weight);
    
      
    
Copilot AI, Sep 9, 2025
The CDF calculation is incorrect. It's counting the number of centroids rather than their weights. It should accumulate current_centroid->weight instead of incrementing by 1 for proper cumulative distribution calculation.
Suggested change:
    - double eq_count = 0;
    - double smaller_count = 0;
    - for (; iter_begin->Valid(); iter_begin->Next()) {
    -   auto current_centroid = iter_begin->GetCentroid();
    -   if (val > current_centroid->mean) {
    -     smaller_count++;
    -   } else if (val == current_centroid->mean) {
    -     eq_count++;
    -   }
    - }
    - double cdf_val = (smaller_count / total_weight) + ((eq_count / 2) / total_weight);
    + double eq_weight = 0;
    + double smaller_weight = 0;
    + for (; iter_begin->Valid(); iter_begin->Next()) {
    +   auto current_centroid = iter_begin->GetCentroid();
    +   if (val > current_centroid->mean) {
    +     smaller_weight += current_centroid->weight;
    +   } else if (val == current_centroid->mean) {
    +     eq_weight += current_centroid->weight;
    +   }
    + }
    + double cdf_val = (smaller_weight / total_weight) + ((eq_weight / 2) / total_weight);
Hi @SharonIV0x86,
It seems we are confusing count with weight here.
Okay, I will take a look into it.
        
          
src/commands/cmd_tdigest.cc (Outdated)
      MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
    - MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
    + MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange),
    + MakeCmdAttr<CommandTDigestCDF>("tdigest.cdf", -4, "write", 1, 1, 1));
    
      
    
Copilot AI, Sep 9, 2025
The TDIGEST.CDF command should be marked as 'read-only' instead of 'write' since it only reads data and doesn't modify the T-Digest structure.
Suggested change:
    - MakeCmdAttr<CommandTDigestCDF>("tdigest.cdf", -4, "write", 1, 1, 1));
    + MakeCmdAttr<CommandTDigestCDF>("tdigest.cdf", -4, "read-only", 1, 1, 1));

    class CommandTDigestCDF : public Commander {
      Status Parse(const std::vector<std::string> &args) override {
        key_name_ = args[1];
        if (args.size() == 2) return {Status::RedisParseErr, errWrongNumOfArguments};
    
      
    
Copilot AI, Sep 9, 2025
The command registration specifies minimum 4 arguments (-4), but this validation only checks for exactly 2 arguments. It should validate that there are at least 3 arguments (command + key + at least one value).
Suggested change:
    - if (args.size() == 2) return {Status::RedisParseErr, errWrongNumOfArguments};
    + if (args.size() < 3) return {Status::RedisParseErr, errWrongNumOfArguments};
Hi @SharonIV0x86,
Thanks very much for your effort! 😊
I left some comments, and we need to add unit tests and integration tests for this command.
Best Regards,
Edward
    class CommandTDigestCDF : public Commander {
      Status Parse(const std::vector<std::string> &args) override {
        key_name_ = args[1];
        if (args.size() == 2) return {Status::RedisParseErr, errWrongNumOfArguments};
Maybe we could check the vector size at the beginning of Parse, before reading args[1].
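A minimal sketch of checking the argument count first, assuming the layout `tdigest.cdf key value [value ...]`. The `ParseCdfValues` helper is hypothetical and does not use the kvrocks `Commander` or `Status` types; it only shows the ordering of the checks.

```cpp
#include <cstddef>
#include <cstdlib>
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper (not the kvrocks API): validates the argument count before
// touching any element, then parses every argument after the key as a double.
std::optional<std::vector<double>> ParseCdfValues(const std::vector<std::string> &args) {
  // Expected layout: {"tdigest.cdf", key, value [value ...]}
  if (args.size() < 3) return std::nullopt;

  std::vector<double> values;
  values.reserve(args.size() - 2);
  for (std::size_t i = 2; i < args.size(); ++i) {
    const std::string &s = args[i];
    char *end = nullptr;
    double v = std::strtod(s.c_str(), &end);
    // Reject empty strings and strings with trailing non-numeric characters.
    if (s.empty() || end != s.c_str() + s.size()) return std::nullopt;
    values.push_back(v);
  }
  return values;
}
```

Doing the size check before any indexing means no element of `args` is read until the vector is known to be long enough.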
      }
      return {Status::RedisExecErr, meta_status.ToString()};
    }
    if (metadata.total_observations == 0) {
I tested this with Redis in Docker: for an empty digest, the reply should be a vector of "nan" with the same size as the input.
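As a sketch of that behaviour, the reply for an empty digest can be built inside the empty branch itself. `BuildCdfReply` and its parameters are hypothetical names chosen for illustration; only `total_observations` mirrors a name that appears in the PR.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical reply builder: `computed` stands for the already formatted CDF
// values of a non-empty digest, one per requested input value.
std::vector<std::string> BuildCdfReply(std::uint64_t total_observations,
                                       const std::vector<double> &values,
                                       const std::vector<std::string> &computed) {
  if (total_observations == 0) {
    // Empty digest: one "nan" per input value, constructed only on this path.
    return std::vector<std::string>(values.size(), "nan");
  }
  return computed;
}
```

Constructing the "nan" vector inside the branch avoids allocating it on the common path where the digest already holds data.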
        
          
src/commands/cmd_tdigest.cc (Outdated)
      return {Status::RedisExecErr, s.ToString()};
    }
    for (const auto &val : result.cdf_values) {
      cdf_result.push_back(std::to_string(val));
We could use util::Float2String, as the quantile command does.
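One reason to avoid `std::to_string` for doubles: in standards before C++26 it formats as if by `printf("%f")`, so it always prints six digits after the decimal point and rounds small values to zero. The sketch below contrasts it with a hypothetical `FormatCompact` helper based on `snprintf`. That helper is only an illustration and is not `util::Float2String`, whose exact output format is not shown in this thread.

```cpp
#include <cstdio>
#include <string>

// Hypothetical helper for illustration only (not util::Float2String):
// prints up to 15 significant digits and drops trailing zeros.
std::string FormatCompact(double v) {
  char buf[32];
  std::snprintf(buf, sizeof(buf), "%.15g", v);
  return std::string(buf);
}

// Wrapper around std::to_string so both formatters can be compared side by side.
std::string FormatWithToString(double v) { return std::to_string(v); }
```

For example, `FormatWithToString(0.5)` yields "0.500000" while `FormatCompact(0.5)` yields "0.5"; for 1e-7 the former collapses to "0.000000".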
        
          
src/commands/cmd_tdigest.cc (Outdated)
      MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
    - MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
    + MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange),
    + MakeCmdAttr<CommandTDigestCDF>("tdigest.cdf", -4, "write", 1, 1, 1));
It is a read-only command, but with a lock guard in merge action.
Yes, I will work on it. I was unsure how command registration works and what exactly the parameters of MakeCmdAttr do, so it would be helpful if you could explain them in a little more detail.
Thanks.
You can refer to PR #2620 for further understanding of the MakeCmdAttr flags.
kvrocks/src/server/redis_connection.cc
Lines 528 to 546 in cf28ab1
    SetLastCmd(cmd_name);
    {
      std::optional<MultiLockGuard> guard;
      if (cmd_flags & kCmdWrite) {
        std::vector<std::string> lock_keys;
        attributes->ForEachKeyRange(
            [&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
              key_range.ForEachKey(
                  [&, this](const std::string &key) {
                    auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
                    lock_keys.emplace_back(std::move(ns_key));
                  },
                  args);
            },
            cmd_tokens);
        guard.emplace(srv_->storage->GetLockManager(), lock_keys);
      }
      engine::Context ctx(srv_->storage);
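The pattern in the excerpt above, where the lock guard is created only when the write flag is set, can be reduced to a small standalone sketch. The flag names and the `RunCommand` function are hypothetical, and a plain `std::mutex` stands in for the lock manager and `MultiLockGuard` of kvrocks.

```cpp
#include <mutex>
#include <optional>

// Hypothetical flag bits; the real kvrocks flags are defined elsewhere.
enum CmdFlag : unsigned {
  kFlagReadOnly = 1u << 0,
  kFlagWrite = 1u << 1,
};

// Takes the key lock only for commands flagged as write, mirroring the
// std::optional<guard> + emplace pattern in the excerpt above.
// Returns whether the lock was held while the command body ran.
bool RunCommand(unsigned flags, std::mutex &key_mutex) {
  std::optional<std::lock_guard<std::mutex>> guard;
  if (flags & kFlagWrite) {
    guard.emplace(key_mutex);  // constructed in place, released at scope exit
  }
  // ... the command body would execute here ...
  return guard.has_value();
}
```

Under this pattern a command registered as read-only skips the lock entirely, which is why the flag passed at registration matters for a command that only reads the digest.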
I have made this change.
Pull Request Overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
    require.Len(t, vals, 1)
    require.Equal(t, "nan", vals[0])

    // testing with a empry digest with multi-valued CDF
    
      
    
Copilot AI, Oct 6, 2025
Corrected spelling of 'empry' to 'empty'.
Suggested change:
    - // testing with a empry digest with multi-valued CDF
    + // testing with an empty digest with multi-valued CDF
Hi @SharonIV0x86,
Thanks very much for your effort! 😊
I left some comments, and we should also check the comments from Copilot.
Best Regards,
Edward
    TDigestCDFResult result;
    TDigestMetadata metadata;
    auto meta_status = tdigest.GetMetaData(ctx, key_name_, &metadata);
    std::vector<std::string> nan_results(values_.size(), "nan");
The nan_results could be constructed in the empty element branch.
Hi @SharonIV0x86 ,
We could move this nan_results construction into the empty element branch.
    // create a tdigest and add some data
    tdigestKey := keyPrefix + "source"
    require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", tdigestKey).Err())
    require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", tdigestKey, "1.0", "2.0", "3.0", "4.0", "5.0").Err())
Hi @SharonIV0x86 ,
We'd better add some tests with duplicated values to create different weights for some centroids.


This closes: #2807
This PR implements support for the TDIGEST.CDF command (Redis docs), following the implementation of the Quantile function for accessing tdigest centroids. I will add the unit tests once the current logic of the distribution function is correct and approved.