Skip to content

Commit 0c0d757

Browse files
committed
feature: add project
1 parent 6c3dc0f commit 0c0d757

15 files changed

+422
-1
lines changed

README.md

+89-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,89 @@
1-
# sync-postgres-with-elasticsearch-example
1+
# Sync PostgreSQL with Elasticsearch via Debezium
2+
3+
### Schema
4+
5+
```
6+
+-------------+
7+
| |
8+
| PostgreSQL |
9+
| |
10+
+------+------+
11+
|
12+
|
13+
|
14+
+---------------v------------------+
15+
| |
16+
| Kafka Connect |
17+
| (Debezium, ES connectors) |
18+
| |
19+
+---------------+------------------+
20+
|
21+
|
22+
|
23+
|
24+
+-------v--------+
25+
| |
26+
| Elasticsearch |
27+
| |
28+
+----------------+
29+
30+
31+
```
32+
We are using Docker Compose to deploy the following components:
33+
34+
* PostgreSQL
35+
* Kafka
36+
* ZooKeeper
37+
* Kafka Broker
38+
* Kafka Connect with [Debezium](http://debezium.io/) and [Elasticsearch](https://github.com/confluentinc/kafka-connect-elasticsearch) Connectors
39+
* Elasticsearch
40+
41+
### Usage
42+
43+
```shell
44+
docker-compose up --build
45+
46+
# wait until it's set up
47+
./start.sh
48+
```
49+
50+
### Testing
51+
52+
Check the contents of both databases
53+
54+
```shell
55+
# Check contents of the PostgreSQL database:
56+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE -c "SELECT * FROM users"'
57+
58+
# Check contents of the Elasticsearch database:
59+
curl http://localhost:9200/users/_search?pretty
60+
```
61+
62+
Create user
63+
64+
```shell
65+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE'
66+
test_db=# INSERT INTO users (email) VALUES ('[email protected]');
67+
68+
# Check contents of the Elasticsearch database:
69+
curl http://localhost:9200/users/_search?q=id:6
70+
```
71+
72+
Update user
73+
74+
75+
```shell
76+
test_db=# UPDATE users SET email = '[email protected]' WHERE id = 6;
77+
78+
# Check contents of the Elasticsearch database:
79+
curl http://localhost:9200/users/_search?q=id:6
80+
```
81+
82+
Delete user
83+
84+
```shell
85+
test_db=# DELETE FROM users WHERE id = 6;
86+
87+
# Check contents of the Elasticsearch database:
88+
curl http://localhost:9200/users/_search?q=id:6
89+
```

debezium-jdbc-es/Dockerfile

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
FROM debezium/connect:0.9

# Install locations for the extra connectors; KAFKA_CONNECT_PLUGINS_DIR is
# provided by the debezium/connect base image.
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc \
    KAFKA_CONNECT_ES_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-elasticsearch

# Deploy the PostgreSQL JDBC driver into Kafka's lib dir.
# curl -f makes the build fail on HTTP errors instead of silently saving an
# HTML error page with a .jar name.
RUN cd /kafka/libs && curl -sfO https://jdbc.postgresql.org/download/postgresql-42.1.4.jar

# Deploy the Confluent Kafka Connect JDBC connector.
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sfO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/5.0.0/kafka-connect-jdbc-5.0.0.jar

# Deploy the Confluent Elasticsearch sink connector plus its transitive
# runtime dependencies (Jest HTTP client stack).
# NOTE: central.maven.org has been decommissioned; repo1.maven.org is the
# canonical Maven Central host. The duplicate httpcore-4.4.4.jar download in
# the original was removed.
RUN mkdir $KAFKA_CONNECT_ES_DIR && cd $KAFKA_CONNECT_ES_DIR && \
    curl -sfO https://packages.confluent.io/maven/io/confluent/kafka-connect-elasticsearch/5.0.0/kafka-connect-elasticsearch-5.0.0.jar && \
    curl -sfO https://repo1.maven.org/maven2/io/searchbox/jest/2.0.0/jest-2.0.0.jar && \
    curl -sfO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore-nio/4.4.4/httpcore-nio-4.4.4.jar && \
    curl -sfO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.1/httpclient-4.5.1.jar && \
    curl -sfO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpasyncclient/4.1.1/httpasyncclient-4.1.1.jar && \
    curl -sfO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar && \
    curl -sfO https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar && \
    curl -sfO https://repo1.maven.org/maven2/commons-codec/commons-codec/1.9/commons-codec-1.9.jar && \
    curl -sfO https://repo1.maven.org/maven2/io/searchbox/jest-common/2.0.0/jest-common-2.0.0.jar && \
    curl -sfO https://repo1.maven.org/maven2/com/google/code/gson/gson/2.4/gson-2.4.jar

docker-compose.yaml

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Compose stack for the PostgreSQL -> Debezium/Kafka Connect -> Elasticsearch demo.
version: "3"
services:
  # ZooKeeper: required by the Kafka 0.9-era Debezium broker image for
  # cluster coordination. 2888/3888 are the peer/leader-election ports.
  zookeeper:
    image: debezium/zookeeper:0.9
    restart: always
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  # Single Kafka broker; Connect and the connectors use it as the bus.
  kafka:
    image: debezium/kafka:0.9
    restart: always
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  # Source database, built from ./postgres (Debezium postgres image plus
  # init scripts that create and seed test_db).
  postgres:
    build: ./postgres
    restart: always
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      # NOTE(review): POSTGRES_DATABASE is not read by the postgres image
      # itself (that would be POSTGRES_DB); it is referenced by the psql
      # commands in the README. init-db.sh creates test_db explicitly.
      - POSTGRES_DATABASE=test_db
  # Sink database / search index.
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.6.2
    restart: always
    ports:
      - 9200:9200
    environment:
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
      # Security disabled for the demo; do not use in production.
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  # Kafka Connect with the Debezium source and Elasticsearch sink plugins,
  # built from ./debezium-jdbc-es. REST API on 8083; 5005 is a JVM debug port.
  connect:
    image: debezium/connect-jdbc-es:0.9
    build:
      context: debezium-jdbc-es
    restart: always
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - kafka
      - postgres
      - elasticsearch
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      # Internal topics where Connect stores connector configs and offsets.
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets

postgres/Dockerfile

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# PostgreSQL image pre-configured by Debezium for logical decoding
# (the CDC prerequisite for the Postgres source connector).
FROM debezium/postgres:9.6

# Schema and seed-data scripts, loaded by init-db.sh on first start.
COPY ./setup.sql /home/setup.sql
COPY ./fake.sql /home/fake.sql
# Scripts in /docker-entrypoint-initdb.d/ run automatically when the
# container initializes an empty data directory.
COPY ./init-db.sh /docker-entrypoint-initdb.d/init-db.sh

postgres/fake.sql

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
-- Seed data for the demo: 5 users, 25 posts, 100 comments.
-- Relies on the tables created in setup.sql.
START TRANSACTION;

-- 5 users with randomized email providers.
-- (RANDOM() * 2)::INT rounds, so it yields 0, 1 or 2.
INSERT INTO users(email)
SELECT
  'user_' || seq || '@' || (
    CASE (RANDOM() * 2)::INT
      WHEN 0 THEN 'gmail'
      WHEN 1 THEN 'hotmail'
      WHEN 2 THEN 'yahoo'
    END
  ) || '.com' AS email
FROM GENERATE_SERIES(1, 5) seq;

-- 25 posts, each assigned to a random user.
-- Technique: cross-join every seq with every user, tag each pair with a
-- RANDOM() value (column named "random"), then keep, per seq, the pair
-- whose random value is the minimum -- i.e. pick one user uniformly at
-- random for each post number.
INSERT INTO posts(user_id, title)
WITH expanded AS (
  SELECT RANDOM(), seq, u.id AS user_id
  FROM GENERATE_SERIES(1, 25) seq, users u
), shuffled AS (
  SELECT e.*
  FROM expanded e
  INNER JOIN (
    SELECT ei.seq, MIN(ei.random) FROM expanded ei GROUP BY ei.seq
  ) em ON (e.seq = em.seq AND e.random = em.min)
  ORDER BY e.seq
)
SELECT
  s.user_id,
  'It is ' || s.seq || ' ' || (
    CASE (RANDOM() * 2)::INT
      WHEN 0 THEN 'sql'
      WHEN 1 THEN 'elixir'
      WHEN 2 THEN 'ruby'
    END
  ) as title
FROM shuffled s;

-- 100 comments, each assigned a random (user, post) pair using the same
-- min-random shuffle over the users x posts cross product.
INSERT INTO comments(user_id, post_id, body)
WITH expanded AS (
  SELECT RANDOM(), seq, u.id AS user_id, p.id AS post_id
  FROM GENERATE_SERIES(1, 100) seq, users u, posts p
), shuffled AS (
  SELECT e.*
  FROM expanded e
  INNER JOIN (
    SELECT ei.seq, MIN(ei.random) FROM expanded ei GROUP BY ei.seq
  ) em ON (e.seq = em.seq AND e.random = em.min)
  ORDER BY e.seq
)
SELECT
  s.user_id,
  s.post_id,
  'Here some comment ' || s.seq AS body
FROM shuffled s;

COMMIT;

postgres/init-db.sh

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/bin/bash
# First-boot initialization script (run from /docker-entrypoint-initdb.d/):
# creates the demo database and loads the schema and seed data.
set -e

# Fixes vs. original:
#  - the here-document was never terminated (no closing EOSQL), so bash only
#    accepted it by hitting end-of-file;
#  - \i used cwd-relative paths ("home/setup.sql"); the Dockerfile copies the
#    scripts to the absolute /home/, so reference them absolutely.
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<-EOSQL
CREATE DATABASE test_db;
\c test_db;
\i /home/setup.sql;
\i /home/fake.sql;
EOSQL

postgres/setup.sql

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Demo schema: users -> posts -> comments, created atomically.
START TRANSACTION;

-- Users, identified by a unique email.
CREATE TABLE users(
  id SERIAL PRIMARY KEY,
  email VARCHAR(40) NOT NULL UNIQUE
);

-- Posts, each authored by one user.
CREATE TABLE posts(
  id SERIAL PRIMARY KEY,
  user_id INTEGER NOT NULL REFERENCES users(id),
  title VARCHAR(100) NOT NULL UNIQUE
);

-- Comments, each written by one user on one post.
CREATE TABLE comments(
  id SERIAL PRIMARY KEY,
  user_id INTEGER NOT NULL REFERENCES users(id),
  post_id INTEGER NOT NULL REFERENCES posts(id),
  body VARCHAR(500) NOT NULL
);

COMMIT;
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "es-sink-comments",
3+
"config": {
4+
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
5+
"tasks.max": "1",
6+
"topics": "comments",
7+
"connection.url": "http://elasticsearch:9200",
8+
"transforms": "unwrap,key",
9+
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
10+
"transforms.unwrap.drop.tombstones": "false",
11+
"transforms.unwrap.drop.deletes": "false",
12+
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
13+
"transforms.key.field": "id",
14+
"key.ignore": "false",
15+
"type.name": "_doc",
16+
"behavior.on.null.values": "delete"
17+
}
18+
}

