diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 79cb5b92b7..26d7a172bf 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -30,6 +30,7 @@ body: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml index 11b6f1362d..e48fec3575 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -30,6 +30,7 @@ body: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/.github/codecov.yml b/.github/codecov.yml index 0d3bda059b..a8f7fd2412 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -63,6 +63,11 @@ flags: paths: - src/OpenTelemetry.Instrumentation.AspNetCore + unittests-Instrumentation.ConfluentKafka: + carryforward: true + paths: + - src/OpenTelemetry.Instrumentation.ConfluentKafka + unittests-Instrumentation.EventCounters: carryforward: true paths: diff --git a/.github/component_owners.yml b/.github/component_owners.yml index b7c252688e..d4a5aa9ca3 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -35,6 +35,8 @@ components: - ppittle src/OpenTelemetry.Instrumentation.Cassandra/: - xsoheilalizadeh + src/OpenTelemetry.Instrumentation.ConfluentKafka/: + - g7ed6e src/OpenTelemetry.Instrumentation.ElasticsearchClient/: - ejsmith src/OpenTelemetry.Instrumentation.EventCounters/: @@ -130,6 +132,8 @@ components: - ppittle test/OpenTelemetry.Instrumentation.Cassandra.Tests/: - xsoheilalizadeh + test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/: + - g7ed6e test/OpenTelemetry.Instrumentation.ElasticsearchClient.Tests/: - ejsmith test/OpenTelemetry.Instrumentation.EventCounters.Tests/: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ceea50c2e..1f0778213e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: instrumentation-aws: ['*/OpenTelemetry.Instrumentation.AWS/**', '*/OpenTelemetry.Instrumentation.AWS.Tests/**', '!**/*.md'] instrumentation-aws-lambda: ['*/OpenTelemetry.Instrumentation.AWSLambda/**', '*/OpenTelemetry.Instrumentation.AWSLambda.Tests/**', '!**/*.md'] instrumentation-cassandra: ['*/OpenTelemetry.Instrumentation.Cassandra*/**', '!**/*.md'] + instrumentation-confluentkafka: ['*/OpenTelemetry.Instrumentation.ConfluentKafka*/**', 'examples/kafka/**', '!**/*.md'] instrumentation-elasticsearchclient: ['*/OpenTelemetry.Instrumentation.ElasticsearchClient*/**', '!**/*.md'] instrumentation-entityframeworkcore: ['*/OpenTelemetry.Instrumentation.EntityFrameworkCore*/**', '!**/*.md'] instrumentation-eventcounters: ['*/OpenTelemetry.Instrumentation.EventCounters*/**', 'examples/event-counters/**', '!**/*.md'] @@ -222,6 +223,27 @@ jobs: project-name: Component[OpenTelemetry.Instrumentation.Cassandra] code-cov-name: Instrumentation.Cassandra + build-test-instrumentation-confluentkafka: + needs: detect-changes + if: | + contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') + || contains(needs.detect-changes.outputs.changes, 'build') + || contains(needs.detect-changes.outputs.changes, 'shared') + uses: ./.github/workflows/Component.BuildTest.yml + with: + project-name: Component[OpenTelemetry.Instrumentation.ConfluentKafka] + code-cov-name: Instrumentation.ConfluentKafka + + build-test-instrumentation-confluentkafka-integration: + needs: detect-changes + if: | + contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') + || contains(needs.detect-changes.outputs.changes, 'build') + || contains(needs.detect-changes.outputs.changes, 'shared') + uses: ./.github/workflows/integration.yml + with: + job: kafka-integration-test + build-test-instrumentation-elasticsearchclient: needs: detect-changes if: | @@ -375,6 +397,8 @@ jobs: || contains(needs.detect-changes.outputs.changes, 'build') || contains(needs.detect-changes.outputs.changes, 'shared') uses: ./.github/workflows/integration.yml + with: + job: redis-integration-test build-test-instrumentation-wcf: needs: detect-changes @@ -518,6 +542,7 @@ jobs: || contains(needs.detect-changes.outputs.changes, 'instrumentation-aspnetcore') || contains(needs.detect-changes.outputs.changes, 'instrumentation-aws') || contains(needs.detect-changes.outputs.changes, 'instrumentation-awslambda') + || contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') || contains(needs.detect-changes.outputs.changes, 'instrumentation-eventcounters') || contains(needs.detect-changes.outputs.changes, 'instrumentation-grpcnetclient') || contains(needs.detect-changes.outputs.changes, 'instrumentation-http') @@ -556,6 +581,7 @@ jobs: build-test-instrumentation-aws, build-test-instrumentation-awslambda, build-test-instrumentation-cassandra, + build-test-instrumentation-confluentkafka, build-test-instrumentation-elasticsearchclient, build-test-instrumentation-entityframeworkcore, build-test-instrumentation-eventcounters, diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 17fc8a9da9..d139ad213b 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -2,9 +2,14 @@ name: Integration Build OpenTelemetry.Instrumentation.StackExchangeRedis on: workflow_call: + inputs: + job: + required: true + type: string jobs: redis-integration-test: + if: inputs.job == 'all' || inputs.job == 'redis-integration-test' runs-on: ubuntu-latest strategy: fail-fast: false @@ -13,5 +18,18 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Run redis docker-compose.integration + - name: Run redis docker-compose run: docker-compose --file=test/OpenTelemetry.Instrumentation.StackExchangeRedis.Tests/docker-compose.yml --file=build/docker-compose.${{ matrix.version }}.yml --project-directory=. up --exit-code-from=tests --build + + kafka-integration-test: + if: inputs.job == 'all' || inputs.job == 'kafka-integration-test' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + version: [net6.0, net7.0, net8.0] + steps: + - uses: actions/checkout@v4 + + - name: Run kafka docker-compose + run: docker-compose --file=test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml --file=build/docker-compose.${{ matrix.version }}.yml --project-directory=. up --exit-code-from=tests --build diff --git a/.github/workflows/prepare-release.yml b/.github/workflows/prepare-release.yml index f855656280..b0673ac73a 100644 --- a/.github/workflows/prepare-release.yml +++ b/.github/workflows/prepare-release.yml @@ -20,6 +20,7 @@ on: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/build/Common.props b/build/Common.props index 5fc436b894..3cf54473e1 100644 --- a/build/Common.props +++ b/build/Common.props @@ -43,6 +43,7 @@ [1.9.0,2.0) [1.9.0-rc.1] [2.1.58,3.0) + [2.3.0,3.0) [3.16.0,4.0) [1.2.0-beta.507,2.0) [4.3.4,) diff --git a/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj new file mode 100644 index 0000000000..0d0a145639 --- /dev/null +++ b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj @@ -0,0 +1,34 @@ + + + + $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName) + Instrumentation.ConfluentKafka- + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/kafka/Constants.cs b/examples/kafka/Constants.cs new file mode 100644 index 0000000000..af3cf026c4 --- /dev/null +++ b/examples/kafka/Constants.cs @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace Examples.ConfluentKafka; + +public static class Constants +{ + public static readonly string Topic = $"test-topic-{Guid.NewGuid()}"; +} diff --git a/examples/kafka/Examples.ConfluentKafka.csproj b/examples/kafka/Examples.ConfluentKafka.csproj new file mode 100644 index 0000000000..b9ce8848cb --- /dev/null +++ b/examples/kafka/Examples.ConfluentKafka.csproj @@ -0,0 +1,17 @@ + + + Exe + net8.0 + enable + enable + + + + + + + + + + + diff --git a/examples/kafka/ProduceConsumeHostedService.cs b/examples/kafka/ProduceConsumeHostedService.cs new file mode 100644 index 0000000000..04d14dd893 --- /dev/null +++ b/examples/kafka/ProduceConsumeHostedService.cs @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using OpenTelemetry.Instrumentation.ConfluentKafka; + +namespace Examples.ConfluentKafka; + +public class ProduceConsumeHostedService( + InstrumentedProducerBuilder instrumentedProducerBuilder, + InstrumentedConsumerBuilder instrumentedConsumerBuilder) + : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + IProducer producer = instrumentedProducerBuilder.Build(); + IConsumer consumer = instrumentedConsumerBuilder.Build(); + + for (int j = 0; j < 100; j++) + { + await producer.ProduceAsync( + Constants.Topic, + new Message { Key = "any_key", Value = $"any_value_{j}" }, + stoppingToken); + } + + for (int j = 0; j < 100; j++) + { + producer.Produce( + Constants.Topic, + new Message { Key = "any_key", Value = $"any_value_{j}" }); + } + + producer.Flush(stoppingToken); + + consumer.Subscribe(Constants.Topic); + while (!stoppingToken.IsCancellationRequested) + { + ConsumeResult consumeResult = consumer.Consume(stoppingToken); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + + Console.WriteLine($"Consumer {consumer.Name} received message: {consumeResult.Message.Value}"); + } + } +} diff --git a/examples/kafka/Program.cs b/examples/kafka/Program.cs new file mode 100644 index 0000000000..fe9c6dbcdd --- /dev/null +++ b/examples/kafka/Program.cs @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Examples.ConfluentKafka; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +var builder = Host.CreateApplicationBuilder(args); + +const string bootstrapServers = "localhost:9092"; + +builder.Services.AddSingleton(_ => +{ + ProducerConfig producerConfig = new() { BootstrapServers = bootstrapServers }; + return new InstrumentedProducerBuilder(producerConfig); +}); +builder.Services.AddSingleton(_ => +{ + ConsumerConfig consumerConfigA = new() + { + BootstrapServers = bootstrapServers, + GroupId = "group-a", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + return new InstrumentedConsumerBuilder(consumerConfigA); +}); + +builder.Services.AddOpenTelemetry() + .WithTracing(tracing => + { + tracing.AddConsoleExporter() + .AddOtlpExporter() + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }) + .WithMetrics(metering => + { + metering.AddConsoleExporter() + .AddOtlpExporter() + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + +builder.Services.AddHostedService(); + +var app = builder.Build(); +await app.RunAsync(); diff --git a/examples/kafka/README.md b/examples/kafka/README.md new file mode 100644 index 0000000000..02f594480e --- /dev/null +++ b/examples/kafka/README.md @@ -0,0 +1,13 @@ +# Run Examples.ConfluentKafka + +Start the Confluent Kafka stack: + +```cmd +docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local +``` + +Start the Aspire Dashboard: + +```cmd +docker run --rm -it -p 18888:18888 -p 4317:18889 -d --name aspire-dashboard mcr.microsoft.com/dotnet/nightly/aspire-dashboard:8.0.0 +``` diff --git a/opentelemetry-dotnet-contrib.sln b/opentelemetry-dotnet-contrib.sln index f0a4560077..f7508674db 100644 --- a/opentelemetry-dotnet-contrib.sln +++ b/opentelemetry-dotnet-contrib.sln @@ -287,9 +287,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Projects", "Projects", "{04 ProjectSection(SolutionItems) = preProject build\Projects\Component.proj = build\Projects\Component.proj build\Projects\OpenTelemetry.Contrib.Shared.Tests.proj = build\Projects\OpenTelemetry.Contrib.Shared.Tests.proj + build\Projects\OpenTelemetry.Exporter.InfluxDB.proj = build\Projects\OpenTelemetry.Exporter.InfluxDB.proj + build\Projects\OpenTelemetry.Extensions.Enrichment.proj = build\Projects\OpenTelemetry.Extensions.Enrichment.proj build\Projects\OpenTelemetry.Instrumentation.AspNet.proj = build\Projects\OpenTelemetry.Instrumentation.AspNet.proj build\Projects\OpenTelemetry.Instrumentation.AspNetCore.proj = build\Projects\OpenTelemetry.Instrumentation.AspNetCore.proj + build\Projects\OpenTelemetry.Instrumentation.ConfluentKafka.proj = build\Projects\OpenTelemetry.Instrumentation.ConfluentKafka.proj build\Projects\OpenTelemetry.Instrumentation.EventCounters.proj = build\Projects\OpenTelemetry.Instrumentation.EventCounters.proj + build\Projects\OpenTelemetry.Instrumentation.GrpcCore.proj = build\Projects\OpenTelemetry.Instrumentation.GrpcCore.proj build\Projects\OpenTelemetry.Instrumentation.Owin.proj = build\Projects\OpenTelemetry.Instrumentation.Owin.proj build\Projects\OpenTelemetry.Instrumentation.Process.proj = build\Projects\OpenTelemetry.Instrumentation.Process.proj build\Projects\OpenTelemetry.Instrumentation.Runtime.proj = build\Projects\OpenTelemetry.Instrumentation.Runtime.proj @@ -377,6 +381,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ISSUE_TEMPLATE", "ISSUE_TEM .github\ISSUE_TEMPLATE\feature_request.yml = .github\ISSUE_TEMPLATE\feature_request.yml EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{3A464E7A-42F3-44B0-B8D7-80521A7704A6}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Examples.ConfluentKafka", "examples\kafka\Examples.ConfluentKafka.csproj", "{9B994669-E839-4C42-A0F1-DF9DD058C1DC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -779,6 +791,18 @@ Global {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Debug|Any CPU.Build.0 = Debug|Any CPU {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Release|Any CPU.ActiveCfg = Release|Any CPU {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Release|Any CPU.Build.0 = Release|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.Build.0 = Debug|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.ActiveCfg = Release|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.Build.0 = Release|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.Build.0 = Release|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -899,6 +923,10 @@ Global {70CA77D4-5D7F-4D70-A6B5-8AAC07A8EA3C} = {2097345F-4DD3-477D-BC54-A922F9B2B402} {45D29DAA-0DB9-4808-B879-1AECC37EF366} = {824BD1DE-3FA8-4FE0-823A-FD365EAC78AF} {40373C78-0513-4067-A96B-96A851369761} = {1A06E14B-DD2F-4536-9D2E-F708C0C43555} + {96341E23-990E-4144-A7E3-9EF0DAFF3232} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63} + {BE40900A-2859-471D-8802-21DFD73DDAA7} = {2097345F-4DD3-477D-BC54-A922F9B2B402} + {3A464E7A-42F3-44B0-B8D7-80521A7704A6} = {B75EE478-97F7-4E9F-9A5A-DB3D0988EDEA} + {9B994669-E839-4C42-A0F1-DF9DD058C1DC} = {3A464E7A-42F3-44B0-B8D7-80521A7704A6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B0816796-CDB3-47D7-8C3C-946434DE3B66} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt new file mode 100644 index 0000000000..074c6ad103 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt @@ -0,0 +1,2 @@ +#nullable enable + diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt new file mode 100644 index 0000000000..d27211c49c --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt @@ -0,0 +1,20 @@ +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable>! config) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable>! config) -> void +OpenTelemetry.Metrics.MeterProviderBuilderExtensions +OpenTelemetry.Trace.TracerProviderBuilderExtensions +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.Build() -> Confluent.Kafka.IConsumer! +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs new file mode 100644 index 0000000000..9bb32bb7a6 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ConfluentKafka.Tests" + AssemblyInfo.PublicKey)] + +#if SIGNED +internal static class AssemblyInfo +{ + public const string PublicKey = ", PublicKey=002400000480000094000000060200000024000052534131000400000100010051C1562A090FB0C9F391012A32198B5E5D9A60E9B80FA2D7B434C9E5CCB7259BD606E66F9660676AFC6692B8CDC6793D190904551D2103B7B22FA636DCBB8208839785BA402EA08FC00C8F1500CCEF28BBF599AA64FFB1E1D5DC1BF3420A3777BADFE697856E9D52070A50C3EA5821C80BEF17CA3ACFFA28F89DD413F096F898"; +} +#else +internal static class AssemblyInfo +{ + public const string PublicKey = ""; +} +#endif diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md new file mode 100644 index 0000000000..134621e04d --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +* Initial release diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs new file mode 100644 index 0000000000..beb4c4812d --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Diagnostics.Metrics; +using OpenTelemetry.Internal; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal static class ConfluentKafkaCommon +{ + internal static readonly string InstrumentationName = typeof(ConfluentKafkaCommon).Assembly.GetName().Name!; + internal static readonly string InstrumentationVersion = typeof(ConfluentKafkaCommon).Assembly.GetPackageVersion(); + internal static readonly ActivitySource ActivitySource = new(InstrumentationName, InstrumentationVersion); + internal static readonly Meter Meter = new(InstrumentationName, InstrumentationVersion); + internal static readonly Counter ReceiveMessagesCounter = Meter.CreateCounter(SemanticConventions.MetricMessagingReceiveMessages); + internal static readonly Histogram ReceiveDurationHistogram = Meter.CreateHistogram(SemanticConventions.MetricMessagingReceiveDuration); + internal static readonly Counter PublishMessagesCounter = Meter.CreateCounter(SemanticConventions.MetricMessagingPublishMessages); + internal static readonly Histogram PublishDurationHistogram = Meter.CreateHistogram(SemanticConventions.MetricMessagingPublishDuration); +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs new file mode 100644 index 0000000000..573b78d56b --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaConsumerInstrumentation; + +#pragma warning disable SA1402 // File may only contain a single type +internal sealed class ConfluentKafkaConsumerInstrumentation : ConfluentKafkaConsumerInstrumentation +#pragma warning restore SA1402 // File may only contain a single type +{ + public ConfluentKafkaConsumerInstrumentation(InstrumentedConsumerBuilder consumerBuilder) + { + this.ConsumerBuilder = consumerBuilder; + } + + internal InstrumentedConsumerBuilder ConsumerBuilder { get; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs new file mode 100644 index 0000000000..c2a108c100 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs @@ -0,0 +1,11 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaConsumerInstrumentationOptions +{ + public bool Metrics { get; set; } + + public bool Traces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs new file mode 100644 index 0000000000..13f71b1905 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal abstract class ConfluentKafkaProducerInstrumentation; + +#pragma warning disable SA1402 // File may only contain a single type +internal sealed class ConfluentKafkaProducerInstrumentation : ConfluentKafkaProducerInstrumentation +#pragma warning restore SA1402 // File may only contain a single type +{ + public ConfluentKafkaProducerInstrumentation(InstrumentedProducerBuilder producerBuilder) + { + this.ProducerBuilder = producerBuilder; + } + + internal InstrumentedProducerBuilder ProducerBuilder { get; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs new file mode 100644 index 0000000000..cb97821416 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs @@ -0,0 +1,11 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaProducerInstrumentationOptions +{ + public bool Metrics { get; set; } + + public bool Traces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs new file mode 100644 index 0000000000..8fa5fa8b02 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -0,0 +1,404 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text; +using Confluent.Kafka; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class InstrumentedConsumer : IConsumer +{ + private const string ReceiveOperationName = "receive"; + private const string KafkaMessagingSystem = "kafka"; + private readonly IConsumer consumer; + private readonly ConfluentKafkaConsumerInstrumentationOptions options; + + public InstrumentedConsumer(IConsumer consumer, ConfluentKafkaConsumerInstrumentationOptions options) + { + this.consumer = consumer; + this.options = options; + } + + public Handle Handle => this.consumer.Handle; + + public string Name => this.consumer.Name; + + public string MemberId => this.consumer.MemberId; + + public List Assignment => this.consumer.Assignment; + + public List Subscription => this.consumer.Subscription; + + public IConsumerGroupMetadata ConsumerGroupMetadata => this.consumer.ConsumerGroupMetadata; + + public string? GroupId { get; internal set; } + + public void Dispose() + { + this.consumer.Dispose(); + } + + public int AddBrokers(string brokers) + { + return this.consumer.AddBrokers(brokers); + } + + public void SetSaslCredentials(string username, string password) + { + this.consumer.SetSaslCredentials(username, password); + } + + public ConsumeResult? Consume(int millisecondsTimeout) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; + try + { + result = this.consumer.Consume(millisecondsTimeout); + consumeResult = ExtractConsumeResult(result); + return result; + } + catch (ConsumeException e) + { + (consumeResult, errorType) = ExtractConsumeResult(e); + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } + } + + public ConsumeResult? Consume(CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; + try + { + result = this.consumer.Consume(cancellationToken); + consumeResult = ExtractConsumeResult(result); + return result; + } + catch (ConsumeException e) + { + (consumeResult, errorType) = ExtractConsumeResult(e); + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } + } + + public ConsumeResult? Consume(TimeSpan timeout) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; + try + { + result = this.consumer.Consume(timeout); + consumeResult = ExtractConsumeResult(result); + return result; + } + catch (ConsumeException e) + { + (consumeResult, errorType) = ExtractConsumeResult(e); + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } + } + + public void Subscribe(IEnumerable topics) + { + this.consumer.Subscribe(topics); + } + + public void Subscribe(string topic) + { + this.consumer.Subscribe(topic); + } + + public void Unsubscribe() + { + this.consumer.Unsubscribe(); + } + + public void Assign(TopicPartition partition) + { + this.consumer.Assign(partition); + } + + public void Assign(TopicPartitionOffset partition) + { + this.consumer.Assign(partition); + } + + public void Assign(IEnumerable partitions) + { + this.consumer.Assign(partitions); + } + + public void Assign(IEnumerable partitions) + { + this.consumer.Assign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumer.IncrementalAssign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumer.IncrementalAssign(partitions); + } + + public void IncrementalUnassign(IEnumerable partitions) + { + this.consumer.IncrementalUnassign(partitions); + } + + public void Unassign() + { + this.consumer.Unassign(); + } + + public void StoreOffset(ConsumeResult result) + { + this.consumer.StoreOffset(result); + } + + public void StoreOffset(TopicPartitionOffset offset) + { + this.consumer.StoreOffset(offset); + } + + public List Commit() + { + return this.consumer.Commit(); + } + + public void Commit(IEnumerable offsets) + { + this.consumer.Commit(offsets); + } + + public void Commit(ConsumeResult result) + { + this.consumer.Commit(result); + } + + public void Seek(TopicPartitionOffset tpo) + { + this.consumer.Seek(tpo); + } + + public void Pause(IEnumerable partitions) + { + this.consumer.Pause(partitions); + } + + public void Resume(IEnumerable partitions) + { + this.consumer.Resume(partitions); + } + + public List Committed(TimeSpan timeout) + { + return this.consumer.Committed(timeout); + } + + public List Committed(IEnumerable partitions, TimeSpan timeout) + { + return this.consumer.Committed(partitions, timeout); + } + + public Offset Position(TopicPartition partition) + { + return this.consumer.Position(partition); + } + + public List OffsetsForTimes(IEnumerable timestampsToSearch, TimeSpan timeout) + { + return this.consumer.OffsetsForTimes(timestampsToSearch, timeout); + } + + public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) + { + return this.consumer.GetWatermarkOffsets(topicPartition); + } + + public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) + { + return this.consumer.QueryWatermarkOffsets(topicPartition, timeout); + } + + public void Close() + { + this.consumer.Close(); + } + + private static string FormatConsumeException(ConsumeException consumeException) => + $"ConsumeException: {consumeException.Error}"; + + private static PropagationContext ExtractPropagationContext(Headers? headers) + => Propagators.DefaultTextMapPropagator.Extract(default, headers, ExtractTraceContext); + + private static IEnumerable ExtractTraceContext(Headers? headers, string value) + { + if (headers?.TryGetLastBytes(value, out var bytes) == true) + { + yield return Encoding.UTF8.GetString(bytes); + } + } + + private static ConsumeResult ExtractConsumeResult(ConsumeResult result) => result switch + { + null => new ConsumeResult(null, null), + { Message: null } => new ConsumeResult(result.TopicPartitionOffset, null), + _ => new ConsumeResult(result.TopicPartitionOffset, result.Message.Headers, result.Message.Key), + }; + + private static (ConsumeResult ConsumeResult, string ErrorType) ExtractConsumeResult(ConsumeException exception) => exception switch + { + { ConsumerRecord: null } => (new ConsumeResult(null, null), FormatConsumeException(exception)), + { ConsumerRecord.Message: null } => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, null), FormatConsumeException(exception)), + _ => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key), FormatConsumeException(exception)), + }; + + private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null) + { + tags = new TagList() + { + new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + ReceiveOperationName), + new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem), + new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic), + }; + + if (partition is not null) + { + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition)); + } + + if (errorType is not null) + { + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType)); + } + } + + private static void RecordReceive(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + { + GetTags(topicPartition.Topic, out var tags, partition: topicPartition.Partition, errorType); + + ConfluentKafkaCommon.ReceiveMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.ReceiveDurationHistogram.Record(duration.TotalSeconds, in tags); + } + + private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endTime, ConsumeResult consumeResult, string? errorType) + { + if (this.options.Traces) + { + PropagationContext propagationContext = consumeResult.Headers != null + ? ExtractPropagationContext(consumeResult.Headers) + : default; + + using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); + if (activity != null) + { + if (errorType != null) + { + activity.SetStatus(ActivityStatusCode.Error); + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeErrorType, errorType); + } + } + + activity.SetEndTime(endTime.UtcDateTime); + } + } + + if (this.options.Metrics) + { + TimeSpan duration = endTime - startTime; + RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType); + } + } + + private Activity? StartReceiveActivity(PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key) + { + var spanName = string.IsNullOrEmpty(topicPartitionOffset?.Topic) + ? ReceiveOperationName + : string.Concat(topicPartitionOffset!.Topic, " ", ReceiveOperationName); + + ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid() + ? new[] { new ActivityLink(propagationContext.ActivityContext) } + : Array.Empty(); + + Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default); + if (activity?.IsAllDataRequested == true) + { + activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem); + activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name); + activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topicPartitionOffset?.Topic); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartitionOffset?.Partition.Value); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageOffset, topicPartitionOffset?.Offset.Value); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaConsumerGroup, this.GroupId); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, ReceiveOperationName); + if (key != null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, key); + } + } + + return activity; + } + + private readonly record struct ConsumeResult( + TopicPartitionOffset? TopicPartitionOffset, + Headers? Headers, + object? Key = null) + { + public object? Key { get; } = Key; + + public Headers? Headers { get; } = Headers; + + public TopicPartitionOffset? TopicPartitionOffset { get; } = TopicPartitionOffset; + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs new file mode 100644 index 0000000000..93268ba5c7 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +/// +/// A builder of with support for instrumentation. +/// +/// Type of the key. +/// Type of value. +public sealed class InstrumentedConsumerBuilder : ConsumerBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified. + public InstrumentedConsumerBuilder(IEnumerable> config) + : base(config) + { + } + + internal ConfluentKafkaConsumerInstrumentationOptions? Options { get; set; } + + /// + /// Build a new IConsumer instance. + /// + /// an . + public override IConsumer Build() + { + Debug.Assert(this.Options != null, "Options should not be null."); + + ConsumerConfig config = (ConsumerConfig)this.Config; + if (this.Options!.Metrics) + { + config.StatisticsIntervalMs ??= 1000; + } + + var consumer = new InstrumentedConsumer(base.Build(), this.Options); + consumer.GroupId = config.GroupId; + + return consumer; + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs new file mode 100644 index 0000000000..aa7d31ba48 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -0,0 +1,370 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text; +using Confluent.Kafka; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal sealed class InstrumentedProducer : IProducer +{ + private const string PublishOperationName = "publish"; + private const string KafkaMessagingSystem = "kafka"; + + private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator; + private readonly IProducer producer; + private readonly ConfluentKafkaProducerInstrumentationOptions options; + + public InstrumentedProducer( + IProducer producer, + ConfluentKafkaProducerInstrumentationOptions options) + { + this.producer = producer; + this.options = options; + } + + public Handle Handle => this.producer.Handle; + + public string Name => this.producer.Name; + + internal ConfluentKafkaProducerInstrumentationOptions Options => this.options; + + public int AddBrokers(string brokers) + { + return this.producer.AddBrokers(brokers); + } + + public void SetSaslCredentials(string username, string password) + { + this.producer.SetSaslCredentials(username, password); + } + + public async Task> ProduceAsync( + string topic, + Message message, + CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartPublishActivity(start, topic, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + DeliveryResult result; + string? errorType = null; + try + { + result = await this.producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; + + if (this.options.Metrics) + { + RecordPublish(topic, duration, errorType); + } + } + + return result; + } + + public async Task> ProduceAsync( + TopicPartition topicPartition, + Message message, + CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + DeliveryResult result; + string? errorType = null; + try + { + result = await this.producer.ProduceAsync(topicPartition, message, cancellationToken).ConfigureAwait(false); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; + + if (this.options.Metrics) + { + RecordPublish(topicPartition, duration, errorType); + } + } + + return result; + } + + public void Produce(string topic, Message message, Action>? deliveryHandler = null) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartPublishActivity(start, topic, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + string? errorType = null; + try + { + this.producer.Produce(topic, message, deliveryHandler); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; + + if (this.options.Metrics) + { + RecordPublish(topic, duration, errorType); + } + } + } + + public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + string? errorType = null; + try + { + this.producer.Produce(topicPartition, message, deliveryHandler); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; + + if (this.options.Metrics) + { + RecordPublish(topicPartition, duration, errorType); + } + } + } + + public int Poll(TimeSpan timeout) + { + return this.producer.Poll(timeout); + } + + public int Flush(TimeSpan timeout) + { + return this.producer.Flush(timeout); + } + + public void Flush(CancellationToken cancellationToken = default) + { + this.producer.Flush(cancellationToken); + } + + public void InitTransactions(TimeSpan timeout) + { + this.producer.InitTransactions(timeout); + } + + public void BeginTransaction() + { + this.producer.BeginTransaction(); + } + + public void CommitTransaction(TimeSpan timeout) + { + this.producer.CommitTransaction(timeout); + } + + public void CommitTransaction() + { + this.producer.CommitTransaction(); + } + + public void AbortTransaction(TimeSpan timeout) + { + this.producer.AbortTransaction(timeout); + } + + public void AbortTransaction() + { + this.producer.AbortTransaction(); + } + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) + { + this.producer.SendOffsetsToTransaction(offsets, groupMetadata, timeout); + } + + public void Dispose() + { + this.producer.Dispose(); + } + + private static string FormatProduceException(ProduceException produceException) => + $"ProduceException: {produceException.Error.Code}"; + + private static string FormatArgumentException(ArgumentException argumentException) => + $"ArgumentException: {argumentException.ParamName}"; + + private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null) + { + tags = new TagList() + { + new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + PublishOperationName), + new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem), + new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic), + }; + + if (partition is not null) + { + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition)); + } + + if (errorType is not null) + { + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType)); + } + } + + private static void RecordPublish(string topic, TimeSpan duration, string? errorType = null) + { + GetTags(topic, out var tags, partition: null, errorType); + + ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags); + } + + private static void RecordPublish(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + { + GetTags(topicPartition.Topic, out var tags, partition: topicPartition.Partition, errorType); + + ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags); + } + + private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message message, int? partition = null) + { + var spanName = string.Concat(topic, " ", PublishOperationName); + var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start); + if (activity == null) + { + return null; + } + + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem); + activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name); + activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topic); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, PublishOperationName); + + if (message.Key != null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, message.Key); + } + + if (partition is not null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, partition); + } + } + + return activity; + } + + private void InjectActivity(Activity? activity, Message message) + { + this.propagator.Inject(new PropagationContext(activity?.Context ?? default, Baggage.Current), message, this.InjectTraceContext); + } + + private void InjectTraceContext(Message message, string key, string value) + { + message.Headers ??= new Headers(); + message.Headers.Add(key, Encoding.UTF8.GetBytes(value)); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs new file mode 100644 index 0000000000..bcc29f5add --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +/// +/// A builder of with support for instrumentation. +/// +/// Type of the key. +/// Type of value. +public sealed class InstrumentedProducerBuilder : ProducerBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified. + public InstrumentedProducerBuilder(IEnumerable> config) + : base(config) + { + } + + internal ConfluentKafkaProducerInstrumentationOptions? Options { get; set; } + + /// + /// Build a new IProducer instance. + /// + /// an . + public override IProducer Build() + { + Debug.Assert(this.Options != null, "Options should not be null."); + + ProducerConfig config = (ProducerConfig)this.Config; + if (this.Options!.Metrics) + { + config.StatisticsIntervalMs ??= 1000; + } + + return new InstrumentedProducer(base.Build(), this.Options); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs new file mode 100644 index 0000000000..0cc6b91997 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Metrics; + +/// +/// Extension methods to simplify registering of Kafka instrumentation. +/// +public static partial class MeterProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder) + => AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(consumerBuilder); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder); + } + + /// + /// Enables the incoming requests automatic data collection for ASP.NET. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The name of the instrumentation. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder, + string? name, + InstrumentedConsumerBuilder? consumerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableMetrics); + }); + + return builder + .AddMeter(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (consumerBuilder == null) + { + consumerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + consumerBuilder.Options = options.Get(name); + } + + if (consumerBuilder.Options == null) + { + consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions(); + EnableMetrics(consumerBuilder.Options); + } + + return new ConfluentKafkaConsumerInstrumentation(consumerBuilder); + }); + } + + private static void EnableMetrics(ConfluentKafkaConsumerInstrumentationOptions options) => + options.Metrics = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs new file mode 100644 index 0000000000..3e4c4021db --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Metrics; + +/// +/// Extension methods to simplify registering of Kafka instrumentation. +/// +public static partial class MeterProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder) + => AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder, + InstrumentedProducerBuilder producerBuilder) + { + Guard.ThrowIfNull(producerBuilder); + + return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder); + } + + /// + /// Enables the incoming requests automatic data collection for ASP.NET. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The name of the instrumentation. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder, + string? name, + InstrumentedProducerBuilder? producerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableMetrics); + }); + + return builder + .AddMeter(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (producerBuilder == null) + { + producerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + producerBuilder.Options = options.Get(name); + } + + if (producerBuilder.Options == null) + { + producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions(); + EnableMetrics(producerBuilder.Options); + } + + return new ConfluentKafkaProducerInstrumentation(producerBuilder); + }); + } + + private static void EnableMetrics(ConfluentKafkaProducerInstrumentationOptions options) => + options.Metrics = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj new file mode 100644 index 0000000000..0d97d693a4 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj @@ -0,0 +1,35 @@ + + + + net8.0;net6.0;net462 + Confluent.Kafka instrumentation for OpenTelemetry .NET + $(PackageTags);distributed-tracing;Kafka;Confluent.Kafka + true + Instrumentation.ConfluentKafka- + true + + + + + true + + + + + + + + + + + + + + + + + + + + diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md new file mode 100644 index 0000000000..546e654eec --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md @@ -0,0 +1,11 @@ +# Confluent.Kafka client instrumentation for OpenTelemetry + +[![NuGet version badge](https://img.shields.io/nuget/v/OpenTelemetry.Instrumentation.ConfluentKafka)](https://www.nuget.org/packages/OpenTelemetry.Instrumentation.ConfluentKafka) +[![NuGet download count badge](https://img.shields.io/nuget/dt/OpenTelemetry.Instrumentation.ConfluentKafka)](https://www.nuget.org/packages/OpenTelemetry.Instrumentation.ConfluentKafka) +[![codecov.io](https://codecov.io/gh/open-telemetry/opentelemetry-dotnet-contrib/branch/main/graphs/badge.svg?flag=unittests-Instrumentation.ConfluentKafka)](https://app.codecov.io/gh/open-telemetry/opentelemetry-dotnet-contrib?flags[0]=unittests-Instrumentation.ConfluentKafka) + +Download the `OpenTelemetry.Instrumentation.ConfluentKafka` package: + +```shell +dotnet add package OpenTelemetry.Instrumentation.ConfluentKafka --prerelease +``` diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs new file mode 100644 index 0000000000..d9eada36e1 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace; + +/// +/// Extension methods to simplify registering of dependency instrumentation. +/// +public static partial class TracerProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder) + => AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(consumerBuilder); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Optional name which is used when retrieving options. + /// Optional to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + string? name, + InstrumentedConsumerBuilder? consumerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableTracing); + }); + + return builder + .AddSource(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (consumerBuilder == null) + { + consumerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + consumerBuilder.Options = options.Get(name); + } + + if (consumerBuilder.Options == null) + { + consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions(); + EnableTracing(consumerBuilder.Options); + } + + return new ConfluentKafkaConsumerInstrumentation(consumerBuilder); + }); + } + + private static void EnableTracing(ConfluentKafkaConsumerInstrumentationOptions options) => + options.Traces = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs new file mode 100644 index 0000000000..db977c72ae --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace; + +/// +/// Extension methods to simplify registering of dependency instrumentation. +/// +public static partial class TracerProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder) + => AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedProducerBuilder producerBuilder) + { + Guard.ThrowIfNull(producerBuilder); + + return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Optional name which is used when retrieving options. + /// Optional to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder, + string? name, + InstrumentedProducerBuilder? producerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableTracing); + }); + + return builder + .AddSource(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (producerBuilder == null) + { + producerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + producerBuilder.Options = options.Get(name); + } + + if (producerBuilder.Options == null) + { + producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions(); + EnableTracing(producerBuilder.Options); + } + + return new ConfluentKafkaProducerInstrumentation(producerBuilder); + }); + } + + private static void EnableTracing(ConfluentKafkaProducerInstrumentationOptions options) => + options.Traces = true; +} diff --git a/src/Shared/SemanticConventions.cs b/src/Shared/SemanticConventions.cs index 2fa0c8144c..9f1c1ee234 100644 --- a/src/Shared/SemanticConventions.cs +++ b/src/Shared/SemanticConventions.cs @@ -116,5 +116,25 @@ internal static class SemanticConventions public const string AttributeServerAddress = "server.address"; // replaces: "net.host.name" (AttributeNetHostName) public const string AttributeServerPort = "server.port"; // replaces: "net.host.port" (AttributeNetHostPort) public const string AttributeUserAgentOriginal = "user_agent.original"; // replaces: http.user_agent (AttributeHttpUserAgent) + + // v1.24.0 Messaging spans + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-spans.md + public const string AttributeMessagingClientId = "messaging.client_id"; + public const string AttributeMessagingDestinationName = "messaging.destination.name"; + + // v1.24.0 Messaging metrics + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-metrics.md + public const string MetricMessagingPublishDuration = "messaging.publish.duration"; + public const string MetricMessagingPublishMessages = "messaging.publish.messages"; + public const string MetricMessagingReceiveDuration = "messaging.receive.duration"; + public const string MetricMessagingReceiveMessages = "messaging.receive.messages"; + + // v1.24.0 Messaging (Kafka) + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/kafka.md + public const string AttributeMessagingKafkaConsumerGroup = "messaging.kafka.consumer.group"; + public const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; + public const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; + public const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; + #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile new file mode 100644 index 0000000000..300c474662 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile @@ -0,0 +1,19 @@ +# Create a container for running the OpenTelemetry ConfluentKafka integration tests. +# This should be run from the root of the repo: +# docker build --file test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile . + +ARG BUILD_SDK_VERSION=8.0 +ARG TEST_SDK_VERSION=8.0 + +FROM mcr.microsoft.com/dotnet/sdk:${BUILD_SDK_VERSION} AS build +ARG PUBLISH_CONFIGURATION=Release +ARG PUBLISH_FRAMEWORK=net8.0 +WORKDIR /repo +COPY . ./ +WORKDIR "/repo/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests" +RUN dotnet publish "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" -c "${PUBLISH_CONFIGURATION}" -f "${PUBLISH_FRAMEWORK}" -o /drop -p:IntegrationBuild=true -p:TARGET_FRAMEWORK=${PUBLISH_FRAMEWORK} + +FROM mcr.microsoft.com/dotnet/sdk:${TEST_SDK_VERSION} AS final +WORKDIR /test +COPY --from=build /drop . +ENTRYPOINT ["dotnet", "vstest", "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.dll"] diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs new file mode 100644 index 0000000000..f9f188f343 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedMeteringTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List metrics = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithMetrics(metricsBuilder => + { + metricsBuilder + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + IGrouping[] groups; + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() + { + Key = $"any_key_{i}", + Value = $"any_value_{i}", + }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + } + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + + await host.StopAsync(); + } + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(4, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs new file mode 100644 index 0000000000..a2ed848d1c --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedTracingAndMeteringTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List metrics = new(); + List activities = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithTracing(tracingBuilder => + { + tracingBuilder + .AddInMemoryExporter(activities) + .SetSampler(new TestSampler()) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }).WithMetrics(metricsBuilder => + { + metricsBuilder + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + IGrouping[] groups = null; + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() + { + Key = $"any_key_{i}", + Value = $"any_value_{i}", + }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + + Assert.Equal(100, j); + } + + await host.StopAsync(); + + Assert.Equal(200, activities.Count); + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + } + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(4, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs new file mode 100644 index 0000000000..d47d93d14f --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedTracingTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List activities = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithTracing(tracingBuilder => + { + tracingBuilder + .AddInMemoryExporter(activities) + .SetSampler(new TestSampler()) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() { Key = $"any_key_{i}", Value = $"any_value_{i}", }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + + Assert.Equal(100, j); + } + + await host.StopAsync(); + } + + Assert.Equal(200, activities.Count); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs new file mode 100644 index 0000000000..793753bdc2 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using OpenTelemetry.Tests; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class KafkaHelpers +{ + public const string KafkaEndPointEnvVarName = "OTEL_KAFKAENDPOINT"; + + public static readonly string KafkaEndPoint = SkipUnlessEnvVarFoundTheoryAttribute.GetEnvironmentVariable(KafkaEndPointEnvVarName); + + public static async Task ProduceTestMessageAsync() + { + string topic = $"otel-topic-{Guid.NewGuid()}"; + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaEndPoint, + }; + ProducerBuilder producerBuilder = new(producerConfig); + IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + return topic; + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs new file mode 100644 index 0000000000..c26f38743a --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class MeterProviderExtensions +{ + public static void EnsureMetricsAreFlushed(this MeterProvider meterProvider) + { + bool done; + do + { + done = meterProvider.ForceFlush(); + } + while (!done); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs new file mode 100644 index 0000000000..eff25507ff --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using Xunit; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class MeteringTests +{ + /* + To run the integration tests, set the OTEL_KAFKAENDPOINT machine-level environment variable to a valid Kafka endpoint. + + To use Docker... + 1) Run: docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local + 2) Set OTEL_KAFKAENDPOINT as: localhost:9092 + */ + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var metrics = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + IProducer producer = producerBuilder.Build(); + producer.Produce(topic, new Message + { + Value = "any_value", + }); + + await producer.FlushAsync(); + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var metrics = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + + await producer.FlushAsync(); + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutTimespanTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + + var metrics = new List(); + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using (IConsumer consumer = consumerBuilder.Build()) + { + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(); + + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj new file mode 100644 index 0000000000..defd085a4a --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj @@ -0,0 +1,28 @@ + + + Unit test project for OpenTelemetry ConfluentKafka instrumentation + + $(SupportedNetTargets) + $(TargetFrameworks);$(NetFrameworkMinimumSupportedVersion) + disable + + + + + + + + + + + + + + + + + + + + + diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs new file mode 100644 index 0000000000..01168f5fe4 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class ProducerExtensions +{ + public static async Task FlushAsync(this IProducer producer) + { + while (producer.Flush(TimeSpan.FromMilliseconds(100)) != 0) + { + await Task.Delay(100); + } + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs new file mode 100644 index 0000000000..923b4dae99 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs @@ -0,0 +1,305 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class TracingTests +{ + /* + To run the integration tests, set the OTEL_KAFKAENDPOINT machine-level environment variable to a valid Kafka endpoint. + + To use Docker... + 1) Run: docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local + 2) Set OTEL_KAFKAENDPOINT as: localhost:9092 + */ + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicPartitionTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(new TopicPartition(topic, new Partition(0)), new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + producer.Produce(topic, new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicPartitionTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + producer.Produce(new TopicPartition(topic, new Partition(0)), new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithCancellationTokenTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutMsTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(100); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutTimespanTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml new file mode 100644 index 0000000000..c749ab3632 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml @@ -0,0 +1,24 @@ +# Start a kafka container and then run OpenTelemetry ConfluentKafka integration tests. +# This should be run from the root of the repo: +# opentelemetry>docker-compose --file=test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml --project-directory=. up --exit-code-from=tests --build +version: '3.7' + +services: + kafka: + image: confluentinc/confluent-local + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://kafka:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_INTERNAL://kafka:9093,PLAINTEXT_HOST://localhost:9092 + ports: + - "9093:9093" + + tests: + build: + context: . + dockerfile: ./test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile + command: --TestCaseFilter:CategoryName=KafkaIntegrationTests + environment: + - OTEL_KAFKAENDPOINT=kafka:9093 + depends_on: + - kafka