Skip to content

Commit

Permalink
init event-sourcing
Browse files Browse the repository at this point in the history
  • Loading branch information
wachterjohannes committed Feb 6, 2018
1 parent 4151eb7 commit e481a53
Show file tree
Hide file tree
Showing 27 changed files with 954 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Builder/ArticleIndexBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ private function buildForManager(Manager $manager, $destroy)
}

$this->output->writeln(sprintf('Drop and create index for "<comment>%s</comment>" manager.', $name));
$manager->dropAndCreateIndex();
// $manager->dropAndCreateIndex();
}
}
186 changes: 118 additions & 68 deletions Controller/ArticleController.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,40 @@
use ONGR\ElasticsearchDSL\Query\TermLevel\RangeQuery;
use ONGR\ElasticsearchDSL\Query\TermLevel\TermQuery;
use ONGR\ElasticsearchDSL\Sort\FieldSort;
use Prooph\Common\Event\ProophActionEventEmitter;
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\Pdo\MySqlEventStore;
use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlAggregateStreamStrategy;
use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
use Prooph\EventStoreBusBridge\EventPublisher;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\SnapshotStore\Pdo\PdoSnapshotStore;
use Ramsey\Uuid\Uuid;
use Sulu\Bundle\ArticleBundle\Admin\ArticleAdmin;
use Sulu\Bundle\ArticleBundle\Document\ArticleDocument;
use Sulu\Bundle\ArticleBundle\ListBuilder\ElasticSearchFieldDescriptor;
use Sulu\Bundle\ArticleBundle\Metadata\ArticleViewDocumentIdTrait;
use Sulu\Component\Content\Form\Exception\InvalidFormException;
use Sulu\Bundle\ArticleBundle\Prooph\Infrastruture\ArticleRepository;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\CreateArticle;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\CreateArticleHandler;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\PublishArticle;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\PublishArticleHandler;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\UnpublishArticle;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\UnpublishArticleHandler;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\UpdateArticle;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Command\UpdateArticleHandler;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Event\ArticleCreated;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Event\ArticlePublished;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Event\ArticleUnpublished;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Event\ArticleUpdated;
use Sulu\Bundle\ArticleBundle\Prooph\Projection\ArticleDocumentProjector;
use Sulu\Component\Content\Mapper\ContentMapperInterface;
use Sulu\Component\DocumentManager\DocumentManagerInterface;
use Sulu\Component\DocumentManager\Metadata\BaseMetadataFactory;
use Sulu\Component\Rest\Exception\MissingParameterException;
use Sulu\Component\Rest\Exception\RestException;
use Sulu\Component\Rest\ListBuilder\ListRepresentation;
use Sulu\Component\Rest\RequestParametersTrait;
Expand Down Expand Up @@ -279,14 +304,24 @@ public function getAction($uuid, Request $request)
*/
public function postAction(Request $request)
{
$action = $request->get('action');
$document = $this->getDocumentManager()->create(self::DOCUMENT_TYPE);
$id = Uuid::uuid4()->toString();

$locale = $this->getRequestParameter($request, 'locale', true);
$data = $request->request->all();
$this->getCommandBus()->dispatch(
new CreateArticle(
[
'id' => $id,
'locale' => $locale,
'data' => $data,
'userId' => $this->getUser()->getId(),
]
)
);

$this->persistDocument($data, $document, $locale);
$this->handleActionParameter($action, $document, $locale);
$this->getDocumentManager()->flush();
$this->handleActionParameter($id, $request->get('action'), $locale);

$document = $this->getDocumentManager()->find($id, $locale);

return $this->handleView(
$this->view($document)->setSerializationContext(
Expand Down Expand Up @@ -322,9 +357,20 @@ public function putAction(Request $request, $uuid)

$this->get('sulu_hash.request_hash_checker')->checkHash($request, $document, $document->getUuid());

$this->persistDocument($data, $document, $locale);
$this->handleActionParameter($action, $document, $locale);
$this->getDocumentManager()->flush();
$this->getCommandBus()->dispatch(
new UpdateArticle(
[
'id' => $uuid,
'locale' => $locale,
'data' => $data,
'userId' => $this->getUser()->getId(),
]
)
);

$this->handleActionParameter($uuid, $action, $locale);

$document = $this->getDocumentManager()->find($uuid, $locale);

return $this->handleView(
$this->view($document)->setSerializationContext(
Expand Down Expand Up @@ -378,7 +424,7 @@ public function deleteAction($id)
*
* @Post("/articles/{uuid}")
*
* @param string $uuid
* @param string $uuid
* @param Request $request
*
* @return Response
Expand All @@ -397,9 +443,15 @@ public function postTriggerAction($uuid, Request $request)
try {
switch ($action) {
case 'unpublish':
$document = $this->getDocumentManager()->find($uuid, $locale);
$this->getDocumentManager()->unpublish($document, $locale);
$this->getDocumentManager()->flush();
$this->getCommandBus()->dispatch(
new UnpublishArticle(
[
'id' => $uuid,
'locale' => $locale,
'userId' => $this->getUser()->getId(),
]
)
);

$data = $this->getDocumentManager()->find($uuid, $locale);

Expand Down Expand Up @@ -486,47 +538,6 @@ public function getSecurityContext()
return ArticleAdmin::SECURITY_CONTEXT;
}

/**
* Persists the document using the given information.
*
* @param array $data
* @param object $document
* @param string $locale
*
* @throws InvalidFormException
* @throws MissingParameterException
*/
private function persistDocument($data, $document, $locale)
{
$formType = $this->getMetadataFactory()->getMetadataForAlias('article')->getFormType();
$form = $this->createForm(
$formType,
$document,
[
// disable csrf protection, since we can't produce a token, because the form is cached on the client
'csrf_protection' => false,
]
);
$form->submit($data, false);

if (!$form->isValid()) {
throw new InvalidFormException($form);
}

if (array_key_exists('author', $data) && null === $data['author']) {
$document->setAuthor(null);
}

$this->getDocumentManager()->persist(
$document,
$locale,
[
'user' => $this->getUser()->getId(),
'clear_missing_content' => false,
]
);
}

/**
* Returns document-manager.
*
Expand All @@ -545,20 +556,18 @@ protected function getMapper()
return $this->get('sulu.content.mapper');
}

/**
* Delegates actions by given actionParameter, which can be retrieved from the request.
*
* @param string $actionParameter
* @param object $document
* @param string $locale
*/
private function handleActionParameter($actionParameter, $document, $locale)
private function handleActionParameter(string $id, ?string $action, string $locale)
{
switch ($actionParameter) {
case 'publish':
$this->getDocumentManager()->publish($document, $locale);

break;
if ($action === 'publish') {
$this->getCommandBus()->dispatch(
new PublishArticle(
[
'id' => $id,
'locale' => $locale,
'userId' => $this->getUser()->getId(),
]
)
);
}
}

Expand Down Expand Up @@ -604,4 +613,45 @@ protected function getMetadataFactory()
{
return $this->get('sulu_document_manager.metadata_factory.base');
}

protected function getCommandBus()
{
$pdo = new \PDO('mysql:dbname=su_article_prooph;host=127.0.0.1', 'root', '');
$eventStore = new MySqlEventStore(new FQCNMessageFactory(), $pdo, new MySqlAggregateStreamStrategy());
$eventEmitter = new ProophActionEventEmitter();
$eventStore = new ActionEventEmitterEventStore($eventStore, $eventEmitter);

$eventBus = new EventBus($eventEmitter);
$eventPublisher = new EventPublisher($eventBus);
$eventPublisher->attachToEventStore($eventStore);

$pdoSnapshotStore = new PdoSnapshotStore($pdo);
$userRepository = new ArticleRepository($eventStore, $pdoSnapshotStore);

$projectionManager = new MySqlProjectionManager($eventStore, $pdo);

$commandBus = new CommandBus();
$router = new CommandRouter();
$router->route(CreateArticle::class)->to(
new CreateArticleHandler($userRepository, $this->get('sulu_content.structure.factory'))
);
$router->route(PublishArticle::class)->to(new PublishArticleHandler($userRepository));
$router->route(UnpublishArticle::class)->to(new UnpublishArticleHandler($userRepository));
$router->route(UpdateArticle::class)->to(
new UpdateArticleHandler($userRepository, $this->get('sulu_content.structure.factory'))
);
$router->attachToMessageBus($commandBus);

$userProjector = new ArticleDocumentProjector(
$this->getDocumentManager(), $this->getMetadataFactory(), $this->get('form.factory')
);
$eventRouter = new EventRouter();
$eventRouter->route(ArticleCreated::class)->to([$userProjector, 'onArticleCreated']);
$eventRouter->route(ArticlePublished::class)->to([$userProjector, 'onArticlePublished']);
$eventRouter->route(ArticleUnpublished::class)->to([$userProjector, 'onArticleUnpublished']);
$eventRouter->route(ArticleUpdated::class)->to([$userProjector, 'onArticleUpdated']);
$eventRouter->attachToMessageBus($eventBus);

return $commandBus;
}
}
12 changes: 12 additions & 0 deletions Document/Subscriber/ArticleSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static function getSubscribedEvents()
['hydratePageData', -2000],
],
Events::PERSIST => [
['handleUuid', 480],
['handleScheduleIndex', -500],
['setChildrenStructureType', 0],
['persistPageData', -2000],
Expand All @@ -134,6 +135,17 @@ public static function getSubscribedEvents()
];
}

public function handleUuid(PersistEvent $event)
{
$document = $event->getDocument();
if (!$document instanceof ArticleDocument) {
return;
}

$event->getNode()->addMixin('mix:referenceable');
$event->getNode()->setProperty('jcr:uuid', $document->getUuid());
}

/**
* Schedule article document for index.
*
Expand Down
38 changes: 38 additions & 0 deletions Prooph/Infrastruture/ArticleRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Sulu\Bundle\ArticleBundle\Prooph\Infrastruture;

use Prooph\EventSourcing\Aggregate\AggregateRepository;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\SnapshotStore\SnapshotStore;
use Sulu\Bundle\ArticleBundle\Prooph\Model\ArticleRepository as BaseArticleRepository;
use Sulu\Bundle\ArticleBundle\Prooph\Model\Article;

class ArticleRepository extends AggregateRepository implements BaseArticleRepository
{
public function __construct(EventStore $eventStore, SnapshotStore $snapshotStore)
{
parent::__construct(
$eventStore,
AggregateType::fromAggregateRootClass(Article::class),
new AggregateTranslator(),
$snapshotStore,
null,
true
);
}

public function save(Article $article): void
{
$this->saveAggregateRoot($article);
}

public function get(string $id): ?Article
{
return $this->getAggregateRoot($id);
}
}
Loading

0 comments on commit e481a53

Please sign in to comment.