-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathSyncIndexWithObjectChangeListener.php
90 lines (75 loc) · 2.55 KB
/
SyncIndexWithObjectChangeListener.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<?php
namespace Enqueue\ElasticaBundle\Doctrine;
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
use Enqueue\ElasticaBundle\Doctrine\Queue\Commands;
use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor;
use Enqueue\Util\JSON;
use Interop\Queue\Context;
use Doctrine\Common\EventSubscriber;
final class SyncIndexWithObjectChangeListener implements EventSubscriber
{
private $context;
/**
* @var string
*/
private $modelClass;
/**
* @var array
*/
private $config;
public function __construct(Context $context, $modelClass, array $config)
{
$this->context = $context;
$this->modelClass = $modelClass;
$this->config = $config;
}
public function postUpdate(LifecycleEventArgs $args)
{
if ($args->getObject() instanceof $this->modelClass) {
$this->sendUpdateIndexMessage(SyncProcessor::UPDATE_ACTION, $args);
}
}
public function postPersist(LifecycleEventArgs $args)
{
if ($args->getObject() instanceof $this->modelClass) {
$this->sendUpdateIndexMessage(SyncProcessor::INSERT_ACTION, $args);
}
}
public function preRemove(LifecycleEventArgs $args)
{
if ($args->getObject() instanceof $this->modelClass) {
$this->sendUpdateIndexMessage(SyncProcessor::REMOVE_ACTION, $args);
}
}
public function getSubscribedEvents()
{
return [
'postPersist',
'postUpdate',
'preRemove',
];
}
/**
* @param string $action
* @param LifecycleEventArgs $args
*/
private function sendUpdateIndexMessage($action, LifecycleEventArgs $args)
{
$object = $args->getObject();
$rp = new \ReflectionProperty($object, $this->config['model_id']);
$rp->setAccessible(true);
$id = $rp->getValue($object);
$rp->setAccessible(false);
$queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_OBJECT_CHANGE);
$message = $this->context->createMessage(JSON::encode([
'action' => $action,
'model_class' => $this->modelClass,
'model_id' => $this->config['model_id'],
'id' => $id,
'index_name' => $this->config['index_name'],
'type_name' => $this->config['type_name'],
'repository_method' => $this->config['repository_method'],
]));
$this->context->createProducer()->setDeliveryDelay($this->config['delivery_delay'])->send($queue, $message);
}
}