@@ -11,8 +11,9 @@ import (
1111 "sync"
1212 "time"
1313
14- "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
1514 "go.uber.org/zap"
15+
16+ "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
1617)
1718
1819// PersistenceManager handles message persistence and retry logic
@@ -59,7 +60,7 @@ func NewPersistenceManager(config PersistenceConfig, logger *zap.Logger, logsRec
5960}
6061
6162// StoreMessage stores a message in persistent storage (only for logs)
62- func (pm * PersistenceManager ) StoreMessage (ctx context.Context , data []byte , contentType , signalType string ) error {
63+ func (pm * PersistenceManager ) StoreMessage (_ context.Context , data []byte , contentType , signalType string ) error {
6364 if signalType != "logs" {
6465 pm .logger .Debug ("Skipping persistence for non-log signal type" ,
6566 zap .String ("signal_type" , signalType ))
@@ -91,7 +92,7 @@ func (pm *PersistenceManager) StoreMessage(ctx context.Context, data []byte, con
9192}
9293
9394// RemoveMessage removes a message from persistent storage
94- func (pm * PersistenceManager ) RemoveMessage (ctx context.Context , messageID , signalType string ) error {
95+ func (pm * PersistenceManager ) RemoveMessage (_ context.Context , messageID , signalType string ) error {
9596 key := fmt .Sprintf ("otlp_%s_%s" , signalType , messageID )
9697
9798 pm .storageMutex .Lock ()
@@ -106,7 +107,7 @@ func (pm *PersistenceManager) RemoveMessage(ctx context.Context, messageID, sign
106107}
107108
108109// ClearStoredMessages clears all stored messages for a specific signal type
109- func (pm * PersistenceManager ) ClearStoredMessages (ctx context.Context , signalType string ) error {
110+ func (pm * PersistenceManager ) ClearStoredMessages (_ context.Context , signalType string ) error {
110111 pm .storageMutex .Lock ()
111112 defer pm .storageMutex .Unlock ()
112113
@@ -131,7 +132,7 @@ func (pm *PersistenceManager) ClearStoredMessages(ctx context.Context, signalTyp
131132}
132133
133134// DeleteStoredMessageByContent deletes a stored message by matching content
134- func (pm * PersistenceManager ) DeleteStoredMessageByContent (ctx context.Context , content []byte , contentType , signalType string ) error {
135+ func (pm * PersistenceManager ) DeleteStoredMessageByContent (_ context.Context , content []byte , contentType , signalType string ) error {
135136 pm .storageMutex .Lock ()
136137 defer pm .storageMutex .Unlock ()
137138
@@ -167,7 +168,7 @@ func (pm *PersistenceManager) DeleteStoredMessageByContent(ctx context.Context,
167168}
168169
169170// GetStoredMessages retrieves all stored messages for retry
170- func (pm * PersistenceManager ) GetStoredMessages (ctx context.Context ) ([]PersistedMessage , error ) {
171+ func (pm * PersistenceManager ) GetStoredMessages (_ context.Context ) ([]PersistedMessage , error ) {
171172 pm .storageMutex .RLock ()
172173 defer pm .storageMutex .RUnlock ()
173174
@@ -289,11 +290,11 @@ func (pm *PersistenceManager) processRetries() {
289290 pm .logger .Debug ("Found log messages to retry" , zap .Int ("count" , len (logMessages )))
290291
291292 for _ , message := range logMessages {
292- pm .processMessageRetry (message )
293+ pm .processMessageRetry (pm . retryWorkerCtx , message )
293294 }
294295}
295296
296- func (pm * PersistenceManager ) processMessageRetry (message PersistedMessage ) {
297+ func (pm * PersistenceManager ) processMessageRetry (ctx context. Context , message PersistedMessage ) {
297298 if message .SignalType != "logs" {
298299 pm .logger .Debug ("Skipping retry for non-log message" ,
299300 zap .String ("signal_type" , message .SignalType ))
@@ -305,7 +306,7 @@ func (pm *PersistenceManager) processMessageRetry(message PersistedMessage) {
305306 zap .String ("signal_type" , message .SignalType ),
306307 zap .Int ("retry_count" , message .RetryCount + 1 ))
307308
308- success := pm .processStoredLogMessage (message )
309+ success := pm .processStoredLogMessage (ctx , message )
309310
310311 if success {
311312 pm .logger .Info ("Log message processed successfully, removing from storage" ,
@@ -333,7 +334,7 @@ func (pm *PersistenceManager) processMessageRetry(message PersistedMessage) {
333334}
334335
335336// processStoredLogMessage processes a stored log message
336- func (pm * PersistenceManager ) processStoredLogMessage (message PersistedMessage ) bool {
337+ func (pm * PersistenceManager ) processStoredLogMessage (ctx context. Context , message PersistedMessage ) bool {
337338 pm .logger .Debug ("Processing stored log message" ,
338339 zap .String ("message_id" , message .ID ),
339340 zap .String ("content_type" , message .ContentType ),
@@ -345,17 +346,15 @@ func (pm *PersistenceManager) processStoredLogMessage(message PersistedMessage)
345346 return false
346347 }
347348
348- ctx := context .Background ()
349-
350- encoder := getEncoderForContentType (message .ContentType )
351- if encoder == nil {
349+ enc := getEncoderForContentType (message .ContentType )
350+ if enc == nil {
352351 pm .logger .Error ("Unknown content type for stored message" ,
353352 zap .String ("message_id" , message .ID ),
354353 zap .String ("content_type" , message .ContentType ))
355354 return false
356355 }
357356
358- otlpReq , err := encoder .unmarshalLogsRequest (message .Data )
357+ otlpReq , err := enc .unmarshalLogsRequest (message .Data )
359358 if err != nil {
360359 pm .logger .Error ("Failed to unmarshal stored message" ,
361360 zap .String ("message_id" , message .ID ),
@@ -371,7 +370,7 @@ func (pm *PersistenceManager) processStoredLogMessage(message PersistedMessage)
371370 return false
372371 }
373372
374- _ , err = encoder .marshalLogsResponse (otlpResp )
373+ _ , err = enc .marshalLogsResponse (otlpResp )
375374 if err != nil {
376375 pm .logger .Error ("Failed to marshal response for stored message" ,
377376 zap .String ("message_id" , message .ID ),
0 commit comments