Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Implemented providing additional connection setup less intrusively.
Browse files Browse the repository at this point in the history
Now instead of providing a connection factory, just provide a function
that will modify the connection to fit your needs.
  • Loading branch information
Enrique Ramirez committed Jun 15, 2017
1 parent 51c0a4c commit d1188f7
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 75 deletions.
16 changes: 2 additions & 14 deletions d60.Cirqus.PostgreSql/Config/PostgreSqlConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,13 @@ public static class PostgreSqlConfigurationExtensions
/// <summary>
/// Configures Cirqus to use Postgres as the event store
/// </summary>
public static void UsePostgreSql(this EventStoreConfigurationBuilder builder, string connectionStringOrConnectionStringName, string tableName, bool automaticallyCreateSchema = true)
public static void UsePostgreSql(this EventStoreConfigurationBuilder builder, string connectionStringOrConnectionStringName, string tableName, bool automaticallyCreateSchema = true, Action<NpgsqlConnection> additionalConnectionSetup = null)
{
if (builder == null) throw new ArgumentNullException("builder");
if (connectionStringOrConnectionStringName == null) throw new ArgumentNullException("connectionStringOrConnectionStringName");
if (tableName == null) throw new ArgumentNullException("tableName");

builder.Register<IEventStore>(context => new PostgreSqlEventStore(connectionStringOrConnectionStringName, tableName, automaticallyCreateSchema: automaticallyCreateSchema));
}

