Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3235803
feat: added initial tdigest.cdf logic
SharonIV0x86 Sep 6, 2025
f3bdd77
Merge branch 'apache:unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 6, 2025
9e26d86
fix: fixed typo in inputs and registered tdigest.cdf in MakeCmdAttr
SharonIV0x86 Sep 6, 2025
7366edb
Merge branch 'apache:unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 9, 2025
d3a44cf
Merge branch 'apache:unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 11, 2025
262574f
fix: fixed MakeCmdAttr to read-only
SharonIV0x86 Sep 11, 2025
b64389d
fix: used util::float2String instead of to_string.
SharonIV0x86 Sep 12, 2025
4731484
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 14, 2025
f361fc4
Added gocase unit test case.
SharonIV0x86 Sep 14, 2025
6753870
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 22, 2025
cb86cb3
progress: added basic c++ unit test case for CDF
SharonIV0x86 Sep 22, 2025
fcd7d5a
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 26, 2025
e0451ba
progress: added more c++ test cases for cdf
SharonIV0x86 Sep 26, 2025
1d425a0
fix: fixed go test file formatting, distorted from the prev commit e0…
SharonIV0x86 Sep 26, 2025
31423cd
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Sep 27, 2025
67923f4
fix clang-tidy error
LindaSummer Sep 28, 2025
4d1988b
Merge branch 'unstable' into feat/tdigest.cdf-command
LindaSummer Sep 28, 2025
ec52575
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Oct 4, 2025
d23ad76
fix: go formatting
SharonIV0x86 Oct 5, 2025
cbff56e
Merge branch 'unstable' into feat/tdigest.cdf-command
SharonIV0x86 Oct 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,56 @@ class CommandTDigestMerge : public Commander {
std::vector<std::string> source_keys_;
TDigestMergeOptions options_;
};
class CommandTDigestCDF : public Commander {
Status Parse(const std::vector<std::string> &args) override {
if (args.size() == 2) return {Status::RedisParseErr, errWrongNumOfArguments};
Copy link

Copilot AI Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The command registration specifies minimum 4 arguments (-4), but this validation only checks for exactly 2 arguments. It should validate that there are at least 3 arguments (command + key + at least one value).

Suggested change
if (args.size() == 2) return {Status::RedisParseErr, errWrongNumOfArguments};
if (args.size() < 3) return {Status::RedisParseErr, errWrongNumOfArguments};

Copilot uses AI. Check for mistakes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could check the vector size at beginning.

key_name_ = args[1];
values_.reserve(args.size() - 2);
for (size_t i = 2; i < args.size(); i++) {
auto value = ParseFloat(args[i]);
if (!value) {
return {Status::RedisParseErr, errValueIsNotFloat};
}
values_.push_back(*value);
}
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<std::string> cdf_result;
TDigestCDFResult result;
TDigestMetadata metadata;
auto meta_status = tdigest.GetMetaData(ctx, key_name_, &metadata);
std::vector<std::string> nan_results(values_.size(), "nan");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nan_results could be constructed in the empty element branch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @SharonIV0x86 ,

We could move this nan_results construction into the empty element branch.

if (!meta_status.ok()) {
if (meta_status.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
*output = redis::MultiBulkString(RESP::v2, nan_results);
return Status::OK();
}
if (metadata.total_observations == 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested with Redis Docker, it should be the ["nan"] vector with the same size as the input.

*output = redis::MultiBulkString(RESP::v2, nan_results);
return Status::OK();
}
auto s = tdigest.CDF(ctx, key_name_, values_, &result);
if (!s.ok()) {
*output = redis::MultiBulkString(RESP::v2, nan_results);
return {Status::RedisExecErr, s.ToString()};
}
if (result.cdf_values) {
for (const auto &val : *result.cdf_values) {
cdf_result.push_back(util::Float2String(val));
}
}
*output = redis::MultiBulkString(RESP::v2, cdf_result);
return Status::OK();
}

private:
std::string key_name_;
std::vector<double> values_;
};
std::vector<CommandKeyRange> GetMergeKeyRange(const std::vector<std::string> &args) {
auto numkeys = ParseInt<int>(args[2], 10).ValueOr(0);
return {{1, 1, 1}, {3, 2 + numkeys, 1}};
Expand All @@ -371,5 +420,6 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange),
MakeCmdAttr<CommandTDigestCDF>("tdigest.cdf", -3, "read-only", 1, 1, 1));
} // namespace redis
63 changes: 63 additions & 0 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,69 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,

