125 changes: 102 additions & 23 deletions events-processor/config/kafka/consumer.go
@@ -17,7 +17,7 @@ import (
type ConsumerGroupConfig struct {
Topic string
ConsumerGroup string
ProcessRecords func([]*kgo.Record) []*kgo.Record
ProcessRecords func(context.Context, []*kgo.Record) []*kgo.Record
}

type TopicPartition struct {
@@ -34,17 +34,17 @@ type PartitionConsumer struct {
quit chan struct{}
done chan struct{}
records chan []*kgo.Record
processRecords func([]*kgo.Record) []*kgo.Record
processRecords func(context.Context, []*kgo.Record) []*kgo.Record
}

type ConsumerGroup struct {
consumers map[TopicPartition]*PartitionConsumer
client *kgo.Client
processRecords func([]*kgo.Record) []*kgo.Record
processRecords func(context.Context, []*kgo.Record) []*kgo.Record
logger *slog.Logger
}

func (pc *PartitionConsumer) consume() {
func (pc *PartitionConsumer) consume(ctx context.Context) {
defer close(pc.done)

pc.logger.Info(fmt.Sprintf("Starting consume for topic %s partition %d\n", pc.topic, pc.partition))
@@ -56,33 +56,40 @@ func (pc *PartitionConsumer) consume() {
pc.logger.Info("partition consumer quit")
return

case <-ctx.Done():
pc.logger.Info("partition consumer context canceled")
return

case records := <-pc.records:
ctx := context.Background()
span := tracer.GetTracerSpan(ctx, "post_process", "Consumer.Consume")
recordsAttr := attribute.Int("records.length", len(records))
span.SetAttributes(recordsAttr)
defer span.End()

processedRecords := pc.processRecords(records)
processedRecords := pc.processRecords(ctx, records)
commitableRecords := records

if len(processedRecords) != len(records) {
// Ensure we do not commit records that were not processed, so they can be re-consumed
record := findMaxCommitableRecord(processedRecords, records)
commitableRecords = []*kgo.Record{record}
return
}

err := pc.client.CommitRecords(ctx, commitableRecords...)
if err != nil {
if ctx.Err() != nil {
pc.logger.Info("Commit canceled due to shutdown")
return
}

pc.logger.Error(fmt.Sprintf("Error when committing offsets to kafka. Error: %v topic: %s partition: %d offset: %d\n", err, pc.topic, pc.partition, records[len(records)-1].Offset+1))
utils.CaptureError(err)
}
}
}
}

func (cg *ConsumerGroup) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) {
func (cg *ConsumerGroup) assigned(ctx context.Context, cl *kgo.Client, assigned map[string][]int32) {
for topic, partitions := range assigned {
for _, partition := range partitions {
pc := &PartitionConsumer{
@@ -97,7 +104,7 @@ func (cg *ConsumerGroup) assigned(_ context.Context, cl *kgo.Client, assigned ma
processRecords: cg.processRecords,
}
cg.consumers[TopicPartition{topic: topic, partition: partition}] = pc
go pc.consume()
go pc.consume(ctx)
}
}
}
@@ -120,25 +127,73 @@ func (cg *ConsumerGroup) lost(_ context.Context, _ *kgo.Client, lost map[string]
}
}

func (cg *ConsumerGroup) poll() {
func (cg *ConsumerGroup) poll(ctx context.Context, done chan<- error) {
defer func() {
if r := recover(); r != nil {
cg.logger.Error("Consumer group poll panic", slog.Any("panic", r))
done <- fmt.Errorf("consumer group poll panic: %v", r)
}
}()

for {
fetches := cg.client.PollRecords(context.Background(), 10000)
if fetches.IsClientClosed() {
cg.logger.Info("client closed")
select {
case <-ctx.Done():
cg.logger.Info("Consumer group stopped")
return

default:
fetches := cg.client.PollRecords(ctx, 10000)
if fetches.IsClientClosed() {
cg.logger.Info("client closed")
return
}

if ctx.Err() != nil {
return
}

fetches.EachError(func(_ string, _ int32, err error) {
cg.logger.Error("Fetch error", slog.String("error", err.Error()))
done <- err
})

fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tp := TopicPartition{p.Topic, p.Partition}
if consumer, exists := cg.consumers[tp]; exists {
select {
case consumer.records <- p.Records:
case <-ctx.Done():
return
}
}
})

cg.client.AllowRebalance()
}
}
}

func (cg *ConsumerGroup) gracefulShutdown() {
var wg sync.WaitGroup

fetches.EachError(func(_ string, _ int32, err error) {
panic(err)
})
for tp, pc := range cg.consumers {
wg.Add(1)

fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tp := TopicPartition{p.Topic, p.Partition}
cg.consumers[tp].records <- p.Records
})
go func(tp TopicPartition, pc *PartitionConsumer) {
defer wg.Done()

cg.client.AllowRebalance()
cg.logger.Info("Shuting down partion consumer",
slog.String("topic", tp.topic),
slog.Int("partition", int(tp.partition)),
)

close(pc.quit)
<-pc.done
}(tp, pc)
}

wg.Wait()
cg.client.Close()
}

func NewConsumerGroup(serverConfig ServerConfig, cfg *ConsumerGroupConfig) (*ConsumerGroup, error) {
@@ -175,8 +230,32 @@ func NewConsumerGroup(serverConfig ServerConfig, cfg *ConsumerGroupConfig) (*Con
return cg, nil
}

func (cg *ConsumerGroup) Start() {
cg.poll()
func (cg *ConsumerGroup) Start(ctx context.Context) error {
pollCtx, cancel := context.WithCancel(ctx)
defer cancel()

done := make(chan error, 1)
go func() {
defer close(done)
cg.poll(pollCtx, done)
}()

select {
case <-ctx.Done():
cg.logger.Info("Gracefully shutting down consumer group")
cancel()

cg.gracefulShutdown()

cg.logger.Info("Consumer group shutdown is complete")
return ctx.Err()

case err := <-done:
if err != nil {
cg.logger.Error("Consumer group stopped with error", slog.String("error", err.Error()))
}
return err
}
}

func findMaxCommitableRecord(processedRecords []*kgo.Record, records []*kgo.Record) *kgo.Record {
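Not part of this diff: a minimal sketch of how the reworked Start(ctx) could be wired to OS signals so that SIGINT/SIGTERM triggers the graceful shutdown path. The import path, topic and group names, and the inline ProcessRecords stub are hypothetical placeholders; the real service would pass EventProcessor.ProcessEvents (which now takes a context as its first argument) instead of the stub.

package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/twmb/franz-go/pkg/kgo"

	kafka "example.com/events-processor/config/kafka" // hypothetical import path
)

func main() {
	// Cancel the root context on SIGINT/SIGTERM so ConsumerGroup.Start can
	// run its graceful shutdown path instead of being killed mid-batch.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	logger := slog.Default()

	var serverCfg kafka.ServerConfig // built from env/config in the real service

	cg, err := kafka.NewConsumerGroup(serverCfg, &kafka.ConsumerGroupConfig{
		Topic:         "events_raw",       // placeholder topic
		ConsumerGroup: "events_processor", // placeholder group
		// ProcessRecords now receives a context; this stub just echoes the batch.
		ProcessRecords: func(_ context.Context, records []*kgo.Record) []*kgo.Record {
			return records
		},
	})
	if err != nil {
		logger.Error("failed to build consumer group", slog.String("error", err.Error()))
		os.Exit(1)
	}

	// Start blocks until the context is canceled or polling fails; a context
	// error here simply means a clean, signal-driven shutdown.
	if err := cg.Start(ctx); err != nil && ctx.Err() == nil {
		logger.Error("consumer group exited with error", slog.String("error", err.Error()))
		os.Exit(1)
	}
}

Because Start returns ctx.Err() on a signal-driven stop, the caller can distinguish a clean shutdown from a poll failure.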
90 changes: 82 additions & 8 deletions events-processor/processors/events_processor/processor.go
@@ -33,8 +33,15 @@ func NewEventProcessor(logger *slog.Logger, enrichmentService *EventEnrichmentSe
}
}

func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Record {
ctx := context.Background()
func (processor *EventProcessor) ProcessEvents(ctx context.Context, records []*kgo.Record) []*kgo.Record {
// Handle graceful shutdown
select {
case <-ctx.Done():
processor.logger.Info("Ongoing shutdown. Stop processing new events")
return nil
default:
}

span := tracer.GetTracerSpan(ctx, "post_process", "PostProcess.ProcessEvents")
recordsAttr := attribute.Int("records.length", len(records))
span.SetAttributes(recordsAttr)
@@ -43,13 +50,23 @@ func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Rec
wg := sync.WaitGroup{}
wg.Add(len(records))

producersWg := sync.WaitGroup{}

var mu sync.Mutex
processedRecords := make([]*kgo.Record, 0)

for _, record := range records {
go func(record *kgo.Record) {
defer wg.Done()

// Check if a shutdown process is ongoing
select {
case <-ctx.Done():
processor.logger.Info("Ongoing shutdown. Stop processing event")
return
default:
}

sp := tracer.GetTracerSpan(ctx, "post_process", "PostProcess.ProcessOneEvent")
defer sp.End()

Expand All @@ -66,7 +83,7 @@ func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Rec
return
}

result := processor.processEvent(ctx, &event)
result := processor.processEvent(ctx, &event, &producersWg)
if result.Failure() {
processor.logger.Error(
result.ErrorMessage(),
@@ -86,7 +103,11 @@ func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Rec
}

// Push failed records to the dead letter queue
go processor.ProducerService.ProduceToDeadLetterQueue(ctx, event, result)
producersWg.Add(1)
go func() {
defer producersWg.Done()
processor.ProducerService.ProduceToDeadLetterQueue(ctx, event, result)
}()
}

// Track processed records
@@ -98,10 +119,13 @@ func (processor *EventProcessor) ProcessEvents(records []*kgo.Record) []*kgo.Rec

wg.Wait()

// Wait for all producer routines to complete.
processor.waitForProducers(ctx, &producersWg)

return processedRecords
}

func (processor *EventProcessor) processEvent(ctx context.Context, event *models.Event) utils.Result[*models.EnrichedEvent] {
func (processor *EventProcessor) processEvent(ctx context.Context, event *models.Event, producersWg *sync.WaitGroup) utils.Result[*models.EnrichedEvent] {
enrichedEventResult := processor.EnrichmentService.EnrichEvent(event)
if enrichedEventResult.Failure() {
return failedResult(enrichedEventResult, enrichedEventResult.ErrorCode(), enrichedEventResult.ErrorMessage())
@@ -110,11 +134,23 @@ func (processor *EventProcessor) processEvent(ctx context.Context, event *models
enrichedEvents := enrichedEventResult.Value()
enrichedEvent := enrichedEvents[0]

go processor.ProducerService.ProduceEnrichedEvent(ctx, enrichedEvent)
processor.trackProducer(
ctx,
producersWg,
func() {
processor.ProducerService.ProduceEnrichedEvent(ctx, enrichedEvent)
},
)

// TODO(pre-aggregation): Uncomment to enable the feature
// for _, ev := range enrichedEvents {
// go processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
// processor.trackProducer(
// ctx,
// producersWg,
// func() {
// processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
// },
// )
// }

if enrichedEvent.Subscription != nil && event.NotAPIPostProcessed() {
@@ -127,7 +163,13 @@ func (processor *EventProcessor) processEvent(ctx context.Context, event *models
}

if payInAdvance {
go processor.ProducerService.ProduceChargedInAdvanceEvent(ctx, enrichedEvent)
processor.trackProducer(
ctx,
producersWg,
func() {
processor.ProducerService.ProduceChargedInAdvanceEvent(ctx, enrichedEvent)
},
)
}

flagResult := processor.RefreshService.FlagSubscriptionRefresh(enrichedEvent)
@@ -142,6 +184,38 @@ func (processor *EventProcessor) processEvent(ctx context.Context, event *models
return utils.SuccessResult(enrichedEvent)
}

func (processor *EventProcessor) trackProducer(ctx context.Context, producerWg *sync.WaitGroup, routine func()) {
producerWg.Add(1)
go func() {
defer producerWg.Done()

select {
case <-ctx.Done():
processor.logger.Debug("Shutdown signal received, skipping producer")
return
default:
routine()
}
}()
}

func (processor *EventProcessor) waitForProducers(ctx context.Context, producersWg *sync.WaitGroup) {
done := make(chan struct{})
go func() {
producersWg.Wait()
close(done)
}()

select {
case <-done:
processor.logger.Debug("All producer goroutines completed successfully")
case <-time.After(30 * time.Second): // TODO: make this timeout configurable
processor.logger.Warn("Timeout waiting for producer goroutines to complete")
case <-ctx.Done():
processor.logger.Info("Shutdown signal received while waiting for producer goroutines")
}
}

func failedResult(r utils.AnyResult, code string, message string) utils.Result[*models.EnrichedEvent] {
result := utils.FailedResult[*models.EnrichedEvent](r.Error()).AddErrorDetails(code, message)
result.Retryable = r.IsRetryable()
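A side note on waitForProducers: sync.WaitGroup has no context- or deadline-aware Wait, so the code moves Wait into a helper goroutine that closes a channel, and the caller selects on that channel, a timer, and the context. Below is a self-contained sketch of that idiom; the names are illustrative and not from this PR.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout blocks until the WaitGroup drains, the timeout fires, or
// the context is canceled, mirroring the structure of waitForProducers.
func waitWithTimeout(ctx context.Context, wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait() // this helper goroutine lingers if Wait never returns
		close(done)
	}()

	select {
	case <-done:
		return true // every tracked goroutine finished
	case <-time.After(timeout):
		return false // gave up waiting
	case <-ctx.Done():
		return false // shutdown requested while waiting
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			time.Sleep(time.Duration(i*100) * time.Millisecond) // stand-in for a produce call
		}(i)
	}

	fmt.Println("all producers finished:", waitWithTimeout(context.Background(), &wg, time.Second))
}

The trade-off is that timing out or canceling only stops the caller from waiting; the goroutine blocked on Wait still runs until the remaining producers return.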