-
Notifications
You must be signed in to change notification settings - Fork 360
Add ConfluentKafka instrumentation #1493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
bae86e9
9d0122c
f3c4df8
a4fc992
400fdc1
cdd2470
5afe9e0
9be7c18
4256bb1
b429a6f
0f5bd95
14ad8c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |
| <OpenTelemetryCoreLatestVersion>[1.9.0,2.0)</OpenTelemetryCoreLatestVersion> | ||
| <OpenTelemetryCoreLatestPrereleaseVersion>[1.9.0-rc.1]</OpenTelemetryCoreLatestPrereleaseVersion> | ||
| <StackExchangeRedisPkgVer>[2.1.58,3.0)</StackExchangeRedisPkgVer> | ||
| <ConfluentKafkaPkgVer>[2.3.0,3.0)</ConfluentKafkaPkgVer> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not a blocker - this could be bump to 2.4.0 now?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @vishweshbankwar Indeed 2.4.0 has been released since I started working on this. I will push the package update in one of the planned follow up PR. |
||
| <CassandraCSharpDriverPkgVer>[3.16.0,4.0)</CassandraCSharpDriverPkgVer> | ||
| <StyleCopAnalyzersPkgVer>[1.2.0-beta.507,2.0)</StyleCopAnalyzersPkgVer> | ||
| <SystemNetHttp>[4.3.4,)</SystemNetHttp> | ||
|
|
||
g7ed6e marked this conversation as resolved.
Show resolved
Hide resolved
g7ed6e marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| <Project> | ||
|
|
||
| <PropertyGroup> | ||
| <RepoRoot>$([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName)</RepoRoot> | ||
| <MinVerTagPrefix>Instrumentation.ConfluentKafka-</MinVerTagPrefix> | ||
| </PropertyGroup> | ||
|
|
||
| <ItemGroup> | ||
| <SolutionProjects Include="$(RepoRoot)\src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj" /> | ||
| <SolutionProjects Include="$(RepoRoot)\test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" /> | ||
| <SolutionProjects Include="$(RepoRoot)\examples\kafka\**\*.csproj" /> | ||
|
|
||
| <PackProjects Include="$(RepoRoot)\src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj" /> | ||
|
|
||
| <TestProjects Include="$(RepoRoot)\test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" /> | ||
| </ItemGroup> | ||
|
|
||
| <Target Name="Build"> | ||
| <MSBuild Projects="@(SolutionProjects)" Targets="Build" ContinueOnError="ErrorAndStop" /> | ||
| </Target> | ||
|
|
||
| <Target Name="Restore"> | ||
| <MSBuild Projects="@(SolutionProjects)" Targets="Restore" ContinueOnError="ErrorAndStop" /> | ||
| </Target> | ||
|
|
||
| <Target Name="Pack"> | ||
| <MSBuild Projects="@(PackProjects)" Targets="Pack" ContinueOnError="ErrorAndStop" /> | ||
| </Target> | ||
|
|
||
| <Target Name="VSTest"> | ||
| <MSBuild Projects="@(TestProjects)" Targets="VSTest" ContinueOnError="ErrorAndStop" /> | ||
| </Target> | ||
|
|
||
| </Project> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| namespace Examples.ConfluentKafka; | ||
|
|
||
| public static class Constants | ||
| { | ||
| public static readonly string Topic = $"test-topic-{Guid.NewGuid()}"; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk.Worker"> | ||
| <PropertyGroup> | ||
| <OutputType>Exe</OutputType> | ||
| <TargetFramework>net8.0</TargetFramework> | ||
| <ImplicitUsings>enable</ImplicitUsings> | ||
| <Nullable>enable</Nullable> | ||
| </PropertyGroup> | ||
| <ItemGroup> | ||
| <ProjectReference Include="$(RepoRoot)\src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj" /> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /> | ||
| <PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="$(OpenTelemetryCoreLatestVersion)" /> | ||
| <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="$(OpenTelemetryCoreLatestVersion)" /> | ||
| <PackageReference Include="OpenTelemetry.Exporter.Console" Version="$(OpenTelemetryCoreLatestVersion)" /> | ||
| </ItemGroup> | ||
| </Project> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| using Confluent.Kafka; | ||
| using OpenTelemetry.Instrumentation.ConfluentKafka; | ||
|
|
||
| namespace Examples.ConfluentKafka; | ||
|
|
||
| public class ProduceConsumeHostedService( | ||
| InstrumentedProducerBuilder<string, string> instrumentedProducerBuilder, | ||
| InstrumentedConsumerBuilder<string, string> instrumentedConsumerBuilder) | ||
| : BackgroundService | ||
| { | ||
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
| { | ||
| IProducer<string, string> producer = instrumentedProducerBuilder.Build(); | ||
| IConsumer<string, string> consumer = instrumentedConsumerBuilder.Build(); | ||
|
|
||
| for (int j = 0; j < 100; j++) | ||
| { | ||
| await producer.ProduceAsync( | ||
| Constants.Topic, | ||
| new Message<string, string> { Key = "any_key", Value = $"any_value_{j}" }, | ||
| stoppingToken); | ||
| } | ||
|
|
||
| for (int j = 0; j < 100; j++) | ||
| { | ||
| producer.Produce( | ||
| Constants.Topic, | ||
| new Message<string, string> { Key = "any_key", Value = $"any_value_{j}" }); | ||
| } | ||
|
|
||
| producer.Flush(stoppingToken); | ||
|
|
||
| consumer.Subscribe(Constants.Topic); | ||
| while (!stoppingToken.IsCancellationRequested) | ||
| { | ||
| ConsumeResult<string, string> consumeResult = consumer.Consume(stoppingToken); | ||
| if (consumeResult == null) | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| if (consumeResult.IsPartitionEOF) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| Console.WriteLine($"Consumer {consumer.Name} received message: {consumeResult.Message.Value}"); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| using Confluent.Kafka; | ||
| using Examples.ConfluentKafka; | ||
| using OpenTelemetry.Instrumentation.ConfluentKafka; | ||
| using OpenTelemetry.Metrics; | ||
| using OpenTelemetry.Trace; | ||
|
|
||
| var builder = Host.CreateApplicationBuilder(args); | ||
|
|
||
| const string bootstrapServers = "localhost:9092"; | ||
|
|
||
| builder.Services.AddSingleton(_ => | ||
| { | ||
| ProducerConfig producerConfig = new() { BootstrapServers = bootstrapServers }; | ||
| return new InstrumentedProducerBuilder<string, string>(producerConfig); | ||
| }); | ||
| builder.Services.AddSingleton(_ => | ||
| { | ||
| ConsumerConfig consumerConfigA = new() | ||
| { | ||
| BootstrapServers = bootstrapServers, | ||
| GroupId = "group-a", | ||
| AutoOffsetReset = AutoOffsetReset.Earliest, | ||
| EnablePartitionEof = true, | ||
| }; | ||
| return new InstrumentedConsumerBuilder<string, string>(consumerConfigA); | ||
| }); | ||
|
|
||
| builder.Services.AddOpenTelemetry() | ||
| .WithTracing(tracing => | ||
| { | ||
| tracing.AddConsoleExporter() | ||
| .AddOtlpExporter() | ||
| .AddKafkaProducerInstrumentation<string, string>() | ||
| .AddKafkaConsumerInstrumentation<string, string>(); | ||
| }) | ||
| .WithMetrics(metering => | ||
| { | ||
| metering.AddConsoleExporter() | ||
| .AddOtlpExporter() | ||
| .AddKafkaProducerInstrumentation<string, string>() | ||
| .AddKafkaConsumerInstrumentation<string, string>(); | ||
| }); | ||
|
|
||
| builder.Services.AddHostedService<ProduceConsumeHostedService>(); | ||
|
|
||
| var app = builder.Build(); | ||
| await app.RunAsync(); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| # Run Examples.ConfluentKafka | ||
|
|
||
| Start the Confluent Kafka stack: | ||
|
|
||
| ```cmd | ||
| docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local | ||
| ``` | ||
|
|
||
| Start the Aspire Dashboard: | ||
|
|
||
| ```cmd | ||
| docker run --rm -it -p 18888:18888 -p 4317:18889 -d --name aspire-dashboard mcr.microsoft.com/dotnet/nightly/aspire-dashboard:8.0.0 | ||
| ``` |
Uh oh!
There was an error while loading. Please reload this page.