Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
03b69e1
feat: impl tdigest.revrank
donghao526 Aug 17, 2025
97df4a1
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
70a39d3
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
dde8410
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
0d3e9cc
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
bb172a8
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
a64add4
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
3954b1f
Merge branch 'unstable' into feature/tdigest-revrank
PragmaTwice Aug 19, 2025
05d1202
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
f688e14
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
8bcad0f
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
46ac984
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
495e072
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
2b6785d
add golang test cases for tdigest.revrank
invalid-email-address Aug 20, 2025
3af3b54
feat: impl tdigest.revrank
invalid-email-address Aug 20, 2025
f3d85d3
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 20, 2025
e68689d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 21, 2025
eb8674f
feat: impl tdigest.revrank
invalid-email-address Aug 27, 2025
c70f410
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 27, 2025
b991d0d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 27, 2025
4c9a41d
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
invalid-email-address Aug 28, 2025
543fda0
fix(replication): Fix Seg Fault On Signal When Replication is Enabled…
zhixinwen Aug 20, 2025
e0d39a7
chore(.asf.yaml): make 2.13 a protected branches (#3129)
PragmaTwice Aug 20, 2025
a4ed14c
feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD`…
yezhizi Aug 20, 2025
9d6c532
feat(ts): Add `TS.INFO` command (#3133)
yezhizi Aug 22, 2025
ff658f8
chore(.asf.yaml): enable auto merge and disable wiki (#3137)
PragmaTwice Aug 22, 2025
53e82f8
chore: remove unused `autoResizeBlockAndSST` method and config (#3136)
jonahgao Aug 22, 2025
6df3309
feat(scripting): support strict key-accessing mode for lua scripting …
PragmaTwice Aug 23, 2025
201afed
feat(Dockerfile): add a UID for the user in the container (#3138)
SpecLad Aug 24, 2025
0851c22
feat(ts): Add data query support and `TS.RANGE` command (#3140)
yezhizi Aug 25, 2025
c7ed36f
feat(ts): Add `TS.GET` command (#3142)
yezhizi Aug 26, 2025
3a898fe
chore(config): enable `level_compaction_dynamic_level_bytes` by defau…
jonahgao Aug 26, 2025
3711578
perf(storage): eliminate unnecessary `rocksdb::DB::ListColumnFamilies…
jonahgao Aug 27, 2025
4b4f684
fix(scan): pattern-based SCAN iterations may skip remaining keys (#3146)
sryanyuan Aug 27, 2025
bd268b4
style: add some comments on TDigestRank
donghao526 Aug 28, 2025
8e6a7f9
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Aug 28, 2025
6662240
refactor: remove commented code
donghao526 Aug 28, 2025
e7f06a2
style: format code
donghao526 Aug 28, 2025
367981c
Merge branch 'unstable' into feature/tdigest-revrank
LindaSummer Aug 29, 2025
4b8cd6a
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Sep 18, 2025
07836fd
feat: sort the input using map in revrank
donghao526 Sep 19, 2025
f44bc56
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Sep 19, 2025
2aded75
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 14, 2025
f4a9c53
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
5023de8
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 25, 2025
e3629d9
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
ae05623
fix: fix format
donghao526 Oct 25, 2025
0cf8c8a
fix: fix clang-tidy
donghao526 Oct 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,49 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

/// Handler for the `tdigest.revrank` command: `TDIGEST.REVRANK key value [value ...]`.
///
/// Every argument after the key is parsed as a floating-point value. The reply is an array with one integer per
/// value, in input order, exactly as produced by TDigest::RevRank.
class CommandTDigestRevRank : public Commander {
 public:
  /// Stores the key name (args[1]) and converts args[2..] into doubles.
  ///
  /// @return Status::OK() on success, or RedisParseErr (errValueIsNotFloat) for the first argument that
  ///         ParseFloat rejects.
  Status Parse(const std::vector<std::string> &args) override {
    key_name_ = args[1];
    inputs_.reserve(args.size() - 2);
    for (size_t i = 2; i < args.size(); i++) {
      auto value = ParseFloat(args[i]);
      if (!value) {
        return {Status::RedisParseErr, errValueIsNotFloat};
      }
      inputs_.push_back(*value);
    }
    return Status::OK();
  }

  /// Runs TDigest::RevRank for the parsed inputs and writes the reply into `*output`.
  ///
  /// @return RedisExecErr with errKeyNotFound when the lookup reports NotFound, RedisExecErr with the storage
  ///         status text for any other failure, otherwise Status::OK().
  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    TDigest tdigest(srv->storage, conn->GetNamespace());
    std::vector<int> result;
    result.reserve(inputs_.size());
    if (const auto s = tdigest.RevRank(ctx, key_name_, inputs_, &result); !s.ok()) {
      if (s.IsNotFound()) {
        return {Status::RedisExecErr, errKeyNotFound};
      }
      return {Status::RedisExecErr, s.ToString()};
    }

    // The previous `if (result.data())` guard tested whether the vector owned a buffer, not whether it held any
    // ranks; after the reserve() above it was always true, so its "nan" fallback could never be sent. The reply
    // is therefore always an array with one integer per computed rank.
    std::vector<std::string> rev_ranks;
    rev_ranks.reserve(result.size());
    for (const auto v : result) {
      rev_ranks.push_back(redis::Integer(v));
    }
    *output = redis::Array(rev_ranks);
    return Status::OK();
  }

 private:
  std::string key_name_;        // digest key, taken from args[1]
  std::vector<double> inputs_;  // values whose reverse rank is requested, in argument order
};

class CommandTDigestMinMax : public Commander {
public:
explicit CommandTDigestMinMax(bool is_min) : is_min_(is_min) {}
Expand Down Expand Up @@ -369,6 +412,7 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRevRank>("tdigest.revrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
Expand Down
61 changes: 61 additions & 0 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,67 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

// Computes one reverse rank per entry of `inputs` against the t-digest stored under `digest_name` and writes
// them, in input order, into `*result`.
//
// - If the metadata lookup fails, that status is returned unchanged.
// - If the digest has no observations, `*result` is resized to inputs.size() with -2 as the fill value and OK
//   is returned.
// - If the metadata reports unmerged nodes, they are merged and the updated metadata is written before ranking.
// - A failure from TDigestRevRank is reported as rocksdb::Status::InvalidArgument carrying its message.
rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
// Scope of the key lock guard; presumably the lock is released at the closing brace -- confirm LockGuard semantics.
{
LockGuard guard(storage_->GetLockManager(), ns_key);

if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

// Empty digest: every input gets the sentinel -2.
// NOTE(review): resize() only gives -2 to newly added elements; entries already present in a non-empty
// `*result` are kept, whereas the non-empty path below calls clear() first. assign() would make both paths agree.
if (metadata.total_observations == 0) {
result->resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

// Merge any unmerged nodes and write the updated metadata in one batch before centroids are read.
if (metadata.unmerged_nodes > 0) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata.Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}

// Presumably needed so the centroid read below observes the batch just written -- TODO confirm.
ctx.RefreshLatestSnapshot();
}
}
Comment on lines 222 to 237
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @donghao526 ,

The merge action could be refactored into a function to reduce duplication of the same logic.

Best Regards,
Edward


std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

// Built from the metadata and the dumped centroids; this is the object handed to TDigestRevRank below.
auto dump_centroids = DummyCentroids(metadata, centroids);

result->clear();
result->reserve(inputs.size());

// Each input is ranked independently, so the centroids are scanned once per input.
for (auto value : inputs) {
auto status_or_rank = TDigestRevRank(dump_centroids, value);
if (!status_or_rank) {
return rocksdb::Status::InvalidArgument(status_or_rank.Msg());
}
result->push_back(*status_or_rank);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @donghao526 ,

We could sort the inputs and get the ranks with just one scan of the centroids since it's sorted.

Best Regards,
Edward

Copy link
Author

@donghao526 donghao526 Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @LindaSummer
I encountered a problem when I was testing. After the nodes merged, are there two adjacent centroids can be with the same mean?

I Test with

TDIGEST.CREATE s COMPRESSION 1000

TDIGEST.ADD s 10 10 10 10 20 20

I found the centroids after merged are:
(1) mean: 10 weight: 1
(2) mean: 10 weight: 1
(3) mean: 10 weight: 1
(4) mean: 10 weight: 1
(5) mean: 20 weight: 1
(6) mean: 20 weight: 1

Is this as expected or a bug?

Copy link
Member

@LindaSummer LindaSummer Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @donghao526 ,

It is expected, and you could refer to #2878 for more details.
So we need a stable way for both serialization and deserialization.

The trigger for the merge is the weight, not the mean. So we could treat the mean only as a label of one centroid. The whole logic is driven by weight.

Best Regards,
Edward

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

return rocksdb::Status::OK();
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);

rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>* result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down
19 changes: 19 additions & 0 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,22 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}

// Estimates the reverse rank of `value` within the digest `td`.
//
// `td` must provide Min(), Max(), TotalWeight() and Begin(); the iterator returned by Begin() must provide
// Valid(), Next() and GetCentroid().
//
// Returns:
// - TotalWeight() converted to int when `value` is below Min();
// - -1 when `value` is above Max();
// - otherwise the summed weight of every centroid whose mean is greater than `value`, plus half the weight of
//   every centroid whose mean equals `value`, converted to int (the fractional part is discarded).
// A failing GetCentroid() is presumably propagated to the caller by GET_OR_RET -- confirm against the macro.
template <typename TD>
inline StatusOr<int> TDigestRevRank(TD&& td, double value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @donghao526 ,

We need to use a stable way to compare between doubles.

It will be tough to assume that the two double numbers are equal to or greater than.

After solving this, we should add some test cases for this corner case.

Best Regards,
Edward

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @donghao526 ,

Since the other code snippets use this way now. You could leave it with the current logic.

I will try to create a new PR to solve the unstable comparison problem in this file.

Best Regards,
Edward

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, after your new PR, I can help to fix here.

// Out-of-range inputs are answered without scanning the centroids.
if (value < td.Min()) {
return static_cast<int>(td.TotalWeight());
}
if (value > td.Max()) {
return static_cast<int>(-1);
}
// Accumulated as a double so the half-weights of equal-mean centroids add up before the final conversion.
double rank = 0;
for (auto iter = td.Begin(); iter->Valid(); iter->Next()) {
// Means are compared with exact floating-point `>` and `==`.
if (auto centroid = GET_OR_RET(iter->GetCentroid()); centroid.mean > value) {
rank += centroid.weight;
} else if (centroid.mean == value) {
rank += centroid.weight / 2;
}
}
return static_cast<int>(rank);
}
76 changes: 76 additions & 0 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,79 @@ TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}

