Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03b69e1
feat: impl tdigest.revrank
donghao526 Aug 17, 2025
97df4a1
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
70a39d3
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
dde8410
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
0d3e9cc
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
bb172a8
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
a64add4
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
3954b1f
Merge branch 'unstable' into feature/tdigest-revrank
PragmaTwice Aug 19, 2025
05d1202
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
f688e14
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
8bcad0f
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
46ac984
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
495e072
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
2b6785d
add golang test cases for tdigest.revrank
invalid-email-address Aug 20, 2025
3af3b54
feat: impl tdigest.revrank
invalid-email-address Aug 20, 2025
f3d85d3
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 20, 2025
e68689d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 21, 2025
eb8674f
feat: impl tdigest.revrank
invalid-email-address Aug 27, 2025
c70f410
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 27, 2025
b991d0d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 27, 2025
4c9a41d
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
invalid-email-address Aug 28, 2025
543fda0
fix(replication): Fix Seg Fault On Signal When Replication is Enabled…
zhixinwen Aug 20, 2025
e0d39a7
chore(.asf.yaml): make 2.13 a protected branches (#3129)
PragmaTwice Aug 20, 2025
a4ed14c
feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD`…
yezhizi Aug 20, 2025
9d6c532
feat(ts): Add `TS.INFO` command (#3133)
yezhizi Aug 22, 2025
ff658f8
chore(.asf.yaml): enable auto merge and disable wiki (#3137)
PragmaTwice Aug 22, 2025
53e82f8
chore: remove unused `autoResizeBlockAndSST` method and config (#3136)
jonahgao Aug 22, 2025
6df3309
feat(scripting): support strict key-accessing mode for lua scripting …
PragmaTwice Aug 23, 2025
201afed
feat(Dockerfile): add a UID for the user in the container (#3138)
SpecLad Aug 24, 2025
0851c22
feat(ts): Add data query support and `TS.RANGE` command (#3140)
yezhizi Aug 25, 2025
c7ed36f
feat(ts): Add `TS.GET` command (#3142)
yezhizi Aug 26, 2025
3a898fe
chore(config): enable `level_compaction_dynamic_level_bytes` by defau…
jonahgao Aug 26, 2025
3711578
perf(storage): eliminate unnecessary `rocksdb::DB::ListColumnFamilies…
jonahgao Aug 27, 2025
4b4f684
fix(scan): pattern-based SCAN iterations may skip remaining keys (#3146)
sryanyuan Aug 27, 2025
bd268b4
style: add some comments on TDigestRank
donghao526 Aug 28, 2025
8e6a7f9
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Aug 28, 2025
6662240
refactor: remove commented code
donghao526 Aug 28, 2025
e7f06a2
style: format code
donghao526 Aug 28, 2025
367981c
Merge branch 'unstable' into feature/tdigest-revrank
LindaSummer Aug 29, 2025
4b8cd6a
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Sep 18, 2025
07836fd
feat: sort the input using map in revrank
donghao526 Sep 19, 2025
f44bc56
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Sep 19, 2025
2aded75
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 14, 2025
f4a9c53
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
5023de8
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 25, 2025
e3629d9
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
ae05623
fix: fix format
donghao526 Oct 25, 2025
0cf8c8a
fix: fix clang-tidy
donghao526 Oct 26, 2025
f855895
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,49 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

class CommandTDigestRevRank : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
key_name_ = args[1];
inputs_.reserve(args.size() - 2);
for (size_t i = 2; i < args.size(); i++) {
auto value = ParseFloat(args[i]);
if (!value) {
return {Status::RedisParseErr, errValueIsNotFloat};
}
inputs_.push_back(*value);
}
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<int> result;
result.reserve(inputs_.size());
if (const auto s = tdigest.RevRank(ctx, key_name_, inputs_, result); !s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}

if (!result.empty()) {
std::vector<std::string> rev_ranks;
rev_ranks.reserve(result.size());
for (const auto v : result) {
rev_ranks.push_back(redis::Integer(v));
}
*output = redis::Array(rev_ranks);
} else {
*output = redis::BulkString("nan");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer to tdigest.revrank.

All values are -2 if the sketch is empty.

}
return Status::OK();
}

private:
std::string key_name_;
std::vector<double> inputs_;
};

