Skip to content

Commit 80d9412

Browse files
feat(live_agg): Enable pre-aggregation flow
1 parent 39f77fc commit 80d9412

File tree

4 files changed

+23
-59
lines changed

4 files changed

+23
-59
lines changed

events-processor/processors/event_processors/enrichment_service.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ func (s *EventEnrichmentService) enrichWithSubscription(enrichedEvent *models.En
110110
}
111111

112112
func (s *EventEnrichmentService) enrichWithChargeInfo(enrichedEvent *models.EnrichedEvent) utils.Result[[]*models.EnrichedEvent] {
113-
// TODO(pre-aggregation): Remove the NotAPIPostProcessed condition to enable pre-aggregation
114-
if !enrichedEvent.InitialEvent.NotAPIPostProcessed() || enrichedEvent.Subscription == nil {
113+
if enrichedEvent.Subscription == nil {
115114
return utils.SuccessResult([]*models.EnrichedEvent{enrichedEvent})
116115
}
117116

events-processor/processors/event_processors/enrichment_service_test.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -86,53 +86,6 @@ func TestEnrichEvent(t *testing.T) {
8686
assert.Equal(t, "Error fetching billable metric", result.ErrorMessage())
8787
})
8888

89-
t.Run("When event source is post processed on API and the result is successful", func(t *testing.T) {
90-
sqlmock, delete := setupTestEnv(t)
91-
defer delete()
92-
93-
properties := map[string]any{
94-
"api_requests": "12.0",
95-
}
96-
97-
event := models.Event{
98-
OrganizationID: "1a901a90-1a90-1a90-1a90-1a901a901a90",
99-
ExternalSubscriptionID: "sub_id",
100-
Code: "api_calls",
101-
Timestamp: 1741007009,
102-
Source: models.HTTP_RUBY,
103-
Properties: properties,
104-
SourceMetadata: &models.SourceMetadata{
105-
ApiPostProcess: true,
106-
},
107-
}
108-
109-
bm := models.BillableMetric{
110-
ID: "bm123",
111-
OrganizationID: event.OrganizationID,
112-
Code: event.Code,
113-
AggregationType: models.AggregationTypeSum,
114-
FieldName: "api_requests",
115-
Expression: "",
116-
CreatedAt: time.Now(),
117-
UpdatedAt: time.Now(),
118-
}
119-
mockBmLookup(sqlmock, &bm)
120-
121-
sub := models.Subscription{ID: "sub123", PlanID: "plan123"}
122-
mockSubscriptionLookup(sqlmock, &sub)
123-
124-
result := processor.EnrichEvent(&event)
125-
126-
assert.True(t, result.Success())
127-
assert.Equal(t, 1, len(result.Value()))
128-
129-
eventResult := result.Value()[0]
130-
assert.Equal(t, "12.0", *eventResult.Value)
131-
assert.Equal(t, "sum", eventResult.AggregationType)
132-
assert.Equal(t, "sub123", eventResult.SubscriptionID)
133-
assert.Equal(t, "plan123", eventResult.PlanID)
134-
})
135-
13689
t.Run("When timestamp is invalid", func(t *testing.T) {
13790
sqlmock, delete := setupTestEnv(t)
13891
defer delete()
@@ -195,7 +148,7 @@ func TestEnrichEvent(t *testing.T) {
195148
assert.Equal(t, "Error evaluating custom expression", result.ErrorMessage())
196149
})
197150

198-
t.Run("When event source is not post process on API", func(t *testing.T) {
151+
t.Run("With a flat filter", func(t *testing.T) {
199152
sqlmock, delete := setupTestEnv(t)
200153
defer delete()
201154

@@ -236,7 +189,7 @@ func TestEnrichEvent(t *testing.T) {
236189
assert.Equal(t, "12", *eventResult.Value)
237190
})
238191

239-
t.Run("When event source is not post process on API with multiple flat filters", func(t *testing.T) {
192+
t.Run("With multiple flat filters", func(t *testing.T) {
240193
sqlmock, delete := setupTestEnv(t)
241194
defer delete()
242195

@@ -314,7 +267,7 @@ func TestEnrichEvent(t *testing.T) {
314267
assert.Equal(t, map[string]string{}, eventResult2.GroupedBy)
315268
})
316269

317-
t.Run("When event source is not post process on API with a flat filter with pricing group keys", func(t *testing.T) {
270+
t.Run("With a flat filter with pricing group keys", func(t *testing.T) {
318271
sqlmock, delete := setupTestEnv(t)
319272
defer delete()
320273

events-processor/processors/events.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ func processEvent(event *models.Event) utils.Result[*models.EnrichedEvent] {
9494

9595
go processor.ProducerService.ProduceEnrichedEvent(ctx, enrichedEvent)
9696

97-
// TODO(pre-aggregation): Uncomment to enable the feature
98-
// for _, ev := range enrichedEvents {
99-
// go processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
100-
// }
97+
for _, ev := range enrichedEvents {
98+
if ev.ChargeID != nil {
99+
go processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
100+
}
101+
}
101102

102103
if enrichedEvent.Subscription != nil && event.NotAPIPostProcessed() {
103104
payInAdvance := false

events-processor/processors/events_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ func TestProcessEvent(t *testing.T) {
165165
sub := models.Subscription{ID: "sub123", PlanID: "plan123"}
166166
mockSubscriptionLookup(sqlmock, &sub)
167167

168+
mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{
169+
{
170+
OrganizationID: event.OrganizationID,
171+
BillableMetricCode: event.Code,
172+
PlanID: "plan_id",
173+
ChargeID: "charge_idxx",
174+
ChargeUpdatedAt: time.Now(),
175+
PayInAdvance: true,
176+
},
177+
})
178+
168179
result := processEvent(&event)
169180

170181
assert.True(t, result.Success())
@@ -177,7 +188,7 @@ func TestProcessEvent(t *testing.T) {
177188
// TODO: Improve this by using channels in the producers methods
178189
time.Sleep(50 * time.Millisecond)
179190
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
180-
// TODO(pre-aggregation): assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
191+
assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
181192
})
182193

183194
t.Run("When event source is not post process on API when timestamp is invalid", func(t *testing.T) {
@@ -367,7 +378,7 @@ func TestProcessEvent(t *testing.T) {
367378
time.Sleep(50 * time.Millisecond)
368379
assert.Equal(t, 1, testProducers.inAdvanceProducer.ExecutionCount)
369380
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
370-
// TODO(pre-aggregation): assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
381+
assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
371382

372383
assert.Equal(t, 1, flagger.ExecutionCount)
373384
})
@@ -441,7 +452,7 @@ func TestProcessEvent(t *testing.T) {
441452
// TODO: Improve this by using channels in the producers methods
442453
time.Sleep(50 * time.Millisecond)
443454
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
444-
// TODO(pre-aggregation): assert.Equal(t, 2, testProducers.enrichedExpandedProducer.ExecutionCount)
455+
assert.Equal(t, 2, testProducers.enrichedExpandedProducer.ExecutionCount)
445456
})
446457

447458
t.Run("When event source is not post processed on API and it matches no charges", func(t *testing.T) {

0 commit comments

Comments
 (0)