// RevRank over a digest of six distinct observations, queried with values below, at and above every observation.
TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_different_elements) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());
  const std::vector<double> input{10, 20, 30, 40, 50, 60};
  status = tdigest_->Add(*ctx_, test_digest_name, input);
  ASSERT_TRUE(status.ok()) << status.ToString();

  const std::vector<double> value = {0, 10, 20, 30, 40, 50, 60, 70};
  std::vector<int> result;
  result.reserve(value.size());
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
  // Check the call succeeded before looking at its output.
  ASSERT_TRUE(status.ok()) << status.ToString();

  // Whole-vector comparison also fails when RevRank returns too few or too many ranks, which an element loop
  // bounded by result.size() would silently accept.
  const std::vector<int> expect_result{6, 5, 4, 3, 2, 1, 0, -1};
  EXPECT_EQ(result, expect_result);
}

// RevRank over a digest containing repeated observations, checked again after one more duplicate is added.
TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_several_identical_elements) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());
  const std::vector<double> input{10, 10, 10, 20, 20};
  status = tdigest_->Add(*ctx_, test_digest_name, input);
  ASSERT_TRUE(status.ok()) << status.ToString();

  const std::vector<double> value = {10, 20};
  std::vector<int> result;
  result.reserve(value.size());
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
  // Check the call succeeded before looking at its output.
  ASSERT_TRUE(status.ok()) << status.ToString();
  // Whole-vector comparison also catches a result of the wrong length.
  const std::vector<int> expect_result{3, 1};
  EXPECT_EQ(result, expect_result);

  // Add one more observation equal to 10 and rank the same values again.
  status = tdigest_->Add(*ctx_, test_digest_name, std::vector<double>{10});
  ASSERT_TRUE(status.ok()) << status.ToString();

  result.clear();
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
  ASSERT_TRUE(status.ok()) << status.ToString();
  const std::vector<int> expect_result_new{4, 1};
  EXPECT_EQ(result, expect_result_new);
}

// RevRank on a freshly created digest with no observations must report -2 for every queried value.
TEST_F(RedisTDigestTest, RevRank_on_empty_tdigest) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());

  const std::vector<double> value = {10, 20};
  std::vector<int> result;
  result.reserve(value.size());
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
  // Check the call succeeded before looking at its output.
  ASSERT_TRUE(status.ok()) << status.ToString();

  // Whole-vector comparison fails if no ranks come back at all, which an element loop bounded by result.size()
  // would silently accept.
  const std::vector<int> expect_result{-2, -2};
  EXPECT_EQ(result, expect_result);
}
Loading