diff --git a/samples/KafkaBasic/Consumer/Consumer.csproj b/samples/KafkaBasic/Consumer/Consumer.csproj
new file mode 100644
index 00000000..c09f5723
--- /dev/null
+++ b/samples/KafkaBasic/Consumer/Consumer.csproj
@@ -0,0 +1,16 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/Consumer/ConsumerWorker.cs b/samples/KafkaBasic/Consumer/ConsumerWorker.cs
new file mode 100644
index 00000000..f2f587db
--- /dev/null
+++ b/samples/KafkaBasic/Consumer/ConsumerWorker.cs
@@ -0,0 +1,37 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using Confluent.Kafka;
+
+namespace Consumer;
+
+internal sealed class ConsumerWorker(IConsumer consumer, ILogger logger) : BackgroundService
+{
+ protected override Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ long i = 0;
+ return Task.Factory.StartNew(async () =>
+ {
+ consumer.Subscribe("topic");
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ ConsumeResult? result = default;
+ try
+ {
+ result = consumer.Consume(TimeSpan.FromSeconds(1));
+ }
+ catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
+ {
+ await Task.Delay(100);
+ continue;
+ }
+
+ i++;
+ if (i % 1000 == 0)
+ {
+ logger.LogInformation($"Received {i} messages. current offset is '{result!.Offset}'");
+ }
+ }
+ }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
+ }
+}
diff --git a/samples/KafkaBasic/Consumer/Program.cs b/samples/KafkaBasic/Consumer/Program.cs
new file mode 100644
index 00000000..f82c9181
--- /dev/null
+++ b/samples/KafkaBasic/Consumer/Program.cs
@@ -0,0 +1,15 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using Confluent.Kafka;
+using Consumer;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+builder.AddServiceDefaults();
+
+builder.AddKafkaConsumer("kafka");
+
+builder.Services.AddHostedService();
+
+builder.Build().Run();
diff --git a/samples/KafkaBasic/Consumer/appsettings.Development.json b/samples/KafkaBasic/Consumer/appsettings.Development.json
new file mode 100644
index 00000000..b2dcdb67
--- /dev/null
+++ b/samples/KafkaBasic/Consumer/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/samples/KafkaBasic/Consumer/appsettings.json b/samples/KafkaBasic/Consumer/appsettings.json
new file mode 100644
index 00000000..42aa3d91
--- /dev/null
+++ b/samples/KafkaBasic/Consumer/appsettings.json
@@ -0,0 +1,21 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information",
+ "Azure": "Warning"
+ }
+ },
+ "Aspire": {
+ "Confluent": {
+ "Kafka": {
+ "Consumer": {
+ "Config": {
+ "AutoOffsetReset": "Earliest",
+ "GroupId": "aspire"
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props
new file mode 100644
index 00000000..b9b39c05
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets
new file mode 100644
index 00000000..281a6cb2
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj b/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj
new file mode 100644
index 00000000..d96354be
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj
@@ -0,0 +1,17 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ true
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs b/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs
new file mode 100644
index 00000000..5b33d5d4
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs
@@ -0,0 +1,14 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+var builder = DistributedApplication.CreateBuilder(args);
+
+var containerResource = builder.AddKafkaContainer("kafka");
+
+builder.AddProject("producer")
+ .WithReference(containerResource);
+
+builder.AddProject("consumer")
+ .WithReference(containerResource);
+
+builder.Build().Run();
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json b/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json
new file mode 100644
index 00000000..e250d24c
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json
@@ -0,0 +1,16 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "http": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "launchBrowser": true,
+ "applicationUrl": "http://localhost:15024",
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "DOTNET_ENVIRONMENT": "Development",
+ "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16132"
+ }
+ }
+ }
+}
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json
new file mode 100644
index 00000000..0c208ae9
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ }
+}
diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json
new file mode 100644
index 00000000..31c092aa
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json
@@ -0,0 +1,9 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning",
+ "Aspire.Hosting.Dcp": "Warning"
+ }
+ }
+}
diff --git a/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs
new file mode 100644
index 00000000..c59308d5
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs
@@ -0,0 +1,119 @@
+using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.Diagnostics.HealthChecks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Microsoft.Extensions.Logging;
+using OpenTelemetry.Logs;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Trace;
+
+namespace Microsoft.Extensions.Hosting;
+
+public static class Extensions
+{
+ public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder)
+ {
+ builder.ConfigureOpenTelemetry();
+
+ builder.AddDefaultHealthChecks();
+
+ builder.Services.AddServiceDiscovery();
+
+ builder.Services.ConfigureHttpClientDefaults(http =>
+ {
+ // Turn on resilience by default
+ http.AddStandardResilienceHandler();
+
+ // Turn on service discovery by default
+ http.UseServiceDiscovery();
+ });
+
+ return builder;
+ }
+
+ public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder)
+ {
+ builder.Logging.AddOpenTelemetry(logging =>
+ {
+ logging.IncludeFormattedMessage = true;
+ logging.IncludeScopes = true;
+ });
+
+ builder.Services.AddOpenTelemetry()
+ .WithMetrics(metrics =>
+ {
+ metrics.AddRuntimeInstrumentation()
+ .AddBuiltInMeters();
+ })
+ .WithTracing(tracing =>
+ {
+ if (builder.Environment.IsDevelopment())
+ {
+ // We want to view all traces in development
+ tracing.SetSampler(new AlwaysOnSampler());
+ }
+
+ tracing.AddAspNetCoreInstrumentation()
+ .AddGrpcClientInstrumentation()
+ .AddHttpClientInstrumentation();
+ });
+
+ builder.AddOpenTelemetryExporters();
+
+ return builder;
+ }
+
+ private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder)
+ {
+ var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]);
+
+ if (useOtlpExporter)
+ {
+ builder.Services.Configure(logging => logging.AddOtlpExporter());
+ builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter());
+ builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter());
+ }
+
+ // Uncomment the following lines to enable the Prometheus exporter (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
+ // builder.Services.AddOpenTelemetry()
+ // .WithMetrics(metrics => metrics.AddPrometheusExporter());
+
+ // Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.Exporter package)
+ // builder.Services.AddOpenTelemetry()
+ // .UseAzureMonitor();
+
+ return builder;
+ }
+
+ public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder)
+ {
+ builder.Services.AddHealthChecks()
+ // Add a default liveness check to ensure app is responsive
+ .AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]);
+
+ return builder;
+ }
+
+ public static WebApplication MapDefaultEndpoints(this WebApplication app)
+ {
+ // Uncomment the following line to enable the Prometheus endpoint (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
+ // app.MapPrometheusScrapingEndpoint();
+
+ // All health checks must pass for app to be considered ready to accept traffic after starting
+ app.MapHealthChecks("/health");
+
+ // Only health checks tagged with the "live" tag must pass for app to be considered alive
+ app.MapHealthChecks("/alive", new HealthCheckOptions
+ {
+ Predicate = r => r.Tags.Contains("live")
+ });
+
+ return app;
+ }
+
+ private static MeterProviderBuilder AddBuiltInMeters(this MeterProviderBuilder meterProviderBuilder) =>
+ meterProviderBuilder.AddMeter(
+ "Microsoft.AspNetCore.Hosting",
+ "Microsoft.AspNetCore.Server.Kestrel",
+ "System.Net.Http");
+}
diff --git a/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj
new file mode 100644
index 00000000..5a525eeb
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj
@@ -0,0 +1,27 @@
+
+
+
+ Library
+ net8.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/KafkaBasic.sln b/samples/KafkaBasic/KafkaBasic.sln
new file mode 100644
index 00000000..6d1a44ad
--- /dev/null
+++ b/samples/KafkaBasic/KafkaBasic.sln
@@ -0,0 +1,42 @@
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.9.34310.174
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.AppHost", "KafkaBasic.AppHost\KafkaBasic.AppHost.csproj", "{C0E6A5CB-D61D-4091-9F5E-81562E480C40}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.ServiceDefaults", "KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj", "{DE933720-1947-4920-A2E8-CB943D381634}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "Producer\Producer.csproj", "{45316E78-FF0A-4984-B303-F292BB3340C7}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "Consumer\Consumer.csproj", "{6612601B-5912-4858-B23F-A2CC061C2918}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.Build.0 = Release|Any CPU
+ {45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {3683F1C2-032E-43A3-93C0-3F79606377E4}
+ EndGlobalSection
+EndGlobal
diff --git a/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs b/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs
new file mode 100644
index 00000000..90633e4c
--- /dev/null
+++ b/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs
@@ -0,0 +1,22 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using Confluent.Kafka;
+
+namespace Producer;
+
+internal sealed class ContinuousProducerWorker(IProducer producer, ILogger logger) : BackgroundService
+{
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(10));
+ long i = 0;
+ while (await timer.WaitForNextTickAsync(stoppingToken))
+ {
+ var message = new Message { Value = $"Hello, World! {i}" };
+ producer.Produce("topic", message);
+ logger.LogInformation($"{producer.Name} sent message '{message.Value}'");
+ i++;
+ }
+ }
+}
diff --git a/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs b/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs
new file mode 100644
index 00000000..e3113afe
--- /dev/null
+++ b/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs
@@ -0,0 +1,28 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using Confluent.Kafka;
+
+namespace Producer;
+
+internal sealed class IntermittentProducerWorker(IProducer producer, ILogger logger) : BackgroundService
+{
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ long i = 0;
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ for (int j = 0; j < 1000; j++, i++)
+ {
+ var message = new Message { Value = $"Hello, World! {i}" };
+ producer.Produce("topic", message);
+ }
+
+ producer.Flush(stoppingToken);
+
+ logger.LogInformation($"{producer.Name} sent 1000 messages, waiting 10 s");
+
+ await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
+ }
+ }
+}
diff --git a/samples/KafkaBasic/Producer/Producer.csproj b/samples/KafkaBasic/Producer/Producer.csproj
new file mode 100644
index 00000000..c09f5723
--- /dev/null
+++ b/samples/KafkaBasic/Producer/Producer.csproj
@@ -0,0 +1,16 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaBasic/Producer/Program.cs b/samples/KafkaBasic/Producer/Program.cs
new file mode 100644
index 00000000..41ffc80b
--- /dev/null
+++ b/samples/KafkaBasic/Producer/Program.cs
@@ -0,0 +1,17 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using Confluent.Kafka;
+using Producer;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+builder.AddServiceDefaults();
+
+builder.AddKafkaProducer("kafka");
+builder.AddKafkaProducer("kafka");
+
+builder.Services.AddHostedService();
+builder.Services.AddHostedService();
+
+builder.Build().Run();
diff --git a/samples/KafkaBasic/Producer/appsettings.Development.json b/samples/KafkaBasic/Producer/appsettings.Development.json
new file mode 100644
index 00000000..b2dcdb67
--- /dev/null
+++ b/samples/KafkaBasic/Producer/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/samples/KafkaBasic/Producer/appsettings.json b/samples/KafkaBasic/Producer/appsettings.json
new file mode 100644
index 00000000..22dbdff2
--- /dev/null
+++ b/samples/KafkaBasic/Producer/appsettings.json
@@ -0,0 +1,20 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information",
+ "Azure": "Warning"
+ }
+ },
+ "Aspire": {
+ "Confluent": {
+ "Kafka": {
+ "Producer": {
+ "Config": {
+ "Acks": "All"
+ }
+ }
+ }
+ }
+ }
+}