reqs/connections/es-sink-posts.json

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "es-sink-posts",
3+
"config": {
4+
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
5+
"tasks.max": "1",
6+
"topics": "posts",
7+
"connection.url": "http://elasticsearch:9200",
8+
"transforms": "unwrap,key",
9+
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
10+
"transforms.unwrap.drop.tombstones": "false",
11+
"transforms.unwrap.drop.deletes": "false",
12+
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
13+
"transforms.key.field": "id",
14+
"key.ignore": "false",
15+
"type.name": "_doc",
16+
"behavior.on.null.values": "delete"
17+
}
18+
}

reqs/connections/es-sink-users.json

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "es-sink-users",
3+
"config": {
4+
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
5+
"tasks.max": "1",
6+
"topics": "users",
7+
"connection.url": "http://elasticsearch:9200",
8+
"transforms": "unwrap,key",
9+
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
10+
"transforms.unwrap.drop.tombstones": "false",
11+
"transforms.unwrap.drop.deletes": "false",
12+
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
13+
"transforms.key.field": "id",
14+
"key.ignore": "false",
15+
"type.name": "_doc",
16+
"behavior.on.null.values": "delete"
17+
}
18+
}

reqs/connections/source.json

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"name": "test_db-connector",
3+
"config": {
4+
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
5+
"tasks.max": "1",
6+
"database.hostname": "postgres",
7+
"database.port": "5432",
8+
"database.user": "postgres",
9+
"database.password": "postgres",
10+
"database.server.id": "184054",
11+
"database.dbname": "test_db",
12+
"database.server.name": "dbserver1",
13+
"schema.whitelist": "public",
14+
"database.history.kafka.bootstrap.servers": "kafka:9092",
15+
"database.history.kafka.topic": "schema-changes.test_db",
16+
"transforms": "route",
17+
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
18+
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
19+
"transforms.route.replacement": "$3"
20+
}
21+
}

reqs/mappings/comments.json

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"settings": {
3+
"number_of_shards": 1
4+
},
5+
"mappings": {
6+
"_doc": {
7+
"properties": {
8+
"id": {
9+
"type": "integer"
10+
},
11+
"body": {
12+
"type": "text",
13+
"fields": {
14+
"keyword": {
15+
"type": "keyword",
16+
"ignore_above": 256
17+
}
18+
}
19+
},
20+
"post_id": {
21+
"type": "integer"
22+
},
23+
"user_id": {
24+
"type": "integer"
25+
}
26+
}
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)