return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
/// Estimates the cumulative distribution function of a digest at each value
/// in `inputs`.
///
/// Buffered (unmerged) nodes are first merged into the centroids and the
/// updated metadata is written back while holding the key lock, so the
/// estimate reflects every observation added so far. The estimate for a value
/// `v` is
///
///   (weight of centroids with mean < v + half the weight of centroids with
///    mean == v) / total weight
///
/// @param ctx          engine context used for reads and the optional write.
/// @param digest_name  user-visible key of the digest (namespace is prepended).
/// @param inputs       values at which to evaluate the CDF.
/// @param result       receives one estimate per input, in input order, in
///                     `result->cdf_values`.
/// @return the first non-OK status from the metadata lookup, buffer merge,
///         write, or centroid dump; OK otherwise.
rocksdb::Status TDigest::CDF(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
                             TDigestCDFResult* result) {
  auto ns_key = AppendNamespacePrefix(digest_name);
  TDigestMetadata metadata;
  {
    LockGuard guard(storage_->GetLockManager(), ns_key);

    if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
      return status;
    }

    if (metadata.unmerged_nodes > 0) {
      auto batch = storage_->GetWriteBatchBase();
      WriteBatchLogData log_data(kRedisTDigest);
      if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
        return status;
      }

      if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); !status.ok()) {
        return status;
      }
      // NOTE(review): this exit leaves result->cdf_values untouched, whereas
      // an empty digest without buffered nodes falls through and produces one
      // entry per input. Callers must therefore handle an unset cdf_values.
      if (metadata.total_observations == 0) {
        return rocksdb::Status::OK();
      }

      std::string metadata_bytes;
      metadata.Encode(&metadata_bytes);
      if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
        return status;
      }

      if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
        return status;
      }
      // Make the centroids written above visible to the reads below.
      ctx.RefreshLatestSnapshot();
    }
  }

  std::vector<Centroid> centroids;
  if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
    return status;
  }

  auto dump_centroids = DummyCentroids(metadata, centroids);
  const double total_weight = dump_centroids.TotalWeight();

  auto& cdf_values = result->cdf_values.emplace();
  cdf_values.reserve(inputs.size());
  for (double val : inputs) {
    // Accumulate centroid *weights*, not centroid counts: a centroid stands
    // for `weight` observations, so counting each centroid as one observation
    // is only correct while every centroid still has weight 1.
    double smaller_weight = 0;
    double eq_weight = 0;
    for (auto iter = dump_centroids.Begin(); iter->Valid(); iter->Next()) {
      auto current_centroid = iter->GetCentroid();
      if (val > current_centroid->mean) {
        smaller_weight += current_centroid->weight;
      } else if (val == current_centroid->mean) {
        eq_weight += current_centroid->weight;
      }
    }
    // Centroids located exactly at `val` contribute half of their weight.
    // The two-quotient form is kept deliberately so the floating-point
    // rounding matches what the existing expectations were written against.
    // With zero total weight this evaluates to 0/0, i.e. NaN under IEEE-754
    // (assumes TotalWeight() is 0 for an empty digest - TODO confirm).
    cdf_values.push_back((smaller_weight / total_weight) + ((eq_weight / 2) / total_weight));
  }
  return rocksdb::Status::OK();
}

rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata) {
auto ns_key = AppendNamespacePrefix(digest_name);
Expand Down
6 changes: 5 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ struct TDigestMergeOptions {
uint32_t compression = 0;
bool override_flag = false;
};

/// Output of TDigest::CDF.
struct TDigestCDFResult {
  /// One CDF estimate per queried value, in query order. Starts out empty
  /// (no value) until TDigest::CDF fills it in.
  std::optional<std::vector<double>> cdf_values = std::nullopt;
};
/// Output of a quantile query on a digest.
struct TDigestQuantitleResult {
  /// Computed quantiles; holds no value until a query fills it in (the unit
  /// tests expect it to stay unset for an empty digest).
  std::optional<std::vector<double>> quantiles = std::nullopt;
};
Expand Down Expand Up @@ -79,6 +81,8 @@ class TDigest : public SubKeyScanner {
const TDigestMergeOptions& options);

/// Looks up the metadata of the digest `digest_name` and stores it in
/// `*metadata`. Callers inspect the returned status (for example
/// `IsNotFound()`) before reading fields such as `total_observations`.
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);
/// Estimates the cumulative distribution function of the digest
/// `digest_name` at every value in `inputs` and stores the estimates, in
/// input order, in `result->cdf_values`.
///
/// May write to storage: buffered (unmerged) nodes are merged and persisted
/// before the estimates are computed. Returns the first non-OK status hit
/// while reading metadata, merging, writing, or dumping centroids.
rocksdb::Status CDF(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
TDigestCDFResult* result);

private:
// One-byte tag (underlying type uint8_t) naming a segment kind: buffer,
// centroids, or the guard flag.
// NOTE(review): where this tag is encoded is outside this view - confirm.
enum class SegmentType : uint8_t { kBuffer = 0, kCentroids = 1, kGuardFlag = 0xFF };
Expand Down
135 changes: 134 additions & 1 deletion tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
#include <range/v3/algorithm/shuffle.hpp>
#include <range/v3/range.hpp>
#include <range/v3/view/chunk.hpp>
#include <range/v3/view/concat.hpp>
#include <range/v3/view/iota.hpp>
#include <range/v3/view/join.hpp>
#include <range/v3/view/repeat.hpp>
#include <range/v3/view/transform.hpp>
#include <string>
#include <vector>

#include "logging.h"
#include "storage/redis_metadata.h"
#include "test_base.h"
#include "time_util.h"
Expand Down Expand Up @@ -298,3 +299,135 @@ TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}
// Checks CDF estimates for a small digest in which the value k was added k
// times (k = 1..5), querying below, at, and above the observed range.
TEST_F(RedisTDigestTest, CDF_Test) {
  std::string digest_key = "test_cdf_digest" + std::to_string(util::GetTimeStampMS());

  bool already_exists = false;
  auto s = tdigest_->Create(*ctx_, digest_key, {100}, &already_exists);
  ASSERT_FALSE(already_exists);
  ASSERT_TRUE(s.ok());

  std::vector<double> observations = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5};
  s = tdigest_->Add(*ctx_, digest_key, observations);
  ASSERT_TRUE(s.ok());

  std::vector<double> queries = {0, 1, 2, 3, 4, 5, 6};
  std::vector<double> expected = {0.00, 0.03, 0.13, 0.29, 0.53, 0.83, 1.00};

  redis::TDigestCDFResult cdf;
  s = tdigest_->CDF(*ctx_, digest_key, queries, &cdf);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_TRUE(cdf.cdf_values) << "CDF should have values";
  ASSERT_EQ(cdf.cdf_values->size(), queries.size());

  const auto &estimates = *cdf.cdf_values;
  for (size_t idx = 0; idx < queries.size(); ++idx) {
    EXPECT_NEAR(estimates[idx], expected[idx], 0.015) << fmt::format("Mismatch at index {}", idx);
  }
}

// Queries the CDF of a freshly created digest that holds no observations.
// Only checks that the call succeeds and that a result vector is produced;
// the individual entries are not inspected.
TEST_F(RedisTDigestTest, CDF_returns_nan_on_empty_tdigest) {
  std::string empty_digest_key = "test_digest_cdf_nan" + std::to_string(util::GetTimeStampMS());

  bool already_exists = false;
  auto s = tdigest_->Create(*ctx_, empty_digest_key, {100}, &already_exists);
  ASSERT_FALSE(already_exists);
  ASSERT_TRUE(s.ok());

  std::vector<double> queries = {0.0, 1.0, 2.0, 3.0};

  redis::TDigestCDFResult cdf;
  s = tdigest_->CDF(*ctx_, empty_digest_key, queries, &cdf);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_TRUE(cdf.cdf_values);
}

