Skip to content

Commit 6086572

Browse files
committed
expose NpgsqlDataSource instead of connection string - close #16
1 parent eadba73 commit 6086572

13 files changed

+44
-47
lines changed

src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs

Lines changed: 0 additions & 2 deletions
This file was deleted.

src/Blumchen.DependencyInjection/Workers/Worker.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
using System.Collections.Concurrent;
22
using System.Text.Json.Serialization;
3-
using Blumchen.Configuration;
43
using Blumchen.Serialization;
54
using Blumchen.Subscriptions;
65
using Blumchen.Subscriptions.Management;
76
using Microsoft.Extensions.Hosting;
87
using Microsoft.Extensions.Logging;
8+
using Npgsql;
99
using Polly;
1010

1111

1212
namespace Blumchen.Workers;
1313

1414
public abstract class Worker<T>(
15-
DatabaseOptions databaseOptions,
15+
NpgsqlDataSource dataSource,
1616
IHandler<T> handler,
1717
JsonSerializerContext jsonSerializerContext,
1818
IErrorProcessor errorProcessor,
@@ -21,9 +21,8 @@ public abstract class Worker<T>(
2121
PublicationManagement.PublicationSetupOptions publicationSetupOptions,
2222
ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions,
2323
Func<TableDescriptorBuilder,TableDescriptorBuilder> tableDescriptorBuilder,
24-
ILoggerFactory loggerFactory): BackgroundService where T : class
24+
ILogger logger): BackgroundService where T : class
2525
{
26-
private readonly ILogger<Worker<T>> _logger = loggerFactory.CreateLogger<Worker<T>>();
2726
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
2827
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
2928
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
@@ -33,9 +32,9 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)
3332
{
3433
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
3534
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
36-
(_, _) => (_, __, ___) => { }
35+
(_, _) => (_, _, _) => { }
3736
};
38-
LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
37+
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
3938
}
4039

4140
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -45,21 +44,21 @@ await pipeline.ExecuteAsync(async token =>
4544
await using var subscription = new Subscription();
4645
await using var cursor = subscription.Subscribe(builder =>
4746
builder
48-
.ConnectionString(databaseOptions.ConnectionString)
47+
.DataSource(dataSource)
4948
.WithTable(tableDescriptorBuilder)
5049
.WithErrorProcessor(errorProcessor)
5150
.Handles<T, IHandler<T>>(handler)
5251
.NamingPolicy(namingPolicy)
5352
.JsonContext(jsonSerializerContext)
5453
.WithPublicationOptions(publicationSetupOptions)
5554
.WithReplicationOptions(replicationSlotSetupOptions)
56-
, ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token);
57-
Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName);
55+
, ct: token).GetAsyncEnumerator(token);
56+
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
5857
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
59-
Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
58+
Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
6059

6160
}, stoppingToken).ConfigureAwait(false);
62-
Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
61+
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
6362
return;
6463
}
6564

src/Blumchen/Serialization/ITypeResolver.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ internal sealed class JsonTypeResolver(
2424
internal void WhiteList(Type type)
2525
{
2626
var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName);
27-
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type);
28-
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo);
27+
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type);
28+
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo);
2929
}
3030

3131
public (string, JsonTypeInfo) Resolve(Type type) =>

src/Blumchen/Subscriptions/ISubscriptionOptions.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
using Blumchen.Subscriptions.Replication;
22
using JetBrains.Annotations;
3+
using Npgsql;
34
using static Blumchen.Subscriptions.Management.PublicationManagement;
45
using static Blumchen.Subscriptions.Management.ReplicationSlotManagement;
56

67
namespace Blumchen.Subscriptions;
78

