14 changes: 13 additions & 1 deletion app/services/events/stores/clickhouse_store.rb
@@ -553,7 +553,7 @@ def weighted_sum_breakdown(initial_value: 0)
Events::Stores::Utils::ClickhouseConnection.connection_with_retry do |connection|
query = Events::Stores::Clickhouse::WeightedSumQuery.new(store: self)

connection.select_all(
rows = connection.select_all(
ActiveRecord::Base.sanitize_sql_for_conditions(
[
sanitize_colon(query.breakdown_query),
@@ -566,6 +566,18 @@ def weighted_sum_breakdown(initial_value: 0)
]
)
).rows
# `date_diff` actually returns an `Int64`, and ActiveRecord transforms that into a `String`. If we cast the
# result to an `Int32` instead, we get it back as an `Integer`:
# ```ruby
# lago-api(staging)> Clickhouse::BaseRecord.connection.select_one("SELECT 1::Int64")
# => {"CAST('1', 'Int64')" => "1"}
# lago-api(staging)> Clickhouse::BaseRecord.connection.select_one("SELECT 1::Int32")
# => {"CAST('1', 'Int32')" => 1}
# ```
# To stay consistent with the PG implementation, we call `#to_i` on the value.
rows.map do |(timestamp, difference, cumul, second_duration, period_ratio)|
[timestamp, difference, cumul, second_duration.to_i, period_ratio]
end
end
end

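For context outside the diff, here is a minimal standalone sketch of the normalization the new comment describes; `normalize_breakdown_row` is a hypothetical name used only for illustration, not part of the PR. ClickHouse `Int64` values arrive through ActiveRecord as `String`s, so `second_duration` is coerced with `#to_i` to match the `Integer` the PG store returns for the same column.

```ruby
# Hypothetical helper illustrating the casting quirk described in the comment above.
def normalize_breakdown_row(row)
  timestamp, difference, cumul, second_duration, period_ratio = row
  # `second_duration` comes back as e.g. "3600" (String); `#to_i` mirrors the
  # Integer returned by the PG implementation.
  [timestamp, difference, cumul, second_duration.to_i, period_ratio]
end

normalize_breakdown_row(["2023-03-05 00:00:00", 2, 2, "3600", 0.0013])
# => ["2023-03-05 00:00:00", 2, 2, 3600, 0.0013]
```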
331 changes: 2 additions & 329 deletions spec/services/events/stores/clickhouse_store_spec.rb
@@ -20,8 +20,8 @@ def create_event(timestamp:, value:, properties: {}, transaction_id: SecureRando
)
end

def format_timestamp(timestamp)
Time.zone.parse(timestamp).strftime("%Y-%m-%d %H:%M:%S.%L")
def format_timestamp(timestamp, precision: 3)
Time.zone.parse(timestamp).strftime("%Y-%m-%d %H:%M:%S.%#{precision}L")
end

def force_deduplication
@@ -278,332 +278,5 @@ def force_deduplication
expect(group.first["operation_type"]).to eq("add")
end
end

describe "#prorated_events_values" do
it "returns the values attached to each event with prorata on period duration" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_events_values(31).map { |v| v.round(3) }).to eq(
[0.516, 0.968, 1.355, 1.677, 1.935]
)
end
end

describe "#prorated_sum" do
it "returns the prorated sum of event properties" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31).round(5)).to eq(6.45161)
end

context "with persisted_duration" do
it "returns the prorated sum of event properties" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31, persisted_duration: 10).round(5)).to eq(4.83871)
end
end
end

describe "#grouped_prorated_sum" do
let(:grouped_by) { %w[region] }

it "returns the prorated sum of event properties" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

result = event_store.grouped_prorated_sum(period_duration: 31)

expect(result).to match_array([
{groups: {"region" => nil}, value: within(0.00001).of(2.64516)},
{groups: {"region" => "europe"}, value: within(0.00001).of(3.80645)}
])
end

context "with persisted_duration" do
it "returns the prorated sum of event properties" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

result = event_store.grouped_prorated_sum(period_duration: 31, persisted_duration: 10)

expect(result).to match_array([
{groups: {"region" => nil}, value: within(0.00001).of(1.93548)},
{groups: {"region" => "europe"}, value: within(0.00001).of(2.90322)}
])
end
end

context "with multiple groups" do
let(:grouped_by) { %w[region country] }

it "returns the sum of values grouped by the provided groups" do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

result = event_store.grouped_prorated_sum(period_duration: 31)

expect(result).to match_array(
[
{
groups: {"country" => "united kingdom", "region" => "europe"},
value: within(0.00001).of(1.93548)
},
{
groups: {"country" => nil, "region" => nil},
value: within(0.00001).of(2.64516)
},
{
groups: {"country" => "france", "region" => "europe"},
value: within(0.00001).of(1.87096)
}
]
)
end
end
end

describe "#weighted_sum" do
let(:started_at) { Time.zone.parse("2023-03-01") }

let(:events_values) do
[
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 2},
{timestamp: Time.zone.parse("2023-03-05 01:00:00"), value: 3},
{timestamp: Time.zone.parse("2023-03-05 01:30:00"), value: 1},
{timestamp: Time.zone.parse("2023-03-05 02:00:00"), value: -4},
{timestamp: Time.zone.parse("2023-03-05 04:00:00"), value: -2},
{timestamp: Time.zone.parse("2023-03-05 05:00:00"), value: 10},
{timestamp: Time.zone.parse("2023-03-05 05:30:00"), value: -10}
]
end

let(:events) do
events_values.map do |values|
properties = {value: values[:value]}
properties[:region] = values[:region] if values[:region]

Clickhouse::EventsEnriched.create!(
transaction_id: SecureRandom.uuid,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
code:,
timestamp: values[:timestamp],
properties:,
value: values[:value].to_s,
decimal_value: values[:value].to_d
)
end
end

before do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true
end

it "returns the weighted sum of event properties" do
expect(event_store.weighted_sum.round(5)).to eq(0.02218)
end

context "with a single event" do
let(:events_values) do
[
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 1000}
]
end

it "returns the weighted sum of event properties" do
expect(event_store.weighted_sum.round(5)).to eq(870.96774) # 4 / 31 * 0 + 27 / 31 * 1000
end
end

context "with no events" do
let(:events_values) { [] }

it "returns the weighted sum of event properties" do
expect(event_store.weighted_sum.round(5)).to eq(0.0)
end
end

context "with events with the same timestamp" do
let(:events_values) do
[
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 3},
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 3}
]
end

it "returns the weighted sum of event properties" do
expect(event_store.weighted_sum.round(5)).to eq(5.22581) # 4 / 31 * 0 + 27 / 31 * 6
end
end

context "with initial value" do
let(:initial_value) { 1000 }

it "uses the initial value in the aggregation" do
expect(event_store.weighted_sum(initial_value:).round(5)).to eq(1000.02218)
end

context "without events" do
let(:events_values) { [] }

it "uses only the initial value in the aggregation" do
expect(event_store.weighted_sum(initial_value:).round(5)).to eq(1000.0)
end
end
end

context "with filters" do
let(:matching_filters) { {region: ["europe"]} }

let(:events_values) do
[
{timestamp: Time.zone.parse("2023-03-04 00:00:00.000"), value: 1000, region: "us"},
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 1000, region: "europe"}
]
end

it "returns the weighted sum of event properties scoped to the group" do
expect(event_store.weighted_sum.round(5)).to eq(870.96774) # 4 / 31 * 0 + 27 / 31 * 1000
end
end
end

describe "#grouped_weighted_sum" do
let(:grouped_by) { %w[agent_name other] }

let(:started_at) { Time.zone.parse("2023-03-01") }

let(:events_values) do
[
{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 2, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 01:00:00"), value: 3, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 01:30:00"), value: 1, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 02:00:00"), value: -4, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 04:00:00"), value: -2, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 05:00:00"), value: 10, agent_name: "frodo"},
{timestamp: Time.zone.parse("2023-03-05 05:30:00"), value: -10, agent_name: "frodo"},

{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 2, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 01:00:00"), value: 3, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 01:30:00"), value: 1, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 02:00:00"), value: -4, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 04:00:00"), value: -2, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 05:00:00"), value: 10, agent_name: "aragorn"},
{timestamp: Time.zone.parse("2023-03-05 05:30:00"), value: -10, agent_name: "aragorn"},

{timestamp: Time.zone.parse("2023-03-05 00:00:00.000"), value: 2},
{timestamp: Time.zone.parse("2023-03-05 01:00:00"), value: 3},
{timestamp: Time.zone.parse("2023-03-05 01:30:00"), value: 1},
{timestamp: Time.zone.parse("2023-03-05 02:00:00"), value: -4},
{timestamp: Time.zone.parse("2023-03-05 04:00:00"), value: -2},
{timestamp: Time.zone.parse("2023-03-05 05:00:00"), value: 10},
{timestamp: Time.zone.parse("2023-03-05 05:30:00"), value: -10}
]
end

let(:events) do
events_values.map do |values|
properties = {value: values[:value]}
properties[:region] = values[:region] if values[:region]
properties[:agent_name] = values[:agent_name] if values[:agent_name]

Clickhouse::EventsEnriched.create!(
transaction_id: SecureRandom.uuid,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
code:,
timestamp: values[:timestamp],
properties:,
value: values[:value].to_s,
decimal_value: values[:value].to_d
)
end
end

before do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true
end

it "returns the weighted sum of event properties" do
result = event_store.grouped_weighted_sum

expect(result.count).to eq(3)

null_group = result.find { |v| v[:groups]["agent_name"].nil? }
expect(null_group[:groups]["agent_name"]).to be_nil
expect(null_group[:groups]["other"]).to be_nil
expect(null_group[:value].round(5)).to eq(0.02218)

(result - [null_group]).each do |row|
expect(row[:groups]["agent_name"]).not_to be_nil
expect(row[:groups]["other"]).to be_nil
expect(row[:value].round(5)).to eq(0.02218)
end
end

context "with no events" do
let(:events_values) { [] }

it "returns the weighted sum of event properties" do
result = event_store.grouped_weighted_sum

expect(result.count).to eq(0)
end
end

context "with initial values" do
let(:initial_values) do
[
{groups: {"agent_name" => "frodo", "other" => nil}, value: 1000},
{groups: {"agent_name" => "aragorn", "other" => nil}, value: 1000},
{groups: {"agent_name" => nil, "other" => nil}, value: 1000}
]
end

it "uses the initial value in the aggregation" do
result = event_store.grouped_weighted_sum(initial_values:)

expect(result.count).to eq(3)

null_group = result.find { |v| v[:groups]["agent_name"].nil? }
expect(null_group[:groups]["agent_name"]).to be_nil
expect(null_group[:groups]["other"]).to be_nil
expect(null_group[:value].round(5)).to eq(1000.02218)

(result - [null_group]).each do |row|
expect(row[:groups]["agent_name"]).not_to be_nil
expect(row[:groups]["other"]).to be_nil
expect(row[:value].round(5)).to eq(1000.02218)
end
end

context "without events" do
let(:events_values) { [] }

it "uses only the initial value in the aggregation" do
result = event_store.grouped_weighted_sum(initial_values:)

expect(result.count).to eq(3)

null_group = result.find { |v| v[:groups]["agent_name"].nil? }
expect(null_group[:groups]["agent_name"]).to be_nil
expect(null_group[:groups]["other"]).to be_nil
expect(null_group[:value].round(5)).to eq(1000)

(result - [null_group]).each do |row|
expect(row[:groups]["agent_name"]).not_to be_nil
expect(row[:groups]["other"]).to be_nil
expect(row[:value].round(5)).to eq(1000)
end
end
end
end
end
end
end