/// <summary>
/// Configures Cirqus to use Postgres as the event store
/// </summary>
public static void UsePostgreSql(this EventStoreConfigurationBuilder builder, Func<NpgsqlConnection> connectionFactory, string tableName, bool automaticallyCreateSchema = true)
{
if (builder == null) throw new ArgumentNullException("builder");
if (connectionFactory == null) throw new ArgumentNullException("connectionFactory");
if (tableName == null) throw new ArgumentNullException("tableName");

builder.Register<IEventStore>(context => new PostgreSqlEventStore(connectionFactory, tableName, automaticallyCreateSchema: automaticallyCreateSchema));
builder.Register<IEventStore>(context => new PostgreSqlEventStore(connectionStringOrConnectionStringName, tableName, automaticallyCreateSchema: automaticallyCreateSchema, additionalConnectionSetup: additionalConnectionSetup));
}
}
}
50 changes: 24 additions & 26 deletions d60.Cirqus.PostgreSql/Events/PostgreSqlEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,16 @@ namespace d60.Cirqus.PostgreSql.Events
{
public class PostgreSqlEventStore : IEventStore
{
readonly Func<NpgsqlConnection> _connectionFactory;
readonly Action<NpgsqlConnection> _additionalConnectionSetup;
readonly string _connectionString;
readonly string _tableName;
readonly MetadataSerializer _metadataSerializer = new MetadataSerializer();

public PostgreSqlEventStore(string connectionStringOrConnectionStringName, string tableName, bool automaticallyCreateSchema = true)
public PostgreSqlEventStore(string connectionStringOrConnectionStringName, string tableName, bool automaticallyCreateSchema = true, Action<NpgsqlConnection> additionalConnectionSetup = null)
{
_tableName = tableName;

_connectionFactory = () =>
{
var connection = new NpgsqlConnection(SqlHelper.GetConnectionString(connectionStringOrConnectionStringName));
connection.Open();
return connection;
};

if (automaticallyCreateSchema)
{
CreateSchema();
}
}

public PostgreSqlEventStore(Func<NpgsqlConnection> connectionFactory, string tableName, bool automaticallyCreateSchema = true)
{
_tableName = tableName;
_connectionFactory = connectionFactory;
_connectionString = SqlHelper.GetConnectionString(connectionStringOrConnectionStringName);
_additionalConnectionSetup = additionalConnectionSetup;

if (automaticallyCreateSchema)
{
Expand Down Expand Up @@ -82,7 +67,7 @@ PRIMARY KEY (""id"")
", _tableName);

using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
using (var command = connection.CreateCommand())
{
command.CommandText = sql;
Expand All @@ -96,7 +81,7 @@ public void Save(Guid batchId, IEnumerable<EventData> batch)

try
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
using (var tx = connection.BeginTransaction())
{
var nextSequenceNumber = GetNextGlobalSequenceNumber(connection, tx);
Expand Down Expand Up @@ -175,9 +160,22 @@ long GetNextGlobalSequenceNumber(NpgsqlConnection conn, NpgsqlTransaction tx)
}
}

NpgsqlConnection GetConnection()
{
var connection = new NpgsqlConnection(_connectionString);

if (_additionalConnectionSetup != null)
_additionalConnectionSetup.Invoke(connection);

connection.Open();

return connection;
}


public IEnumerable<EventData> Load(string aggregateRootId, long firstSeq = 0)
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var tx = connection.BeginTransaction())
{
Expand Down Expand Up @@ -205,7 +203,7 @@ public IEnumerable<EventData> Load(string aggregateRootId, long firstSeq = 0)

public IEnumerable<EventData> Stream(long globalSequenceNumber = 0)
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var tx = connection.BeginTransaction())
{
Expand Down Expand Up @@ -240,7 +238,7 @@ EventData ReadEvent(IDataRecord reader)

public long GetNextGlobalSequenceNumber()
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var tx = connection.BeginTransaction())
{
Expand All @@ -251,7 +249,7 @@ public long GetNextGlobalSequenceNumber()

public void DropEvents()
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var tx = connection.BeginTransaction())
{
Expand Down
70 changes: 35 additions & 35 deletions d60.Cirqus.PostgreSql/Views/PostgreSqlViewManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,23 @@ namespace d60.Cirqus.PostgreSql.Views
public class PostgreSqlViewManager<TViewInstance> : AbstractViewManager<TViewInstance> where TViewInstance : class, IViewInstance, ISubscribeTo, new()
{
readonly string _tableName;
private readonly Action<NpgsqlConnection> _additionalConnectionSetup;
readonly string _positionTableName;
const int PrimaryKeySize = 255;
const int DefaultPosition = -1;

readonly ViewDispatcherHelper<TViewInstance> _dispatcher = new ViewDispatcherHelper<TViewInstance>();
readonly ViewLocator _viewLocator = ViewLocator.GetLocatorFor<TViewInstance>();
readonly Logger _logger = CirqusLoggerFactory.Current.GetCurrentClassLogger();
readonly Func<NpgsqlConnection> _connectionFactory;
private readonly Func<Task<NpgsqlConnection>> _asyncConnectionFactory;
readonly string _connectionString;
readonly GenericSerializer _serializer = new GenericSerializer();

public PostgreSqlViewManager(string connectionStringOrConnectionStringName, string tableName, string positionTableName = null, bool automaticallyCreateSchema = true)
public PostgreSqlViewManager(string connectionStringOrConnectionStringName, string tableName, string positionTableName = null, bool automaticallyCreateSchema = true, Action<NpgsqlConnection> additionalConnectionSetup = null)
{
_tableName = tableName;
_additionalConnectionSetup = additionalConnectionSetup;
_positionTableName = positionTableName ?? _tableName + "_Position";

_connectionFactory = () =>
{
var connection = new NpgsqlConnection(SqlHelper.GetConnectionString(connectionStringOrConnectionStringName));
connection.Open();
return connection;
};

_asyncConnectionFactory = async () =>
{
var connection = new NpgsqlConnection(SqlHelper.GetConnectionString(connectionStringOrConnectionStringName));
await connection.OpenAsync();
return connection;
};

if (automaticallyCreateSchema)
{
CreateSchema();
}
}

public PostgreSqlViewManager(Func<NpgsqlConnection> connectionFactory, Func<Task<NpgsqlConnection>> asyncConnectionFactory, string tableName, string positionTableName = null, bool automaticallyCreateSchema = true)
{
_tableName = tableName;
_positionTableName = positionTableName ?? _tableName + "_Position";
_connectionFactory = connectionFactory;
_asyncConnectionFactory = asyncConnectionFactory;
_connectionString = SqlHelper.GetConnectionString(connectionStringOrConnectionStringName);

if (automaticallyCreateSchema)
{
Expand Down Expand Up @@ -86,14 +61,39 @@ PRIMARY KEY (""id"")

_logger.Info("Ensuring that schema for '{0}' is created...", typeof(TViewInstance));

using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
using (var command = connection.CreateCommand())
{
command.CommandText = sql;
command.ExecuteNonQuery();
}
}

NpgsqlConnection GetConnection()
{
var connection = new NpgsqlConnection(_connectionString);

if (_additionalConnectionSetup != null)
_additionalConnectionSetup.Invoke(connection);

connection.Open();

return connection;
}

async Task<NpgsqlConnection> GetConnectionAsync()
{
var connection = new NpgsqlConnection(_connectionString);

if (_additionalConnectionSetup != null)
_additionalConnectionSetup.Invoke(connection);

await connection.OpenAsync();

return connection;
}


public override string Id
{
get { return string.Format("{0}/{1}", typeof (TViewInstance).GetPrettyName(), _tableName); }
Expand All @@ -109,7 +109,7 @@ public override async Task<long> GetPosition(bool canGetFromCache = true)

async Task<long?> GetPositionFromPositionTable()
{
using (var connection = await _asyncConnectionFactory.Invoke())
using (var connection = await GetConnectionAsync())
using (var command = connection.CreateCommand())
{
command.CommandText = string.Format(@"select ""position"" from ""{0}"" where ""id"" = @id", _positionTableName);
Expand All @@ -134,7 +134,7 @@ public override void Dispatch(IViewContext viewContext, IEnumerable<DomainEvent>

var newPosition = eventList.Max(e => e.GetGlobalSequenceNumber());

using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var transaction = connection.BeginTransaction())
{
Expand Down Expand Up @@ -323,7 +323,7 @@ public override void Purge()
{
_logger.Info("Purging PostgreSQL table {0}", _tableName);

using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
using (var transaction = connection.BeginTransaction())
{
Expand All @@ -343,7 +343,7 @@ public override void Purge()

public override TViewInstance Load(string viewId)
{
using (var connection = _connectionFactory.Invoke())
using (var connection = GetConnection())
{
return FindOneById(viewId, connection, null);
}
Expand Down

0 comments on commit d1188f7

Please sign in to comment.