diff --git a/events-processor/models/stores_test.go b/events-processor/models/stores_test.go index 1f1874a60..f62ae7853 100644 --- a/events-processor/models/stores_test.go +++ b/events-processor/models/stores_test.go @@ -9,11 +9,11 @@ import ( ) func setupApiStore(t *testing.T) (*ApiStore, sqlmock.Sqlmock, func()) { - db, mock, delete := tests.SetupMockStore(t) + mock, delete := tests.SetupMockStore(t) store := &ApiStore{ - db: db, + db: mock.DB, } - return store, mock, delete + return store, mock.SQLMock, delete } diff --git a/events-processor/processors/event_processors/base_service.go b/events-processor/processors/event_processors/base_service.go deleted file mode 100644 index 3d1d1bac7..000000000 --- a/events-processor/processors/event_processors/base_service.go +++ /dev/null @@ -1,40 +0,0 @@ -package event_processors - -import ( - "github.com/getlago/lago/events-processor/models" - "github.com/getlago/lago/events-processor/utils" -) - -type EventProcessor struct { - EnrichmentService *EventEnrichmentService - ProducerService *EventProducerService - RefreshService *SubscriptionRefreshService - CacheService *CacheService -} - -func NewEventProcessor(enrichmentService *EventEnrichmentService, producerService *EventProducerService, refreshService *SubscriptionRefreshService, cacheService *CacheService) *EventProcessor { - return &EventProcessor{ - EnrichmentService: enrichmentService, - ProducerService: producerService, - RefreshService: refreshService, - CacheService: cacheService, - } -} - -func failedResult(r utils.AnyResult, code string, message string) utils.Result[*models.EnrichedEvent] { - result := utils.FailedResult[*models.EnrichedEvent](r.Error()).AddErrorDetails(code, message) - result.Retryable = r.IsRetryable() - result.Capture = r.IsCapturable() - return result -} - -func failedMultiEventsResult(r utils.AnyResult, code string, message string) utils.Result[[]*models.EnrichedEvent] { - result := utils.FailedResult[[]*models.EnrichedEvent](r.Error()).AddErrorDetails(code, message) - result.Retryable = r.IsRetryable() - result.Capture = r.IsCapturable() - return result -} - -func toMultiEventsResult(r utils.Result[*models.EnrichedEvent]) utils.Result[[]*models.EnrichedEvent] { - return failedMultiEventsResult(r, r.ErrorCode(), r.ErrorMessage()) -} diff --git a/events-processor/processors/event_processors/cache_service.go b/events-processor/processors/events_processor/cache_service.go similarity index 96% rename from events-processor/processors/event_processors/cache_service.go rename to events-processor/processors/events_processor/cache_service.go index 086536077..28faa5235 100644 --- a/events-processor/processors/event_processors/cache_service.go +++ b/events-processor/processors/events_processor/cache_service.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "github.com/getlago/lago/events-processor/models" diff --git a/events-processor/processors/event_processors/cache_service_test.go b/events-processor/processors/events_processor/cache_service_test.go similarity index 99% rename from events-processor/processors/event_processors/cache_service_test.go rename to events-processor/processors/events_processor/cache_service_test.go index 8122a0c33..deb50ffb2 100644 --- a/events-processor/processors/event_processors/cache_service_test.go +++ b/events-processor/processors/events_processor/cache_service_test.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "testing" diff --git a/events-processor/processors/event_processors/enrichment_service.go b/events-processor/processors/events_processor/enrichment_service.go similarity index 99% rename from events-processor/processors/event_processors/enrichment_service.go rename to events-processor/processors/events_processor/enrichment_service.go index 5143b33e0..899319de0 100644 --- a/events-processor/processors/event_processors/enrichment_service.go +++ b/events-processor/processors/events_processor/enrichment_service.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "encoding/json" diff --git a/events-processor/processors/event_processors/enrichment_service_test.go b/events-processor/processors/events_processor/enrichment_service_test.go similarity index 79% rename from events-processor/processors/event_processors/enrichment_service_test.go rename to events-processor/processors/events_processor/enrichment_service_test.go index 027159094..76e1c28e2 100644 --- a/events-processor/processors/event_processors/enrichment_service_test.go +++ b/events-processor/processors/events_processor/enrichment_service_test.go @@ -1,11 +1,10 @@ -package event_processors +package events_processor import ( "sort" "testing" "time" - "github.com/DATA-DOG/go-sqlmock" "github.com/getlago/lago/events-processor/models" "github.com/getlago/lago/events-processor/tests" "github.com/getlago/lago/events-processor/utils" @@ -13,61 +12,20 @@ import ( "gorm.io/gorm" ) -var processor *EventEnrichmentService +func setupEnrichmentTestEnv(t *testing.T) (*EventEnrichmentService, *tests.MockedStore, func()) { + mockedStore, delete := tests.SetupMockStore(t) + apiStore := models.NewApiStore(mockedStore.DB) -func setupTestEnv(t *testing.T) (sqlmock.Sqlmock, func()) { - db, mock, delete := tests.SetupMockStore(t) - apiStore := models.NewApiStore(db) - - processor = &EventEnrichmentService{ + processor := &EventEnrichmentService{ apiStore: apiStore, } - return mock, delete -} - -func mockBmLookup(sqlmock sqlmock.Sqlmock, bm *models.BillableMetric) { - columns := []string{"id", "organization_id", "code", "aggregation_type", "field_name", "expression", "created_at", "updated_at", "deleted_at"} - - rows := sqlmock.NewRows(columns). - AddRow(bm.ID, bm.OrganizationID, bm.Code, bm.AggregationType, bm.FieldName, bm.Expression, bm.CreatedAt, bm.UpdatedAt, bm.DeletedAt) - - sqlmock.ExpectQuery("SELECT \\* FROM \"billable_metrics\".*").WillReturnRows(rows) -} - -func mockSubscriptionLookup(sqlmock sqlmock.Sqlmock, sub *models.Subscription) { - columns := []string{"id", "external_id", "plan_id", "created_at", "updated_at", "terminated_at"} - - rows := sqlmock.NewRows(columns). - AddRow(sub.ID, sub.ExternalID, sub.PlanID, sub.CreatedAt, sub.UpdatedAt, sub.TerminatedAt) - - sqlmock.ExpectQuery(".* FROM \"subscriptions\".*").WillReturnRows(rows) -} - -func mockFlatFiltersLookup(sqlmock sqlmock.Sqlmock, filters []*models.FlatFilter) { - columns := []string{"organization_id", "billable_metric_code", "plan_id", "charge_id", "charge_updated_at", "charge_filter_id", "charge_filter_updated_at", "filters", "pricing_group_keys"} - - rows := sqlmock.NewRows(columns) - - for _, filter := range filters { - rows.AddRow( - filter.OrganizationID, - filter.BillableMetricCode, - filter.PlanID, - filter.ChargeID, - filter.ChargeUpdatedAt, - filter.ChargeFilterID, - filter.ChargeFilterUpdatedAt, - filter.Filters, - filter.PricingGroupKeys, - ) - } - sqlmock.ExpectQuery(".* FROM \"flat_filters\".*").WillReturnRows(rows) + return processor, mockedStore, delete } func TestEnrichEvent(t *testing.T) { t.Run("Without Billable Metric", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() event := models.Event{ @@ -77,7 +35,7 @@ func TestEnrichEvent(t *testing.T) { Timestamp: 1741007009, } - sqlmock.ExpectQuery(".*").WillReturnError(gorm.ErrRecordNotFound) + mockedStore.SQLMock.ExpectQuery(".*").WillReturnError(gorm.ErrRecordNotFound) result := processor.EnrichEvent(&event) assert.False(t, result.Success()) @@ -87,7 +45,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When event source is post processed on API and the result is successful", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() properties := map[string]any{ @@ -116,10 +74,10 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123", PlanID: "plan123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) result := processor.EnrichEvent(&event) @@ -134,7 +92,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When timestamp is invalid", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() event := models.Event{ @@ -155,7 +113,7 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) result := processor.EnrichEvent(&event) assert.False(t, result.Success()) @@ -165,7 +123,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When expression failed to evaluate", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() event := models.Event{ @@ -186,7 +144,7 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) result := processor.EnrichEvent(&event) assert.False(t, result.Success()) @@ -196,7 +154,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When event source is not post process on API", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() properties := map[string]any{ @@ -222,11 +180,11 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123"} - mockSubscriptionLookup(sqlmock, &sub) - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{}) + mockSubscriptionLookup(mockedStore, &sub) + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{}) result := processor.EnrichEvent(&event) assert.True(t, result.Success()) @@ -237,7 +195,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When event source is not post process on API with multiple flat filters", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() properties := map[string]any{ @@ -264,10 +222,10 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) now1 := time.Now() flatFilter1 := &models.FlatFilter{ @@ -292,7 +250,7 @@ func TestEnrichEvent(t *testing.T) { ChargeFilterUpdatedAt: &now2, Filters: &models.FlatFilterValues{"scheme": []string{"visa"}}, } - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{flatFilter1, flatFilter2}) + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{flatFilter1, flatFilter2}) result := processor.EnrichEvent(&event) assert.True(t, result.Success()) @@ -315,7 +273,7 @@ func TestEnrichEvent(t *testing.T) { }) t.Run("When event source is not post process on API with a flat filter with pricing group keys", func(t *testing.T) { - sqlmock, delete := setupTestEnv(t) + processor, mockedStore, delete := setupEnrichmentTestEnv(t) defer delete() properties := map[string]any{ @@ -344,10 +302,10 @@ func TestEnrichEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) now := time.Now() chargeFilterId := "charge_filter_id" @@ -363,7 +321,7 @@ func TestEnrichEvent(t *testing.T) { Filters: &models.FlatFilterValues{"scheme": []string{"visa"}}, PricingGroupKeys: []string{"country", "type"}, } - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{flatFilter}) + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{flatFilter}) result := processor.EnrichEvent(&event) assert.True(t, result.Success()) @@ -377,7 +335,7 @@ func TestEnrichEvent(t *testing.T) { } func TestEvaluateExpression(t *testing.T) { - _, delete := setupTestEnv(t) + processor, _, delete := setupEnrichmentTestEnv(t) defer delete() bm := models.BillableMetric{} diff --git a/events-processor/processors/event_processors/event_producer_service.go b/events-processor/processors/events_processor/event_producer_service.go similarity index 99% rename from events-processor/processors/event_processors/event_producer_service.go rename to events-processor/processors/events_processor/event_producer_service.go index 779943a1e..884292c1b 100644 --- a/events-processor/processors/event_processors/event_producer_service.go +++ b/events-processor/processors/events_processor/event_producer_service.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "context" diff --git a/events-processor/processors/event_processors/event_producer_service_test.go b/events-processor/processors/events_processor/event_producer_service_test.go similarity index 99% rename from events-processor/processors/event_processors/event_producer_service_test.go rename to events-processor/processors/events_processor/event_producer_service_test.go index 20de52461..1fa60b45c 100644 --- a/events-processor/processors/event_processors/event_producer_service_test.go +++ b/events-processor/processors/events_processor/event_producer_service_test.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "context" diff --git a/events-processor/processors/events.go b/events-processor/processors/events_processor/processor.go similarity index 69% rename from events-processor/processors/events.go rename to events-processor/processors/events_processor/processor.go index 495e5150b..7d094ed0c 100644 --- a/events-processor/processors/events.go +++ b/events-processor/processors/events_processor/processor.go @@ -1,4 +1,4 @@ -package processors +package events_processor import ( "context" @@ -15,7 +15,25 @@ import ( "github.com/getlago/lago/events-processor/utils" ) -func processEvents(records []*kgo.Record) []*kgo.Record { +type EventProcessor struct { + logger *slog.Logger + EnrichmentService *EventEnrichmentService + ProducerService *EventProducerService + RefreshService *SubscriptionRefreshService + CacheService *CacheService +} + +func NewEventProcessor(logger *slog.Logger, enrichmentService *EventEnrichmentService, producerService *EventProducerService, refreshService *SubscriptionRefreshService, cacheService *CacheService) *EventProcessor { + return &EventProcessor{ + logger: logger, + EnrichmentService: enrichmentService, + ProducerService: producerService, + RefreshService: refreshService, + CacheService: cacheService, + } +} + +func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Record { ctx := context.Background() span := tracer.GetTracerSpan(ctx, "post_process", "PostProcess.ProcessEvents") recordsAttr := attribute.Int("records.length", len(records)) @@ -38,7 +56,7 @@ func processEvents(records []*kgo.Record) []*kgo.Record { event := models.Event{} err := json.Unmarshal(record.Value, &event) if err != nil { - logger.Error("Error unmarshalling message", slog.String("error", err.Error())) + processor.logger.Error("Error unmarshalling message", slog.String("error", err.Error())) utils.CaptureError(err) mu.Lock() @@ -48,9 +66,9 @@ func processEvents(records []*kgo.Record) []*kgo.Record { return } - result := processEvent(&event) + result := processor.processEvent(ctx, &event) if result.Failure() { - logger.Error( + processor.logger.Error( result.ErrorMessage(), slog.String("error_code", result.ErrorCode()), slog.String("error", result.ErrorMsg()), @@ -83,7 +101,7 @@ func processEvents(records []*kgo.Record) []*kgo.Record { return processedRecords } -func processEvent(event *models.Event) utils.Result[*models.EnrichedEvent] { +func (processor *EventProcessor) processEvent(ctx context.Context, event *models.Event) utils.Result[*models.EnrichedEvent] { enrichedEventResult := processor.EnrichmentService.EnrichEvent(event) if enrichedEventResult.Failure() { return failedResult(enrichedEventResult, enrichedEventResult.ErrorCode(), enrichedEventResult.ErrorMessage()) @@ -130,3 +148,14 @@ func failedResult(r utils.AnyResult, code string, message string) utils.Result[* result.Capture = r.IsCapturable() return result } + +func failedMultiEventsResult(r utils.AnyResult, code string, message string) utils.Result[[]*models.EnrichedEvent] { + result := utils.FailedResult[[]*models.EnrichedEvent](r.Error()).AddErrorDetails(code, message) + result.Retryable = r.IsRetryable() + result.Capture = r.IsCapturable() + return result +} + +func toMultiEventsResult(r utils.Result[*models.EnrichedEvent]) utils.Result[[]*models.EnrichedEvent] { + return failedMultiEventsResult(r, r.ErrorCode(), r.ErrorMessage()) +} diff --git a/events-processor/processors/events_test.go b/events-processor/processors/events_processor/processor_test.go similarity index 78% rename from events-processor/processors/events_test.go rename to events-processor/processors/events_processor/processor_test.go index 77ea75de9..b1934420f 100644 --- a/events-processor/processors/events_test.go +++ b/events-processor/processors/events_processor/processor_test.go @@ -1,4 +1,4 @@ -package processors +package events_processor import ( "context" @@ -12,7 +12,6 @@ import ( "gorm.io/gorm" "github.com/getlago/lago/events-processor/models" - "github.com/getlago/lago/events-processor/processors/event_processors" "github.com/getlago/lago/events-processor/utils" "github.com/getlago/lago/events-processor/tests" @@ -23,7 +22,7 @@ type testProducerService struct { enrichedExpandedProducer *tests.MockMessageProducer inAdvanceProducer *tests.MockMessageProducer deadLetterProducer *tests.MockMessageProducer - producers *event_processors.EventProducerService + producers *EventProducerService } func setupProducers() *testProducerService { @@ -35,7 +34,7 @@ func setupProducers() *testProducerService { logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) - producers := event_processors.NewEventProducerService( + producers := NewEventProducerService( &enrichedProducer, &enrichedExpandedProducer, &inAdvanceProducer, @@ -52,14 +51,12 @@ func setupProducers() *testProducerService { } } -func setupTestEnv(t *testing.T) (sqlmock.Sqlmock, *testProducerService, *tests.MockFlagStore, func()) { - ctx = context.Background() - +func setupProcessorTestEnv(t *testing.T) (*EventProcessor, *tests.MockedStore, *testProducerService, *tests.MockFlagStore, func()) { logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) - db, mock, delete := tests.SetupMockStore(t) - apiStore = models.NewApiStore(db) + mockedStore, delete := tests.SetupMockStore(t) + apiStore := models.NewApiStore(mockedStore.DB) testProducers := setupProducers() @@ -68,50 +65,74 @@ func setupTestEnv(t *testing.T) (sqlmock.Sqlmock, *testProducerService, *tests.M chargeCacheStore := models.NewChargeCache(&chargeCache) flagStore := tests.MockFlagStore{} - flagger := event_processors.NewSubscriptionRefreshService(&flagStore) + flagger := NewSubscriptionRefreshService(&flagStore) - processor = event_processors.NewEventProcessor( - event_processors.NewEventEnrichmentService(apiStore), + processor := NewEventProcessor( + logger, + NewEventEnrichmentService(apiStore), testProducers.producers, flagger, - event_processors.NewCacheService(chargeCacheStore), + NewCacheService(chargeCacheStore), ) - return mock, testProducers, &flagStore, delete + return processor, mockedStore, testProducers, &flagStore, delete } -func mockBmLookup(sqlmock sqlmock.Sqlmock, bm *models.BillableMetric) { +func mockBmLookup(mock *tests.MockedStore, bm *models.BillableMetric) { columns := []string{"id", "organization_id", "code", "aggregation_type", "field_name", "expression", "created_at", "updated_at", "deleted_at"} rows := sqlmock.NewRows(columns). AddRow(bm.ID, bm.OrganizationID, bm.Code, bm.AggregationType, bm.FieldName, bm.Expression, bm.CreatedAt, bm.UpdatedAt, bm.DeletedAt) - sqlmock.ExpectQuery("SELECT \\* FROM \"billable_metrics\".*").WillReturnRows(rows) + mock.SQLMock.ExpectQuery("SELECT \\* FROM \"billable_metrics\".*").WillReturnRows(rows) } -func mockSubscriptionLookup(sqlmock sqlmock.Sqlmock, sub *models.Subscription) { +func mockSubscriptionLookup(mock *tests.MockedStore, sub *models.Subscription) { columns := []string{"id", "external_id", "plan_id", "created_at", "updated_at", "terminated_at"} rows := sqlmock.NewRows(columns). AddRow(sub.ID, sub.ExternalID, sub.PlanID, sub.CreatedAt, sub.UpdatedAt, sub.TerminatedAt) - sqlmock.ExpectQuery(".* FROM \"subscriptions\".*").WillReturnRows(rows) + mock.SQLMock.ExpectQuery(".* FROM \"subscriptions\".*").WillReturnRows(rows) } -func mockFlatFiltersLookup(sqlmock sqlmock.Sqlmock, filters []*models.FlatFilter) { - columns := []string{"organization_id", "billable_metric_code", "plan_id", "charge_id", "charge_updated_at", "charge_filter_id", "charge_filter_updated_at", "filters", "pay_in_advance"} +func mockFlatFiltersLookup(mock *tests.MockedStore, filters []*models.FlatFilter) { + columns := []string{ + "organization_id", + "billable_metric_code", + "pay_in_advance", + "plan_id", + "charge_id", + "charge_updated_at", + "charge_filter_id", + "charge_filter_updated_at", + "filters", + "pricing_group_keys", + } rows := sqlmock.NewRows(columns) for _, filter := range filters { - rows.AddRow(filter.OrganizationID, filter.BillableMetricCode, filter.PlanID, filter.ChargeID, filter.ChargeUpdatedAt, filter.ChargeFilterID, filter.ChargeFilterUpdatedAt, filter.Filters, filter.PayInAdvance) + rows.AddRow( + filter.OrganizationID, + filter.BillableMetricCode, + filter.PayInAdvance, + filter.PlanID, + filter.ChargeID, + filter.ChargeUpdatedAt, + filter.ChargeFilterID, + filter.ChargeFilterUpdatedAt, + filter.Filters, + filter.PricingGroupKeys, + ) } - sqlmock.ExpectQuery(".* FROM \"flat_filters\".*").WillReturnRows(rows) + + mock.SQLMock.ExpectQuery(".* FROM \"flat_filters\".*").WillReturnRows(rows) } func TestProcessEvent(t *testing.T) { t.Run("Without Billable Metric", func(t *testing.T) { - sqlmock, _, _, delete := setupTestEnv(t) + processor, mockedStore, _, _, delete := setupProcessorTestEnv(t) defer delete() event := models.Event{ @@ -121,9 +142,9 @@ func TestProcessEvent(t *testing.T) { Timestamp: 1741007009, } - sqlmock.ExpectQuery(".*").WillReturnError(gorm.ErrRecordNotFound) + mockedStore.SQLMock.ExpectQuery(".*").WillReturnError(gorm.ErrRecordNotFound) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.False(t, result.Success()) assert.Equal(t, "record not found", result.ErrorMsg()) assert.Equal(t, "fetch_billable_metric", result.ErrorCode()) @@ -131,7 +152,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is post processed on API", func(t *testing.T) { - sqlmock, testProducers, _, delete := setupTestEnv(t) + processor, mockedStore, testProducers, _, delete := setupProcessorTestEnv(t) defer delete() properties := map[string]any{ @@ -160,12 +181,12 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123", PlanID: "plan123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.True(t, result.Success()) assert.Equal(t, "12.0", *result.Value().Value) @@ -181,7 +202,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post process on API when timestamp is invalid", func(t *testing.T) { - sqlmock, _, _, delete := setupTestEnv(t) + processor, mockedStore, _, _, delete := setupProcessorTestEnv(t) defer delete() event := models.Event{ @@ -202,9 +223,9 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.False(t, result.Success()) assert.Equal(t, "strconv.ParseFloat: parsing \"2025-03-06T12:00:00Z\": invalid syntax", result.ErrorMsg()) assert.Equal(t, "build_enriched_event", result.ErrorCode()) @@ -212,7 +233,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post process on API when no subscriptions are found", func(t *testing.T) { - sqlmock, _, _, delete := setupTestEnv(t) + processor, mockedStore, _, _, delete := setupProcessorTestEnv(t) defer delete() event := models.Event{ @@ -233,16 +254,16 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) - sqlmock.ExpectQuery(".* FROM \"subscriptions\"").WillReturnError(gorm.ErrRecordNotFound) + mockedStore.SQLMock.ExpectQuery(".* FROM \"subscriptions\"").WillReturnError(gorm.ErrRecordNotFound) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.True(t, result.Success()) }) t.Run("When event source is not post process on API with error when fetching subscription", func(t *testing.T) { - sqlmock, _, _, delete := setupTestEnv(t) + processor, mockedStore, _, _, delete := setupProcessorTestEnv(t) defer delete() event := models.Event{ @@ -263,11 +284,11 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) - sqlmock.ExpectQuery(".* FROM \"subscriptions\"").WillReturnError(gorm.ErrNotImplemented) + mockedStore.SQLMock.ExpectQuery(".* FROM \"subscriptions\"").WillReturnError(gorm.ErrNotImplemented) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.False(t, result.Success()) assert.NotNil(t, result.ErrorMsg()) assert.Equal(t, "fetch_subscription", result.ErrorCode()) @@ -275,7 +296,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post process on API when expression failed to evaluate", func(t *testing.T) { - sqlmock, _, _, delete := setupTestEnv(t) + processor, mockedStore, _, _, delete := setupProcessorTestEnv(t) defer delete() // properties := map[string]any{ @@ -301,12 +322,12 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.False(t, result.Success()) assert.Contains(t, result.ErrorMsg(), "Failed to evaluate expr: round(event.properties.value)") assert.Equal(t, "evaluate_expression", result.ErrorCode()) @@ -314,7 +335,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post process on API and events belongs to an in advance charge", func(t *testing.T) { - sqlmock, testProducers, flagger, delete := setupTestEnv(t) + processor, mockedStore, testProducers, flagger, delete := setupProcessorTestEnv(t) defer delete() properties := map[string]any{ @@ -340,14 +361,14 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) now := time.Now() - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{ + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{ { OrganizationID: "org_id", BillableMetricCode: "api_call", @@ -358,7 +379,7 @@ func TestProcessEvent(t *testing.T) { }, }) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.True(t, result.Success()) assert.Equal(t, "12", *result.Value().Value) @@ -373,7 +394,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post processed on API and it matches multiple charges", func(t *testing.T) { - sqlmock, testProducers, _, delete := setupTestEnv(t) + processor, mockedStore, testProducers, _, delete := setupProcessorTestEnv(t) defer delete() properties := map[string]any{ @@ -399,10 +420,10 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123", PlanID: "plan123"} - mockSubscriptionLookup(sqlmock, &sub) + mockSubscriptionLookup(mockedStore, &sub) now := time.Now() @@ -427,9 +448,9 @@ func TestProcessEvent(t *testing.T) { ChargeFilterUpdatedAt: &now, Filters: &models.FlatFilterValues{"scheme": []string{"visa"}}, } - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{flatFilter1, flatFilter2}) + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{flatFilter1, flatFilter2}) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.True(t, result.Success()) assert.Equal(t, "12.0", *result.Value().Value) @@ -445,7 +466,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("When event source is not post processed on API and it matches no charges", func(t *testing.T) { - sqlmock, testProducers, _, delete := setupTestEnv(t) + processor, mockedStore, testProducers, _, delete := setupProcessorTestEnv(t) defer delete() properties := map[string]any{ @@ -471,13 +492,13 @@ func TestProcessEvent(t *testing.T) { CreatedAt: time.Now(), UpdatedAt: time.Now(), } - mockBmLookup(sqlmock, &bm) + mockBmLookup(mockedStore, &bm) sub := models.Subscription{ID: "sub123", PlanID: "plan123"} - mockSubscriptionLookup(sqlmock, &sub) - mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{}) + mockSubscriptionLookup(mockedStore, &sub) + mockFlatFiltersLookup(mockedStore, []*models.FlatFilter{}) - result := processEvent(&event) + result := processor.processEvent(context.Background(), &event) assert.True(t, result.Success()) assert.Equal(t, "12.0", *result.Value().Value) diff --git a/events-processor/processors/event_processors/subscription_refresh_service.go b/events-processor/processors/events_processor/subscription_refresh_service.go similarity index 96% rename from events-processor/processors/event_processors/subscription_refresh_service.go rename to events-processor/processors/events_processor/subscription_refresh_service.go index 44b97e54a..f00c37423 100644 --- a/events-processor/processors/event_processors/subscription_refresh_service.go +++ b/events-processor/processors/events_processor/subscription_refresh_service.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "fmt" diff --git a/events-processor/processors/event_processors/subscription_refresh_service_test.go b/events-processor/processors/events_processor/subscription_refresh_service_test.go similarity index 97% rename from events-processor/processors/event_processors/subscription_refresh_service_test.go rename to events-processor/processors/events_processor/subscription_refresh_service_test.go index bfc084e74..fce20c55f 100644 --- a/events-processor/processors/event_processors/subscription_refresh_service_test.go +++ b/events-processor/processors/events_processor/subscription_refresh_service_test.go @@ -1,4 +1,4 @@ -package event_processors +package events_processor import ( "fmt" diff --git a/events-processor/processors/processors.go b/events-processor/processors/main_processor.go similarity index 94% rename from events-processor/processors/processors.go rename to events-processor/processors/main_processor.go index 6eaa31451..3e2013c33 100644 --- a/events-processor/processors/processors.go +++ b/events-processor/processors/main_processor.go @@ -13,14 +13,14 @@ import ( "github.com/getlago/lago/events-processor/config/kafka" "github.com/getlago/lago/events-processor/config/redis" "github.com/getlago/lago/events-processor/models" - "github.com/getlago/lago/events-processor/processors/event_processors" + "github.com/getlago/lago/events-processor/processors/events_processor" "github.com/getlago/lago/events-processor/utils" ) var ( ctx context.Context logger *slog.Logger - processor *event_processors.EventProcessor + processor *events_processor.EventProcessor apiStore *models.ApiStore kafkaConfig kafka.ServerConfig chargeCacheStore *models.ChargeCache @@ -224,17 +224,18 @@ func StartProcessingEvents() { chargeCacheStore = cacher defer chargeCacheStore.CacheStore.Close() - processor = event_processors.NewEventProcessor( - event_processors.NewEventEnrichmentService(apiStore), - event_processors.NewEventProducerService( + processor = events_processor.NewEventProcessor( + logger, + events_processor.NewEventEnrichmentService(apiStore), + events_processor.NewEventProducerService( eventsEnrichedProducerResult.Value(), eventsEnrichedExpandedProducerResult.Value(), eventsInAdvanceProducerResult.Value(), eventsDeadLetterQueueResult.Value(), logger, ), - event_processors.NewSubscriptionRefreshService(flagger), - event_processors.NewCacheService(chargeCacheStore), + events_processor.NewSubscriptionRefreshService(flagger), + events_processor.NewCacheService(chargeCacheStore), ) cg, err := kafka.NewConsumerGroup( @@ -243,7 +244,7 @@ func StartProcessingEvents() { Topic: os.Getenv(envLagoKafkaRawEventsTopic), ConsumerGroup: os.Getenv(envLagoKafkaConsumerGroup), ProcessRecords: func(records []*kgo.Record) []*kgo.Record { - return processEvents(records) + return processor.ProcessEvents(records) }, }) if err != nil { diff --git a/events-processor/tests/mocked_store.go b/events-processor/tests/mocked_store.go index bcc6a3245..09a41aee9 100644 --- a/events-processor/tests/mocked_store.go +++ b/events-processor/tests/mocked_store.go @@ -11,7 +11,12 @@ import ( "github.com/getlago/lago/events-processor/config/database" ) -func SetupMockStore(t *testing.T) (*database.DB, sqlmock.Sqlmock, func()) { +type MockedStore struct { + DB *database.DB + SQLMock sqlmock.Sqlmock +} + +func SetupMockStore(t *testing.T) (*MockedStore, func()) { mockDB, mock, err := sqlmock.New() if err != nil { t.Fatalf("Failed to create mock database: %v", err) @@ -29,7 +34,12 @@ func SetupMockStore(t *testing.T) (*database.DB, sqlmock.Sqlmock, func()) { t.Fatalf("Failed to open gorm connection: %v", err) } - return db, mock, func() { + mockedStore := &MockedStore{ + DB: db, + SQLMock: mock, + } + + return mockedStore, func() { mockDB.Close() } }