Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions src/Changes/Changes.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ class Changes implements ChangesInterface {
/**
* {@inheritdoc}
*/
static public function createInstance(ContainerInterface $container, SequenceIndexInterface $sequence_index, WorkspaceInterface $workspace) {
static public function createInstance(ContainerInterface $container, WorkspaceInterface $workspace) {
return new static(
$sequence_index,
$workspace,
$container->get('entity.index.sequence'),
$container->get('entity.manager'),
$container->get('serializer')
);
}

/**
* @param \Drupal\multiversion\Entity\Index\SequenceIndex $sequenceIndex
* @param \Drupal\multiversion\Entity\WorkspaceInterface $workspace
* @param \Drupal\multiversion\Entity\Index\SequenceIndex $sequenceIndex
* @param \Drupal\Core\Entity\EntityManagerInterface $entity_manager
* @param \Symfony\Component\Serializer\SerializerInterface $serializer
*/
public function __construct(SequenceIndex $sequenceIndex, WorkspaceInterface $workspace, EntityManagerInterface $entity_manager, SerializerInterface $serializer) {
public function __construct(WorkspaceInterface $workspace, SequenceIndex $sequenceIndex, EntityManagerInterface $entity_manager, SerializerInterface $serializer) {
$this->sequenceIndex = $sequenceIndex;
$this->workspaceId = $workspace->id();
$this->entityManager = $entity_manager;
Expand All @@ -90,10 +90,10 @@ public function lastSeq($seq) {
/**
* {@inheritdoc}
*/
public function getNormal() {
public function getChanges() {
$sequences = $this->sequenceIndex
->useWorkspace($this->workspaceId)
->getRange($this->lastSeq, NULL);
->getRange($this->lastSeq, NULL, FALSE);

// Format the result array.
$changes = array();
Expand Down Expand Up @@ -134,15 +134,10 @@ public function getNormal() {
/**
* {@inheritdoc}
*/
public function getLongpoll() {
$no_change = TRUE;
do {
$change = $this->sequenceIndex
->useWorkspace($this->workspaceId)
->getRange($this->lastSeq, NULL);
$no_change = empty($change) ? TRUE : FALSE;
} while ($no_change);
return $change;
public function hasChanged($seq) {
return (bool) $this->sequenceIndex
->useWorkspace($this->workspaceId)
->getRange($this->lastSeq, NULL, FALSE);
}

}
12 changes: 6 additions & 6 deletions src/Changes/ChangesInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ interface ChangesInterface {
* Factory method.
*
* @param \Symfony\Component\DependencyInjection\ContainerInterface $container
* @param \Drupal\multiversion\Entity\Index\SequenceIndexInterface $sequence_index
* @param \Drupal\multiversion\Entity\WorkspaceInterface
* @return \Drupal\relaxed\Changes\ChangesInterface
*/
static public function createInstance(ContainerInterface $container, SequenceIndexInterface $sequence_index, WorkspaceInterface $workspace);
static public function createInstance(ContainerInterface $container, WorkspaceInterface $workspace);

/**
* @param boolean $include_docs
Expand All @@ -40,15 +39,16 @@ public function lastSeq($seq);
/**
* Return the changes in a 'normal' way.
*/
public function getNormal();
public function getChanges();

/**
* Return the changes with a 'longpoll'.
* Whether or not there's a change since the given sequence number.
*
* We can implement this method later.
* @param int $timeout
* @return boolean
*
* @see https://www.drupal.org/node/2282295
*/
public function getLongpoll();
public function hasChanged($timeout);

}
9 changes: 6 additions & 3 deletions src/Controller/ResourceController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\ConflictHttpException;
use Symfony\Component\HttpKernel\Exception\HttpExceptionInterface;
Expand Down Expand Up @@ -238,7 +239,7 @@ public function handle(Request $request) {

foreach ($responses as $response_part) {
try {
if ($response_data = $response_part->getResponseData()) {
if (!$response instanceof StreamedResponse && $response_data = $response_part->getResponseData()) {
// Collect bubbleable metadata in a render context.
$render_context = new RenderContext();
$response_output = $this->container->get('renderer')->executeInRenderContext($render_context, function() use ($serializer, $response_data, $response_format, $context) {
Expand All @@ -258,9 +259,11 @@ public function handle(Request $request) {
}
}

foreach ($render_contexts as $render_context) {
$response->addCacheableDependency($render_context);
// Don't add cache dependencies if we're streaming the response.
if ($response instanceof StreamedResponse) {
return $response;
}

foreach ($parameters as $parameter) {
if (is_array($parameter)) {
foreach ($parameter as $item) {
Expand Down
2 changes: 1 addition & 1 deletion src/Normalizer/ChangesNormalizer.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ChangesNormalizer extends NormalizerBase {
*/
public function normalize($changes, $format = NULL, array $context = array()) {
/** @var \Drupal\relaxed\Changes\ChangesInterface $changes */
$results = $changes->getNormal();
$results = $changes->getChanges();
$last_result = end($results);
$last_seq = isset($last_result['seq']) ? $last_result['seq'] : 0;

Expand Down
81 changes: 79 additions & 2 deletions src/Plugin/rest/resource/ChangesResource.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Drupal\relaxed\Changes\Changes;
use Drupal\rest\ResourceResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Symfony\Component\HttpKernel\Exception\NotFoundHttpException;

/**
Expand All @@ -27,6 +28,16 @@
*/
class ChangesResource extends ResourceBase {

/**
* @var int
*/
protected $heartbeat;

/**
* @var int
*/
protected $lastHeartbeat;

public function get($workspace) {
if (is_string($workspace)) {
throw new NotFoundHttpException();
Expand All @@ -35,16 +46,82 @@ public function get($workspace) {
// @todo: {@link https://www.drupal.org/node/2599930 Use injected container instead.}
$changes = Changes::createInstance(
\Drupal::getContainer(),
\Drupal::service('entity.index.sequence'),
$workspace
);

/** @var \Symfony\Component\Serializer\SerializerInterface $serializer */
$serializer = \Drupal::service('serializer');
$request = Request::createFromGlobals();

if ($request->query->get('include_docs') == 'true') {
$changes->includeDocs(TRUE);
}
$last_seq = (int) $request->query->get('since', 0);
$changes->lastSeq($last_seq);
$timeout = (int) $request->query->get('timeout', 10000) * 1000;
$this->heartbeat = (int) $request->query->get('heartbeat', 10000) * 1000;
$this->lastHeartbeat = $start = $this->now();

switch ($request->query->get('feed', 'normal')) {
case 'continuous';
$response = new StreamedResponse();
$response->setCallback(function () use ($changes, $serializer, $start, $timeout, $last_seq) {
do {
foreach ($changes->getChanges() as $data) {
echo $serializer->serialize($data, 'json') . "\r\n";
$this->flush();
$last_seq = $data['seq'];
}
$changes->lastSeq($last_seq);
echo $this->heartbeat();
$this->flush();
} while ($this->now() < ($start + $timeout));
echo $serializer->serialize(['last_seq' => $last_seq], 'json') . "\r\n";
});
break;

return new ResourceResponse($changes, 200);
case 'longpoll':
$response = new StreamedResponse();
$response->setCallback(function () use ($changes, $serializer, $start, $timeout, $last_seq) {
do {
$changed = $changes->hasChanged($last_seq);
echo $this->heartbeat();
$this->flush();
} while (!$changed && $this->now() < ($start + $timeout));
echo $serializer->serialize($changes, 'json') . "\r\n";
});
break;

default:
$response = new ResourceResponse($changes, 200);
break;
}
return $response;
}

/**
* Helper method returning the current time in microseconds.
*
* @return int
*/
protected function now() {
return (int) microtime(TRUE) * 1000000;
}

/**
* Helper method flushing content.
*/
protected function flush() {
ob_flush();
flush();
$this->lastHeartbeat = $this->now();
}

protected function heartbeat() {
if ($this->heartbeat < ($this->now() - $this->lastHeartbeat)) {
$this->lastHeartbeat = $this->now();
return "\r\n";
}
}

}