class CommandTDigestMinMax : public Commander {
public:
explicit CommandTDigestMinMax(bool is_min) : is_min_(is_min) {}
Expand Down Expand Up @@ -369,6 +412,7 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRevRank>("tdigest.revrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
Expand Down
89 changes: 66 additions & 23 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ class DummyCentroids {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.cend();
return Valid();
}

// The Prev function can only be called for item is not cend,
// because we must guarantee the iterator to be inside the valid range before iteration.
bool Prev() {
if (Valid() && iter_ != centroids_.cbegin()) {
if (Valid()) {
Copy link

Copilot AI Oct 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After calling std::advance(iter_, -1), the iterator could become invalid (move before cbegin()). The validity check on line 79 will return true even when iter_ is before cbegin(), allowing subsequent operations on an out-of-bounds iterator. The validity check should occur before the advance operation, and the advance should only happen if iter_ != centroids_.cbegin().

Suggested change
if (Valid()) {
if (Valid() && iter_ != centroids_.cbegin()) {

Copilot uses AI. Check for mistakes.
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const { return iter_ != centroids_.cend(); }
bool Valid() const { return iter_ < centroids_.cend() && iter_ >= centroids_.cbegin(); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that iter_ should always greater or equal to the cbegin().

StatusOr<Centroid> GetCentroid() const {
if (iter_ == centroids_.cend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
Expand Down Expand Up @@ -186,8 +186,37 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata) {
if (metadata->unmerged_nodes == 0) {
return rocksdb::Status::OK();
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata->Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}

ctx.RefreshLatestSnapshot();
return rocksdb::Status::OK();
}

rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
Expand All @@ -198,31 +227,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (metadata.unmerged_nodes > 0) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}
if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); !status.ok()) {
return status;
}
std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata.Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}
auto dump_centroids = DummyCentroids(metadata, centroids);
auto status = TDigestRevRank(dump_centroids, inputs, result);
if (!status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}
rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

ctx.RefreshLatestSnapshot();
if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);

rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down Expand Up @@ -117,6 +118,8 @@ class TDigest : public SubKeyScanner {
std::string internalSegmentGuardPrefixKey(const TDigestMetadata& metadata, const std::string& ns_key,
SegmentType seg) const;

rocksdb::Status mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata);

rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
const std::vector<double>* additional_buffer = nullptr,
Expand Down
69 changes: 69 additions & 0 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fmt/format.h>

#include <map>
#include <numeric>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -150,3 +152,70 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}

inline void AssignRankForEqualInputs(const std::vector<size_t>& indices, double cumulative_weight,
std::vector<int>& result) {
for (auto index : indices) {
result[index] = static_cast<int>(cumulative_weight);
}
}

template <typename TD>
inline Status TDigestRevRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
std::map<double, std::vector<size_t>> value_to_indices;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_indices[inputs[i]].push_back(i);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is hard to compare two double number for a map. we need a stable way for the compare operator.

}

double cumulative_weight = 0;
result.resize(inputs.size());
auto it = value_to_indices.rbegin();

// handle inputs larger than maximum
while (it != value_to_indices.rend() && it->first > td.Max()) {
AssignRankForEqualInputs(it->second, -1, result);
++it;
}

auto iter = td.End();
while (iter->Valid() && it != value_to_indices.rend()) {
auto centroid = GET_OR_RET(iter->GetCentroid());
auto input_value = it->first;
if (centroid.mean == input_value) {
auto current_mean = centroid.mean;
auto current_mean_cumulative_weight = cumulative_weight + centroid.weight / 2;
cumulative_weight += centroid.weight;

// handle all the prev centroids which has the same mean
while (iter->Prev()) {
auto next_centroid = GET_OR_RET(iter->GetCentroid());
if (current_mean != next_centroid.mean) {
// move back to the last equal centroid, because we will process it in the next loop
iter->Next();
break;
}
current_mean_cumulative_weight += centroid.weight / 2;
cumulative_weight += centroid.weight;
Comment on lines +197 to +198
Copy link

Copilot AI Oct 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line incorrectly adds centroid.weight / 2 instead of next_centroid.weight / 2. The variable next_centroid was retrieved on line 191 but is not being used for the weight calculation, leading to incorrect cumulative weight when processing multiple centroids with the same mean.

Suggested change
current_mean_cumulative_weight += centroid.weight / 2;
cumulative_weight += centroid.weight;
current_mean_cumulative_weight += next_centroid.weight / 2;
cumulative_weight += next_centroid.weight;

Copilot uses AI. Check for mistakes.
}

// handle the prev inputs which has the same value
AssignRankForEqualInputs(it->second, current_mean_cumulative_weight, result);
++it;
iter->Prev();
} else if (centroid.mean > input_value) {
cumulative_weight += centroid.weight;
iter->Prev();
} else {
AssignRankForEqualInputs(it->second, cumulative_weight, result);
++it;
}
}

// handle inputs less than minimum
while (it != value_to_indices.rend()) {
AssignRankForEqualInputs(it->second, td.TotalWeight(), result);
++it;
}

return Status::OK();
}
76 changes: 76 additions & 0 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,79 @@ TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}

TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_different_elements) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());
std::vector<double> input{10, 20, 30, 40, 50, 60};
status = tdigest_->Add(*ctx_, test_digest_name, input);
ASSERT_TRUE(status.ok()) << status.ToString();

std::vector<int> result;
result.reserve(input.size());
const std::vector<double> value = {0, 10, 20, 30, 40, 50, 60, 70};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{6, 5, 4, 3, 2, 1, 0, -1};

for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}

TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_several_identical_elements) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());
std::vector<double> input{10, 10, 10, 20, 20};
status = tdigest_->Add(*ctx_, test_digest_name, input);
ASSERT_TRUE(status.ok()) << status.ToString();

std::vector<int> result;
result.reserve(input.size());
const std::vector<double> value = {10, 20};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{3, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();

status = tdigest_->Add(*ctx_, test_digest_name, std::vector<double>{10});
ASSERT_TRUE(status.ok()) << status.ToString();

result.clear();
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result_new = std::vector<double>{4, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result_new[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}

TEST_F(RedisTDigestTest, RevRank_on_empty_tdigest) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<int> result;
result.reserve(2);
const std::vector<double> value = {10, 20};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{-2, -2};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}
Loading