Skip to content

Commit

Permalink
fix: flaky semaphore test (#391)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs authored Nov 7, 2024
1 parent 46f8e38 commit 0aafd75
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 24 deletions.
24 changes: 16 additions & 8 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,34 +225,42 @@ func TestConsumerConsumeLogs(t *testing.T) {

func TestConsumeLogsSemaphore(t *testing.T) {
logs := plog.NewLogs()
var batches []*modelpb.Batch

doneCh := make(chan struct{})
recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
// Ensure channel is only closed the first time
doneCh <- struct{}{}
doneCh <- struct{}{}
return nil
})
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: recorder,
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-doneCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeLogsWithResult(ctx, logs)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
<-doneCh

// Turn channel into sink.
// This trick gets rid of using sync.Once.
doneCh = make(chan struct{}, 2)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}
Expand Down
24 changes: 16 additions & 8 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,34 +227,42 @@ func TestConsumeMetrics(t *testing.T) {

func TestConsumeMetricsSemaphore(t *testing.T) {
metrics := pmetric.NewMetrics()
var batches []*modelpb.Batch

doneCh := make(chan struct{})
recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
// Ensure channel is only closed the first time
doneCh <- struct{}{}
doneCh <- struct{}{}
return nil
})
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: recorder,
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-doneCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeMetricsWithResult(ctx, metrics)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
<-doneCh

// Turn channel into sink.
// This trick gets rid of using sync.Once.
doneCh = make(chan struct{}, 2)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}
Expand Down
24 changes: 16 additions & 8 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,34 +1166,42 @@ func TestSpanLinks(t *testing.T) {

func TestConsumeTracesSemaphore(t *testing.T) {
traces := ptrace.NewTraces()
var batches []*modelpb.Batch

doneCh := make(chan struct{})
recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
// Ensure channel is only closed the first time
doneCh <- struct{}{}
doneCh <- struct{}{}
return nil
})
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: recorder,
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-doneCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeTracesWithResult(ctx, traces)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
<-doneCh

// Turn channel into sink.
// This trick gets rid of using sync.Once.
doneCh = make(chan struct{}, 2)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}
Expand Down

0 comments on commit 0aafd75

Please sign in to comment.