Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
began doing postgres event store
Browse files Browse the repository at this point in the history
  • Loading branch information
mookid8000 committed Aug 27, 2014
1 parent 7ba2344 commit 252e6de
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 2 deletions.
10 changes: 10 additions & 0 deletions Cirqus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "nlog", "nlog", "{61C025A5-1
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "d60.Cirqus.NLog", "d60.Cirqus.NLog\d60.Cirqus.NLog.csproj", "{F122F403-DFCF-4ACB-9833-5D93EF357A06}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgresql", "postgresql", "{B3C09C49-9BB5-401B-BD4E-E164D0E4B2A6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "d60.Cirqus.PostgreSql", "d60.Cirqus.PostgreSql\d60.Cirqus.PostgreSql.csproj", "{1DCEE03D-8316-443B-92D5-DD89D368837C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -107,6 +111,10 @@ Global
{F122F403-DFCF-4ACB-9833-5D93EF357A06}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F122F403-DFCF-4ACB-9833-5D93EF357A06}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F122F403-DFCF-4ACB-9833-5D93EF357A06}.Release|Any CPU.Build.0 = Release|Any CPU
{1DCEE03D-8316-443B-92D5-DD89D368837C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1DCEE03D-8316-443B-92D5-DD89D368837C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1DCEE03D-8316-443B-92D5-DD89D368837C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1DCEE03D-8316-443B-92D5-DD89D368837C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -127,5 +135,7 @@ Global
{7624F47B-B53F-40C6-B211-19288E814F86} = {B7EFFA50-A51B-4D7D-A264-07151A10F9D8}
{61C025A5-1616-423F-B877-A3E017472C76} = {AAD28E32-1CB5-46C8-B3B2-40BDC275EA63}
{F122F403-DFCF-4ACB-9833-5D93EF357A06} = {61C025A5-1616-423F-B877-A3E017472C76}
{B3C09C49-9BB5-401B-BD4E-E164D0E4B2A6} = {AAD28E32-1CB5-46C8-B3B2-40BDC275EA63}
{1DCEE03D-8316-443B-92D5-DD89D368837C} = {B3C09C49-9BB5-401B-BD4E-E164D0E4B2A6}
EndGlobalSection
EndGlobal
2 changes: 2 additions & 0 deletions d60.Cirqus.MsSql/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
Expand Down Expand Up @@ -33,3 +34,4 @@
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
[assembly: InternalsVisibleTo("d60.Cirqus.Tests")]
2 changes: 1 addition & 1 deletion d60.Cirqus.MsSql/SqlHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace d60.Cirqus.MsSql
{
public class SqlHelper
class SqlHelper
{
/// <summary>
/// Looks for a connection string in AppSettings with the specified name and returns that if possible - otherwise,
Expand Down
157 changes: 157 additions & 0 deletions d60.Cirqus.PostgreSql/PostgreSqlEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using d60.Cirqus.Events;
using d60.Cirqus.Extensions;
using d60.Cirqus.Serialization;
using Npgsql;
using NpgsqlTypes;

namespace d60.Cirqus.PostgreSql
{
public class PostgreSqlEventStore : IEventStore
{
readonly string _tableName;
readonly string _connectionString;
readonly Serializer _serializer = new Serializer("<events>");

public PostgreSqlEventStore(string connectionStringOrConnectionStringName, string tableName, bool automaticallyCreateSchema = true)
{
_tableName = tableName;
_connectionString = SqlHelper.GetConnectionString(connectionStringOrConnectionStringName);

if (automaticallyCreateSchema)
{
CreateSchema();
}
}

void CreateSchema()
{
var sql = string.Format(@"
CREATE TABLE IF NOT EXISTS ""{0}"" (
""id"" BIGSERIAL NOT NULL,
""batchId"" UUID NOT NULL,
""aggId"" UUID NOT NULL,
""seqNo"" BIGINT NOT NULL,
""globSeqNo"" BIGINT NOT NULL,
""data"" JSONB NOT NULL,
PRIMARY KEY (""id"")
);
", _tableName);

/*
* [id] [bigint] IDENTITY(1,1) NOT NULL,
[batchId] [uniqueidentifier] NOT NULL,
[aggId] [uniqueidentifier] NOT NULL,
[seqNo] [bigint] NOT NULL,
[globSeqNo] [bigint] NOT NULL,
[data] [nvarchar](max) NOT NULL,
*/

using (var connection = GetConnection())
using (var command = connection.CreateCommand())
{
command.CommandText = sql;
command.ExecuteNonQuery();
}
}

public void Save(Guid batchId, IEnumerable<DomainEvent> batch)
{
using (var connection = GetConnection())
using (var tx = connection.BeginTransaction())
{
var eventList = batch.ToList();

var nextSequenceNumber = GetNextSequenceNumber(connection, tx);

foreach (var e in eventList)
{
e.Meta[DomainEvent.MetadataKeys.GlobalSequenceNumber] = nextSequenceNumber++;
e.Meta[DomainEvent.MetadataKeys.BatchId] = batchId;
}

EventValidation.ValidateBatchIntegrity(batchId, eventList);

foreach (var e in eventList)
{
using (var cmd = connection.CreateCommand())
{
cmd.Transaction = tx;
cmd.CommandText = string.Format(@"
INSERT INTO ""{0}"" (
""batchId"",
""aggId"",
""seqNo"",
""globSeqNo"",
""data""
) VALUES (
@batchId,
@aggId,
@seqNo,
@globSeqNo,
@data
)
", _tableName);


cmd.Parameters.AddWithValue("batchId", batchId);
cmd.Parameters.AddWithValue("aggId", e.GetAggregateRootId());
cmd.Parameters.AddWithValue("seqNo", e.Meta[DomainEvent.MetadataKeys.SequenceNumber]);
cmd.Parameters.AddWithValue("globSeqNo", e.Meta[DomainEvent.MetadataKeys.GlobalSequenceNumber]);
cmd.Parameters.AddWithValue("data", _serializer.Serialize(e));

cmd.ExecuteNonQuery();
}
}

tx.Commit();
}
}

long GetNextSequenceNumber(NpgsqlConnection conn, NpgsqlTransaction tx)
{
using (var cmd = conn.CreateCommand())
{
cmd.Transaction = tx;
cmd.CommandText = string.Format(@"SELECT MAX(""globSeqNo"") FROM ""{0}""", _tableName);

var result = cmd.ExecuteScalar();

return result != DBNull.Value
? (long)result + 1
: 0;
}
}


NpgsqlConnection GetConnection()
{
var connection = new NpgsqlConnection(_connectionString);

connection.Open();

return connection;
}

public IEnumerable<DomainEvent> Load(Guid aggregateRootId, long firstSeq = 0, long limit = Int32.MaxValue)
{
throw new NotImplementedException();
}

public long GetNextSeqNo(Guid aggregateRootId)
{
return 0;
}

public IEnumerable<DomainEvent> Stream(long globalSequenceNumber = 0)
{
throw new NotImplementedException();
}
}
}
36 changes: 36 additions & 0 deletions d60.Cirqus.PostgreSql/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("d60.Cirqus.PostgreSql")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("d60.Cirqus.PostgreSql")]
[assembly: AssemblyCopyright("Copyright © 2014")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("3b2b7999-545d-4d96-b5ec-7f20b773fe92")]

// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
22 changes: 22 additions & 0 deletions d60.Cirqus.PostgreSql/SqlHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Configuration;

namespace d60.Cirqus.PostgreSql
{
class SqlHelper
{
/// <summary>
/// Looks for a connection string in AppSettings with the specified name and returns that if possible - otherwise,
/// it is assumed that the string is a connection string in itself
/// </summary>
public static string GetConnectionString(string connectionStringOrConnectionStringName)
{
var connectionStringSettings = ConfigurationManager.ConnectionStrings[connectionStringOrConnectionStringName];

var connectionString = connectionStringSettings != null
? connectionStringSettings.ConnectionString
: connectionStringOrConnectionStringName;

return connectionString;
}
}
}
79 changes: 79 additions & 0 deletions d60.Cirqus.PostgreSql/d60.Cirqus.PostgreSql.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{1DCEE03D-8316-443B-92D5-DD89D368837C}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>d60.Cirqus.PostgreSql</RootNamespace>
<AssemblyName>d60.Cirqus.PostgreSql</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
<RestorePackages>true</RestorePackages>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Mono.Security">
<HintPath>..\packages\Npgsql.2.2.0-rc2\lib\net45\Mono.Security.dll</HintPath>
</Reference>
<Reference Include="Npgsql">
<HintPath>..\packages\Npgsql.2.2.0-rc2\lib\net45\Npgsql.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="PostgreSqlEventStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SqlHelper.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\d60.Cirqus\d60.Cirqus.csproj">
<Project>{8E2DF6B4-A4DF-495E-8D74-5DCE6B333FB4}</Project>
<Name>d60.Cirqus</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>
4 changes: 4 additions & 0 deletions d60.Cirqus.PostgreSql/packages.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Npgsql" version="2.2.0-rc2" targetFramework="net45" />
</packages>
2 changes: 1 addition & 1 deletion d60.Cirqus.Tests/Contracts/EventStore/EventStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
using d60.Cirqus.Tests.Contracts.EventStore.Factories;
using d60.Cirqus.Tests.Stubs;
using NUnit.Framework;
using TestContext = d60.Cirqus.TestHelpers.TestContext;

namespace d60.Cirqus.Tests.Contracts.EventStore
{
[Description("Contract test for event stores. Verifies that event store implementation and sequence number generation works in tandem")]
[TestFixture(typeof(MongoDbEventStoreFactory), Category = TestCategories.MongoDb)]
[TestFixture(typeof(InMemoryEventStoreFactory))]
[TestFixture(typeof(MsSqlEventStoreFactory), Category = TestCategories.MsSql)]
[TestFixture(typeof(PostgreSqlEventStoreFactory), Category = TestCategories.PostgreSql)]
public class EventStoreTest<TEventStoreFactory> : FixtureBase where TEventStoreFactory : IEventStoreFactory, new()
{
TEventStoreFactory _eventStoreFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using d60.Cirqus.Events;
using d60.Cirqus.PostgreSql;
using d60.Cirqus.Tests.MsSql;

namespace d60.Cirqus.Tests.Contracts.EventStore.Factories
{
public class PostgreSqlEventStoreFactory : IEventStoreFactory
{
readonly PostgreSqlEventStore _eventStore;

public PostgreSqlEventStoreFactory()
{
var connectionString = TestSqlHelper.PostgreSqlConnectionString;

_eventStore = new PostgreSqlEventStore(connectionString, "Events");
}

public IEventStore GetEventStore()
{
return _eventStore;
}
}
}
Loading

0 comments on commit 252e6de

Please sign in to comment.