89
internal interface ISubscriptionOptions
910
{
10-
[UsedImplicitly] string ConnectionString { get; }
11+
[UsedImplicitly] NpgsqlDataSource DataSource { get; }
1112
IReplicationDataMapper DataMapper { get; }
1213
[UsedImplicitly] PublicationSetupOptions PublicationOptions { get; }
1314
[UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; }
1415
[UsedImplicitly] IErrorProcessor ErrorProcessor { get; }
1516

1617
void Deconstruct(
17-
out string connectionString,
18+
out NpgsqlDataSource dataSource,
1819
out PublicationSetupOptions publicationSetupOptions,
1920
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
2021
out IErrorProcessor errorProcessor,
@@ -23,7 +24,7 @@ void Deconstruct(
2324
}
2425

2526
internal record SubscriptionOptions(
26-
string ConnectionString,
27+
NpgsqlDataSource DataSource,
2728
PublicationSetupOptions PublicationOptions,
2829
ReplicationSlotSetupOptions ReplicationOptions,
2930
IErrorProcessor ErrorProcessor,

src/Blumchen/Subscriptions/Subscription.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,17 @@ public enum CreateStyle
3030
private ISubscriptionOptions? _options;
3131
public async IAsyncEnumerable<IEnvelope> Subscribe(
3232
Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder> builder,
33-
ILoggerFactory? loggerFactory = null,
3433
[EnumeratorCancellation] CancellationToken ct = default
3534
)
3635
{
3736
_options = builder(_builder).Build();
38-
var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;
39-
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
40-
dataSourceBuilder.UseLoggerFactory(loggerFactory);
41-
42-
var dataSource = dataSourceBuilder.Build();
37+
var (dataSource, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;
38+
4339
await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);
4440

45-
_connection = new LogicalReplicationConnection(connectionString);
41+
_connection = new LogicalReplicationConnection(dataSource.ConnectionString);
4642
await _connection.Open(ct).ConfigureAwait(false);
4743

48-
4944
await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false);
5045
var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false);
5146

src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
using Blumchen.Subscriptions.Management;
33
using Blumchen.Subscriptions.Replication;
44
using JetBrains.Annotations;
5+
using Npgsql;
56
using System.Text.Json.Serialization;
67

78
namespace Blumchen.Subscriptions;
89

910
public sealed class SubscriptionOptionsBuilder
1011
{
11-
private static string? _connectionString;
12+
private static NpgsqlDataSource? _dataSource;
1213
private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions;
1314
private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
1415
private static IReplicationDataMapper? _dataMapper;
@@ -22,7 +23,6 @@ public sealed class SubscriptionOptionsBuilder
2223

2324
static SubscriptionOptionsBuilder()
2425
{
25-
_connectionString = null;
2626
_publicationSetupOptions = new();
2727
_replicationSlotSetupOptions = default;
2828
_dataMapper = default;
@@ -38,9 +38,9 @@ public SubscriptionOptionsBuilder WithTable(
3838
}
3939

4040
[UsedImplicitly]
41-
public SubscriptionOptionsBuilder ConnectionString(string connectionString)
41+
public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource)
4242
{
43-
_connectionString = connectionString;
43+
_dataSource = dataSource;
4444
return this;
4545
}
4646

@@ -91,7 +91,7 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce
9191
internal ISubscriptionOptions Build()
9292
{
9393
_messageTable ??= TableDescriptorBuilder.Build();
94-
ArgumentNullException.ThrowIfNull(_connectionString);
94+
ArgumentNullException.ThrowIfNull(_dataSource);
9595
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
9696

9797
var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
@@ -104,7 +104,7 @@ internal ISubscriptionOptions Build()
104104
if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer());
105105

106106
return new SubscriptionOptions(
107-
_connectionString,
107+
_dataSource,
108108
_publicationSetupOptions,
109109
_replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(),
110110
_errorProcessor ?? new ConsoleOutErrorProcessor(),

src/Subscriber/Program.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Blumchen.Subscriptions;
33
using Commons;
44
using Microsoft.Extensions.Logging;
5+
using Npgsql;
56
using Subscriber;
67

78
#pragma warning disable CS8601 // Possible null reference assignment.
@@ -20,9 +21,11 @@
2021

2122
try
2223
{
24+
var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString)
25+
.UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole()));
2326
var cursor = subscription.Subscribe(
2427
builder => builder
25-
.ConnectionString(Settings.ConnectionString)
28+
.DataSource(dataSourceBuilder.Build())
2629
.WithTable(options => options
2730
.Id("id")
2831
.MessageType("message_type")
@@ -31,7 +34,7 @@
3134
.NamingPolicy(new AttributeNamingPolicy())
3235
.JsonContext(SourceGenerationContext.Default)
3336
.Handles<UserCreatedContract, Consumer>(consumer)
34-
.Handles<UserDeletedContract, Consumer>(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct
37+
.Handles<UserDeletedContract, Consumer>(consumer), ct:ct
3538
).GetAsyncEnumerator(ct);
3639
await using var cursor1 = cursor.ConfigureAwait(false);
3740
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested);

src/SubscriberWorker/Program.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Text.Json.Serialization;
2-
using Blumchen.Configuration;
32
using Blumchen.Serialization;
43
using Blumchen.Subscriptions;
54
using Blumchen.Workers;
@@ -10,6 +9,7 @@
109
using Polly.Retry;
1110
using Polly;
1211
using SubscriberWorker;
12+
using Npgsql;
1313

1414

1515
#pragma warning disable CS8601 // Possible null reference assignment.
@@ -29,12 +29,13 @@
2929
.AddSingleton<IHandler<UserCreatedContract>, Handler<UserCreatedContract>>()
3030
.AddBlumchen<SubscriberWorker<UserDeletedContract>, UserDeletedContract>()
3131
.AddSingleton<IHandler<UserDeletedContract>, Handler<UserDeletedContract>>()
32-
32+
.AddTransient(sp =>
33+
new NpgsqlDataSourceBuilder(Settings.ConnectionString)
34+
.UseLoggerFactory(sp.GetRequiredService<ILoggerFactory>()).Build())
3335
.AddSingleton<INamingPolicy, AttributeNamingPolicy>()
3436
.AddSingleton<IErrorProcessor, ConsoleOutErrorProcessor>()
3537
.AddSingleton<JsonSerializerContext, SourceGenerationContext>()
36-
.AddSingleton(new DatabaseOptions(Settings.ConnectionString))
37-
.AddResiliencePipeline("default",(pipelineBuilder,context) =>
38+
.AddResiliencePipeline("default",(pipelineBuilder,_) =>
3839
pipelineBuilder
3940
.AddRetry(new RetryStrategyOptions
4041
{
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
using System.Text.Json.Serialization;
2-
using Blumchen.Configuration;
32
using Blumchen.Serialization;
43
using Blumchen.Subscriptions;
54
using Blumchen.Subscriptions.Management;
65
using Blumchen.Workers;
76
using Microsoft.Extensions.Logging;
7+
using Npgsql;
88
using Polly.Registry;
99
// ReSharper disable ClassNeverInstantiated.Global
1010

1111
namespace SubscriberWorker;
1212
public class SubscriberWorker<T>(
13-
DatabaseOptions databaseOptions,
13+
NpgsqlDataSource dataSource,
1414
IHandler<T> handler,
1515
JsonSerializerContext jsonSerializerContext,
1616
ResiliencePipelineProvider<string> pipelineProvider,
1717
INamingPolicy namingPolicy,
1818
IErrorProcessor errorProcessor,
19-
ILoggerFactory loggerFactory
20-
): Worker<T>(databaseOptions
19+
ILogger logger
20+
): Worker<T>(dataSource
2121
, handler
2222
, jsonSerializerContext
2323
, errorProcessor
@@ -26,4 +26,4 @@ ILoggerFactory loggerFactory
2626
, new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub")
2727
, new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot")
2828
, tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults()
29-
, loggerFactory) where T : class;
29+
, logger) where T : class;

src/Tests/DatabaseFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri
8181
var consumer = new TestHandler<T>(log, jsonTypeInfo);
8282
var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder()
8383
.WithErrorProcessor(new TestOutErrorProcessor(Output))
84-
.ConnectionString(connectionString)
84+
.DataSource(new NpgsqlDataSourceBuilder(connectionString).Build())
8585
.JsonContext(info)
8686
.NamingPolicy(namingPolicy)
8787
.Handles<T, TestHandler<T>>(consumer)

src/Tests/When_Subscription_Already_Exists.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ await MessageAppender.AppendAsync(
4646

4747
var subscription = new Subscription();
4848
await using var subscription1 = subscription.ConfigureAwait(false);
49-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
49+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
5050
{
5151
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
5252
return;

src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
3838
SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine);
3939
var subscription = new Subscription();
4040
await using var subscription1 = subscription.ConfigureAwait(false);
41-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
41+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
4242
{
4343
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
4444
return;

src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
4040
var subscription = new Subscription();
4141
await using var subscription1 = subscription.ConfigureAwait(false);
4242

43-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
43+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
4444
{
4545
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
4646
return;

0 commit comments

Comments
 (0)