// Adds the integers 1..100 once each and checks the CDF at the minimum, the
// quartiles, and the maximum.
TEST_F(RedisTDigestTest, CDF_uniform_distribution) {
  std::string digest_key = "test_cdf_uniform" + std::to_string(util::GetTimeStampMS());

  bool already_exists = false;
  auto s = tdigest_->Create(*ctx_, digest_key, {200}, &already_exists);
  ASSERT_FALSE(already_exists);
  ASSERT_TRUE(s.ok());

  // 1.0, 2.0, ..., 100.0
  std::vector<double> observations;
  observations.reserve(100);
  for (int v = 1; v < 101; ++v) {
    observations.push_back(static_cast<double>(v));
  }
  s = tdigest_->Add(*ctx_, digest_key, observations);
  ASSERT_TRUE(s.ok());

  std::vector<double> queries = {1, 25, 50, 75, 100};
  std::vector<double> expected = {0.01, 0.25, 0.50, 0.75, 1.00};

  redis::TDigestCDFResult cdf;
  s = tdigest_->CDF(*ctx_, digest_key, queries, &cdf);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_TRUE(cdf.cdf_values) << "CDF should have values";
  ASSERT_EQ(cdf.cdf_values->size(), queries.size());

  const auto &estimates = *cdf.cdf_values;
  for (size_t idx = 0; idx < queries.size(); ++idx) {
    EXPECT_NEAR(estimates[idx], expected[idx], 0.02)
        << fmt::format("Mismatch at index {}, val={}", idx, queries[idx]);
  }
}

// Feeds the digest through two separate Add calls (1..5, then 6..10) and
// checks that the CDF reflects the union of both batches.
TEST_F(RedisTDigestTest, CDF_multiple_adds) {
  std::string test_digest_name = "test_cdf_multiadd" + std::to_string(util::GetTimeStampMS());

  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());

  std::vector<double> samples1 = {1, 2, 3, 4, 5};
  std::vector<double> samples2 = {6, 7, 8, 9, 10};
  status = tdigest_->Add(*ctx_, test_digest_name, samples1);
  ASSERT_TRUE(status.ok());
  status = tdigest_->Add(*ctx_, test_digest_name, samples2);
  ASSERT_TRUE(status.ok());

  std::vector<double> cdf_vals = {1, 5, 7, 10};
  redis::TDigestCDFResult result;
  status = tdigest_->CDF(*ctx_, test_digest_name, cdf_vals, &result);
  ASSERT_TRUE(status.ok()) << status.ToString();

  // CDF counts only half of the weight located exactly at the queried value,
  // so for ten distinct unit-weight samples CDF(k) = (k - 0.5) / 10. The
  // former expectations (k / 10) were off by exactly the 0.05 tolerance and
  // passed only at the floating-point rounding boundary.
  // Assumes ten samples at compression 100 are not merged into shared
  // centroids - TODO confirm.
  std::vector<double> expected = {0.05, 0.45, 0.65, 0.95};
  ASSERT_TRUE(result.cdf_values) << "CDF should have values";
  ASSERT_EQ(result.cdf_values->size(), cdf_vals.size());

  for (size_t i = 0; i < cdf_vals.size(); i++) {
    EXPECT_NEAR((*result.cdf_values)[i], expected[i], 0.02)
        << fmt::format("Mismatch at index {}, val={}", i, cdf_vals[i]);
  }
}

