Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Clean up, removed event store from eventdispatcher initialize method.…
Browse files Browse the repository at this point in the history
… I kept mixing it up with the ctor injected one.
  • Loading branch information
asgerhallas committed Dec 18, 2015
1 parent 18b5377 commit 6a69178
Show file tree
Hide file tree
Showing 22 changed files with 52 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ void WaitResetEvent(int seconds = 5)
Console.WriteLine("done!");
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
_stuffThatHappened.Add(string.Format("Initialized with {0} (purge: {1})", eventStore, purgeExistingViews));
_stuffThatHappened.Add(string.Format("Initialized (purge: {0})", purgeExistingViews));
}

public void Dispatch(IEnumerable<DomainEvent> events)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected override void DoSetUp()
RegisterForDisposal(eventDispatcher);

eventDispatcher.AddViewManager(_viewManager);
eventDispatcher.Initialize(_eventStoreProxy);
eventDispatcher.Initialize();
}

protected override void DoTearDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public AzureServiceBusEventDispatcherReceiver(string connectionString, IEventDis

public void Initialize(bool purgeExistingViews = false)
{
_innerEventDispatcher.Initialize(_eventStore, purgeExistingViews: purgeExistingViews);
_innerEventDispatcher.Initialize(purgeExistingViews);

_workerThread.Start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AzureServiceBusEventDispatcherSender(string connectionString, string topi
_topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
// do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public AzureServiceBusRelayEventDispatcher(IEventStore eventStore, string servic
Dispose(false);
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
_logger.Info("Opening connection");
_serviceHost.Open();
Expand Down
7 changes: 5 additions & 2 deletions d60.Cirqus.Testing/SynchronousViewManagerEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@ namespace d60.Cirqus.Testing
public class SynchronousViewManagerEventDispatcher : IEventDispatcher
{
readonly List<IViewManager> viewManagers;
readonly IEventStore _eventStore;
readonly IAggregateRootRepository _aggregateRootRepository;
readonly IDomainEventSerializer _domainEventSerializer;
readonly IDomainTypeNameMapper _domainTypeNameMapper;
readonly IDictionary<string, object> _viewContextItems = new Dictionary<string, object>();
readonly Logger _logger = CirqusLoggerFactory.Current.GetCurrentClassLogger();

public SynchronousViewManagerEventDispatcher(
IEventStore eventStore,
IAggregateRootRepository aggregateRootRepository,
IDomainEventSerializer domainEventSerializer,
IDomainTypeNameMapper domainTypeNameMapper,
params IViewManager[] viewManagers)
{
this.viewManagers = viewManagers.ToList();

_eventStore = eventStore;
_aggregateRootRepository = aggregateRootRepository;
_domainEventSerializer = domainEventSerializer;
_domainTypeNameMapper = domainTypeNameMapper;
Expand All @@ -48,9 +51,9 @@ public void SetContextItems(IDictionary<string, object> contextItems)
}
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
foreach (var batch in eventStore.Stream().Batch(1000))
foreach (var batch in _eventStore.Stream().Batch(1000))
{
Dispatch(batch.Select(e => _domainEventSerializer.Deserialize(e)));
}
Expand Down
1 change: 1 addition & 0 deletions d60.Cirqus.Testing/TestingConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static SynchronousViewManagerEventDispatcherConfiguationBuilder UseSynchr
context.AddChildContext(viewManagerContext);

var eventDispatcher = new SynchronousViewManagerEventDispatcher(
context.Get<IEventStore>(),
context.Get<IAggregateRootRepository>(),
context.Get<IDomainEventSerializer>(),
context.Get<IDomainTypeNameMapper>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public void NoProblemoWithRealSetup()

var commandProcessor = CommandProcessor.With()
.EventStore(e => e.Register<IEventStore>(c => eventStore))
.EventDispatcher(e => e.Register<IEventDispatcher>(c => new ConsoleOutEventDispatcher()))
.EventDispatcher(e => e.Register<IEventDispatcher>(c =>
new ConsoleOutEventDispatcher(eventStore)))
.Create();

RegisterForDisposal(commandProcessor);
Expand Down
3 changes: 2 additions & 1 deletion d60.Cirqus.Tests/Commands/TestCommandProcessing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ protected override void DoSetUp()

_cirqus = CommandProcessor.With()
.EventStore(e => _eventStore = e.UseInMemoryEventStore())
.EventDispatcher(e => e.UseEventDispatcher(c => new ConsoleOutEventDispatcher()))
.EventDispatcher(e => e.UseEventDispatcher(c =>
new ConsoleOutEventDispatcher(c.Get<IEventStore>())))
.Options(o =>
{
o.AddDomainExceptionType<InvalidOperationException>();
Expand Down
3 changes: 2 additions & 1 deletion d60.Cirqus.Tests/Commands/TestCommandTypeNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ protected override void DoSetUp()
{
_commandProcessor = CommandProcessor.With()
.EventStore(e => _eventStore = e.UseInMemoryEventStore())
.EventDispatcher(e => e.UseEventDispatcher(c => new ConsoleOutEventDispatcher()))
.EventDispatcher(e => e.UseEventDispatcher(c =>
new ConsoleOutEventDispatcher(c.Get<IEventStore>())))
.Options(o =>
{
o.AddCommandTypeNameToMetadata();
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus.Tests/Extensions/Helpful.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal static Task<InMemoryEventStore> UseInMemoryEventStore(this EventStoreCo

internal static void UseConsoleOutEventDispatcher(this EventDispatcherConfigurationBuilder builder)
{
builder.RegisterInstance<IEventDispatcher>(new ConsoleOutEventDispatcher());
builder.UseEventDispatcher(c => new ConsoleOutEventDispatcher(c.Get<IEventStore>()));
}
}
}
11 changes: 9 additions & 2 deletions d60.Cirqus.Tests/Stubs/ConsoleOutEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ namespace d60.Cirqus.Tests.Stubs
{
public class ConsoleOutEventDispatcher : IEventDispatcher
{
public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
readonly IEventStore _eventStore;

public ConsoleOutEventDispatcher(IEventStore eventStore)
{
_eventStore = eventStore;
}

public void Initialize(bool purgeExistingViews = false)
{
Console.WriteLine("Ignoring {0} events", eventStore.Stream().Count());
Console.WriteLine("Ignoring {0} events", _eventStore.Stream().Count());
}

public void Dispatch(IEnumerable<DomainEvent> events)
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CommandProcessor(
public CommandProcessor Initialize()
{
_logger.Info("Initializing event dispatcher");
_eventDispatcher.Initialize(_eventStore, Options.PurgeExistingViews);
_eventDispatcher.Initialize(Options.PurgeExistingViews);
return this;
}

Expand Down
4 changes: 2 additions & 2 deletions d60.Cirqus/Diagnostics/DiagnosticsConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public EventDispatcherDecorator(IEventDispatcher innerEventDispatcher, Operation
_operationProfiler = operationProfiler;
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
_innerEventDispatcher.Initialize(eventStore, purgeExistingViews);
_innerEventDispatcher.Initialize(purgeExistingViews);
}

public void Dispatch(IEnumerable<DomainEvent> events)
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus/Testing/TestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void Initialize()
{
if (!_initialized)
{
_eventDispatcher.Initialize(_eventStore, purgeExistingViews: true);
_eventDispatcher.Initialize(purgeExistingViews: true);
_initialized = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ public void Register(ViewManagerEventDispatcher eventDispatcher)
_viewManagers.AddRange(_eventDispatcher.GetViewManagers());
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
if (_eventDispatcher == null)
{
throw new InvalidOperationException("Attempted to initialize AutoDistributionViewManagerEventDispatcher but no ViewManagerEventDispatcherWasRegistered");
}

_eventDispatcher.Initialize(eventStore, purgeExistingViews);
_eventDispatcher.Initialize(purgeExistingViews);

SignOn();

Expand Down
4 changes: 2 additions & 2 deletions d60.Cirqus/Views/CompositeEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public CompositeEventDispatcher(IEnumerable<IEventDispatcher> eventDispatchers)
_eventDispatchers = eventDispatchers.ToList();
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
_eventDispatchers.ForEach(d => d.Initialize(eventStore, purgeExistingViews));
_eventDispatchers.ForEach(d => d.Initialize(purgeExistingViews));
}

public void Dispatch(IEnumerable<DomainEvent> events)
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus/Views/DependentViewManagerEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ static long GetPosition(List<IViewManager> viewManagers, long defaultValue)
return sequenceNumberToCatchUpTo;
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
_work.Enqueue(new Work());
_workerThread.Start();
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus/Views/IEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface IEventDispatcher
/// Will be called at startup, before new events will be dispatched. Allows for the dispatcher to
/// catch up if it feels like it.
/// </summary>
void Initialize(IEventStore eventStore, bool purgeExistingViews = false);
void Initialize(bool purgeExistingViews = false);

/// <summary>
/// Will be called with new events after each unit of work has been successfully committed to the event store
Expand Down
16 changes: 11 additions & 5 deletions d60.Cirqus/Views/InMemoryViewEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace d60.Cirqus.Views
/// </summary>
public class InMemoryViewEventDispatcher<TViewInstance> : IEventDispatcher where TViewInstance : class, IViewInstance, ISubscribeTo, new()
{
private readonly IEventStore _eventStore;
readonly IAggregateRootRepository _aggregateRootRepository;
readonly IDomainEventSerializer _domainEventSerializer;
readonly IDomainTypeNameMapper _domainTypeNameMapper;
Expand All @@ -28,8 +29,13 @@ namespace d60.Cirqus.Views

bool _stopped;

public InMemoryViewEventDispatcher(IAggregateRootRepository aggregateRootRepository, IDomainEventSerializer domainEventSerializer, IDomainTypeNameMapper domainTypeNameMapper)
public InMemoryViewEventDispatcher(
IEventStore eventStore,
IAggregateRootRepository aggregateRootRepository,
IDomainEventSerializer domainEventSerializer,
IDomainTypeNameMapper domainTypeNameMapper)
{
_eventStore = eventStore;
_aggregateRootRepository = aggregateRootRepository;
_domainEventSerializer = domainEventSerializer;
_domainTypeNameMapper = domainTypeNameMapper;
Expand All @@ -41,7 +47,7 @@ public InMemoryViewEventDispatcher(IAggregateRootRepository aggregateRootReposit
/// </summary>
public bool SkipInitialization { get; set; }

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
if (SkipInitialization)
{
Expand All @@ -51,19 +57,19 @@ public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)

_logger.Info("Initializing in-mem view event dispatcher for {0}", typeof (TViewInstance));

foreach (var batch in eventStore.Stream().Batch(1000))
foreach (var batch in _eventStore.Stream().Batch(1000))
{
if (_stopped)
{
_logger.Warn("Event processing stopped during initialization... that was a bad start!");
return;
}

Dispatch(eventStore, batch);
Dispatch(batch);
}
}

void Dispatch(IEventStore eventStore, IEnumerable<EventData> events)
void Dispatch(IEnumerable<EventData> events)
{
Dispatch(events.Select(e => _domainEventSerializer.Deserialize(e)));
}
Expand Down
2 changes: 1 addition & 1 deletion d60.Cirqus/Views/NullEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace d60.Cirqus.Views
/// </summary>
class NullEventDispatcher : IEventDispatcher
{
public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
}

Expand Down
5 changes: 2 additions & 3 deletions d60.Cirqus/Views/ViewManagerEventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,12 @@ public void RemoveViewManager(IViewManager viewManager)
}
}

public void Initialize(IEventStore eventStore, bool purgeExistingViews = false)
public void Initialize(bool purgeExistingViews = false)
{
if (eventStore == null) throw new ArgumentNullException("eventStore");
_logger.Info("Initializing event dispatcher with view managers: {0}", string.Join(", ", _viewManagers));

_logger.Debug("Initiating immediate full catchup");
_work.Enqueue(PieceOfWork.FullCatchUp(purgeExistingViews: purgeExistingViews));
_work.Enqueue(PieceOfWork.FullCatchUp(purgeExistingViews));

_logger.Debug("Starting automatic catchup timer with {0} ms interval", _automaticCatchUpTimer.Interval);
_automaticCatchUpTimer.Start();
Expand Down

0 comments on commit 6a69178

Please sign in to comment.