diff --git a/src/Changes/Changes.php b/src/Changes/Changes.php index f9182285..9bd2d1ab 100644 --- a/src/Changes/Changes.php +++ b/src/Changes/Changes.php @@ -50,22 +50,22 @@ class Changes implements ChangesInterface { /** * {@inheritdoc} */ - static public function createInstance(ContainerInterface $container, SequenceIndexInterface $sequence_index, WorkspaceInterface $workspace) { + static public function createInstance(ContainerInterface $container, WorkspaceInterface $workspace) { return new static( - $sequence_index, $workspace, + $container->get('entity.index.sequence'), $container->get('entity.manager'), $container->get('serializer') ); } /** - * @param \Drupal\multiversion\Entity\Index\SequenceIndex $sequenceIndex * @param \Drupal\multiversion\Entity\WorkspaceInterface $workspace + * @param \Drupal\multiversion\Entity\Index\SequenceIndex $sequenceIndex * @param \Drupal\Core\Entity\EntityManagerInterface $entity_manager * @param \Symfony\Component\Serializer\SerializerInterface $serializer */ - public function __construct(SequenceIndex $sequenceIndex, WorkspaceInterface $workspace, EntityManagerInterface $entity_manager, SerializerInterface $serializer) { + public function __construct(WorkspaceInterface $workspace, SequenceIndex $sequenceIndex, EntityManagerInterface $entity_manager, SerializerInterface $serializer) { $this->sequenceIndex = $sequenceIndex; $this->workspaceId = $workspace->id(); $this->entityManager = $entity_manager; @@ -90,10 +90,10 @@ public function lastSeq($seq) { /** * {@inheritdoc} */ - public function getNormal() { + public function getChanges() { $sequences = $this->sequenceIndex ->useWorkspace($this->workspaceId) - ->getRange($this->lastSeq, NULL); + ->getRange($this->lastSeq, NULL, FALSE); // Format the result array. $changes = array(); @@ -134,15 +134,10 @@ public function getNormal() { /** * {@inheritdoc} */ - public function getLongpoll() { - $no_change = TRUE; - do { - $change = $this->sequenceIndex - ->useWorkspace($this->workspaceId) - ->getRange($this->lastSeq, NULL); - $no_change = empty($change) ? TRUE : FALSE; - } while ($no_change); - return $change; + public function hasChanged($seq) { + return (bool) $this->sequenceIndex + ->useWorkspace($this->workspaceId) + ->getRange($this->lastSeq, NULL, FALSE); } } diff --git a/src/Changes/ChangesInterface.php b/src/Changes/ChangesInterface.php index d18978a5..e23544df 100644 --- a/src/Changes/ChangesInterface.php +++ b/src/Changes/ChangesInterface.php @@ -17,11 +17,10 @@ interface ChangesInterface { * Factory method. * * @param \Symfony\Component\DependencyInjection\ContainerInterface $container - * @param \Drupal\multiversion\Entity\Index\SequenceIndexInterface $sequence_index * @param \Drupal\multiversion\Entity\WorkspaceInterface * @return \Drupal\relaxed\Changes\ChangesInterface */ - static public function createInstance(ContainerInterface $container, SequenceIndexInterface $sequence_index, WorkspaceInterface $workspace); + static public function createInstance(ContainerInterface $container, WorkspaceInterface $workspace); /** * @param boolean $include_docs @@ -40,15 +39,16 @@ public function lastSeq($seq); /** * Return the changes in a 'normal' way. */ - public function getNormal(); + public function getChanges(); /** - * Return the changes with a 'longpoll'. + * Whether or not there's a change since the given sequence number. * - * We can implement this method later. + * @param int $timeout + * @return boolean * * @see https://www.drupal.org/node/2282295 */ - public function getLongpoll(); + public function hasChanged($timeout); } diff --git a/src/Controller/ResourceController.php b/src/Controller/ResourceController.php index 64e057e5..258f09d8 100644 --- a/src/Controller/ResourceController.php +++ b/src/Controller/ResourceController.php @@ -15,6 +15,7 @@ use Symfony\Component\DependencyInjection\ContainerAwareTrait; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; +use Symfony\Component\HttpFoundation\StreamedResponse; use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException; use Symfony\Component\HttpKernel\Exception\ConflictHttpException; use Symfony\Component\HttpKernel\Exception\HttpExceptionInterface; @@ -238,7 +239,7 @@ public function handle(Request $request) { foreach ($responses as $response_part) { try { - if ($response_data = $response_part->getResponseData()) { + if (!$response instanceof StreamedResponse && $response_data = $response_part->getResponseData()) { // Collect bubbleable metadata in a render context. $render_context = new RenderContext(); $response_output = $this->container->get('renderer')->executeInRenderContext($render_context, function() use ($serializer, $response_data, $response_format, $context) { @@ -258,9 +259,11 @@ public function handle(Request $request) { } } - foreach ($render_contexts as $render_context) { - $response->addCacheableDependency($render_context); + // Don't add cache dependencies if we're streaming the response. + if ($response instanceof StreamedResponse) { + return $response; } + foreach ($parameters as $parameter) { if (is_array($parameter)) { foreach ($parameter as $item) { diff --git a/src/Normalizer/ChangesNormalizer.php b/src/Normalizer/ChangesNormalizer.php index cb6a4747..4b7bf6d3 100644 --- a/src/Normalizer/ChangesNormalizer.php +++ b/src/Normalizer/ChangesNormalizer.php @@ -23,7 +23,7 @@ class ChangesNormalizer extends NormalizerBase { */ public function normalize($changes, $format = NULL, array $context = array()) { /** @var \Drupal\relaxed\Changes\ChangesInterface $changes */ - $results = $changes->getNormal(); + $results = $changes->getChanges(); $last_result = end($results); $last_seq = isset($last_result['seq']) ? $last_result['seq'] : 0; diff --git a/src/Plugin/rest/resource/ChangesResource.php b/src/Plugin/rest/resource/ChangesResource.php index ae4abed3..a24d9915 100644 --- a/src/Plugin/rest/resource/ChangesResource.php +++ b/src/Plugin/rest/resource/ChangesResource.php @@ -10,6 +10,7 @@ use Drupal\relaxed\Changes\Changes; use Drupal\rest\ResourceResponse; use Symfony\Component\HttpFoundation\Request; +use Symfony\Component\HttpFoundation\StreamedResponse; use Symfony\Component\HttpKernel\Exception\NotFoundHttpException; /** @@ -27,6 +28,16 @@ */ class ChangesResource extends ResourceBase { + /** + * @var int + */ + protected $heartbeat; + + /** + * @var int + */ + protected $lastHeartbeat; + public function get($workspace) { if (is_string($workspace)) { throw new NotFoundHttpException(); @@ -35,16 +46,82 @@ public function get($workspace) { // @todo: {@link https://www.drupal.org/node/2599930 Use injected container instead.} $changes = Changes::createInstance( \Drupal::getContainer(), - \Drupal::service('entity.index.sequence'), $workspace ); + /** @var \Symfony\Component\Serializer\SerializerInterface $serializer */ + $serializer = \Drupal::service('serializer'); $request = Request::createFromGlobals(); + if ($request->query->get('include_docs') == 'true') { $changes->includeDocs(TRUE); } + $last_seq = (int) $request->query->get('since', 0); + $changes->lastSeq($last_seq); + $timeout = (int) $request->query->get('timeout', 10000) * 1000; + $this->heartbeat = (int) $request->query->get('heartbeat', 10000) * 1000; + $this->lastHeartbeat = $start = $this->now(); + + switch ($request->query->get('feed', 'normal')) { + case 'continuous'; + $response = new StreamedResponse(); + $response->setCallback(function () use ($changes, $serializer, $start, $timeout, $last_seq) { + do { + foreach ($changes->getChanges() as $data) { + echo $serializer->serialize($data, 'json') . "\r\n"; + $this->flush(); + $last_seq = $data['seq']; + } + $changes->lastSeq($last_seq); + echo $this->heartbeat(); + $this->flush(); + } while ($this->now() < ($start + $timeout)); + echo $serializer->serialize(['last_seq' => $last_seq], 'json') . "\r\n"; + }); + break; - return new ResourceResponse($changes, 200); + case 'longpoll': + $response = new StreamedResponse(); + $response->setCallback(function () use ($changes, $serializer, $start, $timeout, $last_seq) { + do { + $changed = $changes->hasChanged($last_seq); + echo $this->heartbeat(); + $this->flush(); + } while (!$changed && $this->now() < ($start + $timeout)); + echo $serializer->serialize($changes, 'json') . "\r\n"; + }); + break; + + default: + $response = new ResourceResponse($changes, 200); + break; + } + return $response; + } + + /** + * Helper method returning the current time in microseconds. + * + * @return int + */ + protected function now() { + return (int) microtime(TRUE) * 1000000; + } + + /** + * Helper method flushing content. + */ + protected function flush() { + ob_flush(); + flush(); + $this->lastHeartbeat = $this->now(); + } + + protected function heartbeat() { + if ($this->heartbeat < ($this->now() - $this->lastHeartbeat)) { + $this->lastHeartbeat = $this->now(); + return "\r\n"; + } } }