// Builds a heavily skewed data set - one hundred zeros followed by the
// integers 1..10 - and checks the CDF at the spike and in the thin tail.
TEST_F(RedisTDigestTest, CDF_skewed_distribution) {
  std::string digest_key = "test_cdf_skewed" + std::to_string(util::GetTimeStampMS());

  bool already_exists = false;
  auto s = tdigest_->Create(*ctx_, digest_key, {200}, &already_exists);
  ASSERT_FALSE(already_exists);
  ASSERT_TRUE(s.ok());

  // 100 x 0.0, then 1.0, 2.0, ..., 10.0
  std::vector<double> observations(100, 0.0);
  observations.reserve(110);
  for (int v = 1; v < 11; ++v) {
    observations.push_back(static_cast<double>(v));
  }

  s = tdigest_->Add(*ctx_, digest_key, observations);
  ASSERT_TRUE(s.ok());

  std::vector<double> queries = {0, 1, 5, 10};
  std::vector<double> expected = {0.4545, 0.91, 0.95, 1.00};

  redis::TDigestCDFResult cdf;
  s = tdigest_->CDF(*ctx_, digest_key, queries, &cdf);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(cdf.cdf_values) << "CDF should have values";
  ASSERT_EQ(cdf.cdf_values->size(), queries.size());

  const auto &estimates = *cdf.cdf_values;
  for (size_t idx = 0; idx < queries.size(); ++idx) {
    EXPECT_NEAR(estimates[idx], expected[idx], 0.03)
        << fmt::format("Mismatch at index {}, val={}", idx, queries[idx]);
  }
}
52 changes: 52 additions & 0 deletions tests/gocase/unit/type/tdigest/tdigest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
errMsgKeyNotExist = "key does not exist"
errNumkeysMustBePositive = "numkeys need to be a positive integer"
errCompressionParameterMustBePositive = "compression parameter needs to be a positive integer"
errValueIsNotFloat = "value is not a valid float"
)

type tdigestInfo struct {
Expand Down Expand Up @@ -518,4 +519,55 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
validation(newDestKey1)
validation(newDestKey2)
})
t.Run("tdigest.cdf with different arguments", func(t *testing.T) {
keyPrefix := "tdigest_cdf_"

require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CDF").Err(), errMsgWrongNumberArg)
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CDF", keyPrefix+"key1").Err(), errMsgWrongNumberArg)

// non-existent key
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CDF", keyPrefix+"nonexistent", "1.0").Err(), errMsgKeyNotExist)

// invalid float value
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CDF", keyPrefix+"key2", "invalid").Err(), errValueIsNotFloat)

// create a tdigest and add some data
tdigestKey := keyPrefix + "source"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", tdigestKey).Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", tdigestKey, "1.0", "2.0", "3.0", "4.0", "5.0").Err())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @SharonIV0x86 ,

We'd better add some tests with duplicated values to create different weights for some centroids.


// single-value CDF query
rsp := rdb.Do(ctx, "TDIGEST.CDF", tdigestKey, "3.0")
require.NoError(t, rsp.Err())
vals, err := rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 1)
require.NotEqual(t, "nan", vals[0])

// multi-value CDF query
rsp = rdb.Do(ctx, "TDIGEST.CDF", tdigestKey, "0.0", "2.5", "5.0", "10.0")
require.NoError(t, rsp.Err())
vals, err = rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 4)

// empty tdigest should return "nan"
emptyKey := keyPrefix + "empty"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", emptyKey).Err())
rsp = rdb.Do(ctx, "TDIGEST.CDF", emptyKey, "1.0")
require.NoError(t, rsp.Err())
vals, err = rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 1)
require.Equal(t, "nan", vals[0])

// testing with an empty digest with multi-valued CDF
Copy link

Copilot AI Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'empry' to 'empty'.

Suggested change
// testing with a empry digest with multi-valued CDF
// testing with an empty digest with multi-valued CDF

Copilot uses AI. Check for mistakes.
rsp = rdb.Do(ctx, "TDIGEST.CDF", emptyKey, "0.5", "1.0", "1.5", "2.2")
require.NoError(t, rsp.Err())
vals, err = rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 4)
require.Equal(t, []interface{}{"nan", "nan", "nan", "nan"}, vals)

})
}