Skip to content

Commit 002547a

Browse files
committed
Added RabbitMQ Extractor and Loader
1 parent 462a970 commit 002547a

File tree

2 files changed

+163
-0
lines changed

2 files changed

+163
-0
lines changed

src/Extractor.php

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Kiboko\Component\Flow\RabbitMQ;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Kiboko\Component\Bucket\AcceptanceResultBucket;
8+
use Kiboko\Contract\Pipeline\ExtractorInterface;
9+
10+
final class Extractor implements ExtractorInterface
11+
{
12+
private Channel $channel;
13+
14+
public function __construct(
15+
private Client $connection,
16+
private string $topic,
17+
) {
18+
$this->channel = $this->connection->channel();
19+
20+
$this->channel->queueDeclare(
21+
queue: $this->topic,
22+
passive: false,
23+
durable: true,
24+
exclusive: false,
25+
autoDelete: true,
26+
);
27+
}
28+
29+
public static function withoutAuthentication(
30+
string $host,
31+
string $vhost,
32+
string $topic,
33+
?int $port = null,
34+
): self {
35+
$connection = new Client([
36+
'host' => $host,
37+
'port' => $port,
38+
'vhost' => $vhost,
39+
'user' => 'guest',
40+
'password' => 'guest',
41+
]);
42+
$connection->connect();
43+
44+
return new self($connection, topic: $topic);
45+
}
46+
47+
public static function withAuthentication(
48+
string $host,
49+
string $vhost,
50+
string $topic,
51+
?string $user,
52+
?string $password,
53+
?int $port = null,
54+
): self {
55+
$connection = new Client([
56+
'host' => $host,
57+
'port' => $port,
58+
'vhost' => $vhost,
59+
'user' => $user,
60+
'password' => $password,
61+
]);
62+
$connection->connect();
63+
64+
return new self($connection, topic: $topic);
65+
}
66+
67+
public function extract(): iterable
68+
{
69+
while (true) {
70+
$message = $this->channel->get($this->topic);
71+
$this->channel->ack($message);
72+
73+
yield new AcceptanceResultBucket($message);
74+
}
75+
}
76+
}

src/Loader.php

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Kiboko\Component\Flow\RabbitMQ;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Kiboko\Component\Bucket\AcceptanceResultBucket;
8+
use Kiboko\Contract\Pipeline\LoaderInterface;
9+
10+
final class Loader implements LoaderInterface
11+
{
12+
private Channel $channel;
13+
14+
public function __construct(
15+
private Client $connection,
16+
private string $topic,
17+
private ?string $exchange = null,
18+
) {
19+
$this->channel = $this->connection->channel();
20+
21+
$this->channel->queueDeclare(
22+
queue: $this->topic,
23+
passive: false,
24+
durable: true,
25+
exclusive: false,
26+
autoDelete: true,
27+
);
28+
}
29+
30+
public static function withoutAuthentication(
31+
string $host,
32+
string $vhost,
33+
string $topic,
34+
?string $exchange = null,
35+
?int $port = null,
36+
): self {
37+
$connection = new Client([
38+
'host' => $host,
39+
'port' => $port,
40+
'vhost' => $vhost,
41+
'user' => 'guest',
42+
'password' => 'guest',
43+
]);
44+
$connection->connect();
45+
46+
return new self($connection, topic: $topic, exchange: $exchange);
47+
}
48+
49+
public static function withAuthentication(
50+
string $host,
51+
string $vhost,
52+
string $topic,
53+
?string $user,
54+
?string $password,
55+
?string $exchange = null,
56+
?int $port = null,
57+
): self {
58+
$connection = new Client([
59+
'host' => $host,
60+
'port' => $port,
61+
'vhost' => $vhost,
62+
'user' => $user,
63+
'password' => $password,
64+
]);
65+
$connection->connect();
66+
67+
return new self($connection, topic: $topic, exchange: $exchange);
68+
}
69+
70+
public function load(): \Generator
71+
{
72+
$line = yield;
73+
74+
while (true) {
75+
$this->channel->publish(
76+
\json_encode($line, JSON_THROW_ON_ERROR),
77+
[
78+
'content-type' => 'application/json',
79+
],
80+
$this->topic,
81+
$this->exchange,
82+
);
83+
84+
$line = yield new AcceptanceResultBucket($line);
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)