|
| 1 | + |
| 2 | +1. **Run Zookeeper in Docker** |
| 3 | + |
| 4 | + In another Terminal window, run the following command: |
| 5 | + ```bash |
| 6 | + docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:3.0 |
| 7 | + ``` |
| 8 | + Check the output log to see that zookeeper is running. |
| 9 | + |
| 10 | +1. **Run Kafka in Docker** |
| 11 | + |
| 12 | + In another Terminal window, run the following command: |
| 13 | + ```bash |
| 14 | + docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:3.0 |
| 15 | + ``` |
| 16 | + Check the output log to see that Kafka is running. |
| 17 | + |
| 18 | + |
| 19 | +1. **Run Kafka Connect in Docker** |
| 20 | + |
| 21 | + In another Terminal window, run the following command: |
| 22 | + ```bash |
| 23 | + docker run -it --rm --name connect \ |
| 24 | + -p 8083:8083 \ |
| 25 | + -e GROUP_ID=1 \ |
| 26 | + -e CONFIG_STORAGE_TOPIC=accounts \ |
| 27 | + -e OFFSET_STORAGE_TOPIC=offsets \ |
| 28 | + -e STATUS_STORAGE_TOPIC=storage \ |
| 29 | + --link kafka:kafka \ |
| 30 | + --link timescaledb:timescaledb \ |
| 31 | + quay.io/debezium/connect:3.0 |
| 32 | + ``` |
| 33 | + Check the output log to see that Kafka Connect is running. |
| 34 | + |
| 35 | + |
| 36 | +1. **Register the Debezium PostgreSQL source connector** |
| 37 | + |
| 38 | + Update the `<properties>` for the `<debezium-user>` you created in your $SELF_LONG instance in the following command. |
| 39 | + Then run the command in another Terminal window: |
| 40 | + ```bash |
| 41 | + curl -X POST http://localhost:8083/connectors \ |
| 42 | + -H "Content-Type: application/json" \ |
| 43 | + -d '{ |
| 44 | + "name": "timescaledb-connector", |
| 45 | + "config": { |
| 46 | + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", |
| 47 | + "database.hostname": "timescaledb", |
| 48 | + "database.port": "5432", |
| 49 | + "database.user": "<debezium-user>", |
| 50 | + "database.password": "<debezium-password>", |
| 51 | + "database.dbname" : "postgres", |
| 52 | + "topic.prefix": "accounts", |
| 53 | + "plugin.name": "pgoutput", |
| 54 | + "schema.include.list": "public,_timescaledb_internal", |
| 55 | + "transforms": "timescaledb", |
| 56 | + "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", |
| 57 | + "transforms.timescaledb.database.hostname": "timescaledb", |
| 58 | + "transforms.timescaledb.database.port": "5432", |
| 59 | + "transforms.timescaledb.database.user": "<debezium-user>", |
| 60 | + "transforms.timescaledb.database.password": "<debezium-password>", |
| 61 | + "transforms.timescaledb.database.dbname": "postgres" |
| 62 | + } |
| 63 | + }' |
| 64 | + ``` |
| 65 | + |
| 66 | +1. **Verify `timescaledb-source-connector` is included in the connector list** |
| 67 | + |
| 68 | + 1. Check the tasks associated with `timescaledb-connector`: |
| 69 | + ```bash |
| 70 | + curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/timescaledb-connector |
| 71 | + ``` |
| 72 | + You see something like: |
| 73 | + ```bash |
| 74 | + {"name":"timescaledb-connector","config": |
| 75 | + { "connector.class":"io.debezium.connector.postgresql.PostgresConnector", |
| 76 | + "transforms.timescaledb.database.hostname":"timescaledb", |
| 77 | + "transforms.timescaledb.database.password":"debeziumpassword","database.user":"debezium", |
| 78 | + "database.dbname":"postgres","transforms.timescaledb.database.dbname":"postgres", |
| 79 | + "transforms.timescaledb.database.user":"debezium", |
| 80 | + "transforms.timescaledb.type":"io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", |
| 81 | + "transforms.timescaledb.database.port":"5432","transforms":"timescaledb", |
| 82 | + "schema.include.list":"public,_timescaledb_internal","database.port":"5432","plugin.name":"pgoutput", |
| 83 | + "topic.prefix":"accounts","database.hostname":"timescaledb","database.password":"debeziumpassword", |
| 84 | + "name":"timescaledb-connector"},"tasks":[{"connector":"timescaledb-connector","task":0}],"type":"source"} |
| 85 | + ``` |
| 86 | +
|
| 87 | +1. **Verify `timescaledb-connector` is running** |
| 88 | +
|
| 89 | + 1. Open the Terminal window running Kafka Connect. When the connector is active, you see something like the following: |
| 90 | +
|
| 91 | + ```bash |
| 92 | + 2025-04-30 10:40:15,168 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal._hyper_1_1_chunk' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 93 | + 2025-04-30 10:40:15,168 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 94 | + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming SignalProcessor started. Scheduling it every 5000ms [io.debezium.pipeline.signal.SignalProcessor] |
| 95 | + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming Creating thread debezium-postgresconnector-accounts-SignalProcessor [io.debezium.util.Threads] |
| 96 | + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator] |
| 97 | + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Retrieved latest position from stored offset 'LSN{0/1FCE570}' [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] |
| 98 | + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/1FCE570}' [io.debezium.connector.postgresql.connection.WalPositionLocator] |
| 99 | + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] |
| 100 | + 2025-04-30 10:40:15,189 INFO Postgres|accounts|streaming Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1FCCFF0}, catalogXmin=884] [io.debezium.connector.postgresql.connection.PostgresConnection] |
| 101 | + 2025-04-30 10:40:15,189 INFO Postgres|accounts|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection] |
| 102 | + 2025-04-30 10:40:15,204 INFO Postgres|accounts|streaming Requested thread factory for component PostgresConnector, id = accounts named = keep-alive [io.debezium.util.Threads] |
| 103 | + 2025-04-30 10:40:15,204 INFO Postgres|accounts|streaming Creating thread debezium-postgresconnector-accounts-keep-alive [io.debezium.util.Threads] |
| 104 | + 2025-04-30 10:40:15,216 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_policy_chunk_stats' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 105 | + 2025-04-30 10:40:15,216 INFO Postgres|accounts|streaming REPLICA IDENTITY for 'public.accounts' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 106 | + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat_history' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 107 | + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal._hyper_1_1_chunk' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 108 | + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] |
| 109 | + 2025-04-30 10:40:15,219 INFO Postgres|accounts|streaming Processing messages [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] |
| 110 | + ``` |
| 111 | +
|
| 112 | + 1. Watch the events in the accounts topic on your $SELF_LONG instance. |
| 113 | + |
| 114 | + In another Terminal instance, run the following command: |
| 115 | +
|
| 116 | + ```bash |
| 117 | + docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:3.0 watch-topic -a -k accounts |
| 118 | + ``` |
| 119 | +
|
| 120 | + You see the topics being streamed. For example: |
| 121 | +
|
| 122 | + ```bash |
| 123 | + status-task-timescaledb-connector-0 {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":31} |
| 124 | + status-topic-timescaledb.public.accounts:connector-timescaledb-connector {"topic":{"name":"timescaledb.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009337985}} |
| 125 | + status-topic-accounts._timescaledb_internal.bgw_job_stat:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338118}} |
| 126 | + status-topic-accounts._timescaledb_internal.bgw_job_stat:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338120}} |
| 127 | + status-topic-accounts._timescaledb_internal.bgw_job_stat_history:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat_history","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338243}} |
| 128 | + status-topic-accounts._timescaledb_internal.bgw_job_stat_history:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat_history","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338245}} |
| 129 | + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338250}} |
| 130 | + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} |
| 131 | + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} |
| 132 | + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} |
| 133 | + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} |
| 134 | + ["timescaledb-connector",{"server":"accounts"}] {"last_snapshot_record":true,"lsn":33351024,"txId":893,"ts_usec":1746009337290783,"snapshot":"INITIAL","snapshot_completed":true} |
| 135 | + status-connector-timescaledb-connector {"state":"UNASSIGNED","trace":null,"worker_id":"172.17.0.5:8083","generation":31} |
| 136 | + status-task-timescaledb-connector-0 {"state":"UNASSIGNED","trace":null,"worker_id":"172.17.0.5:8083","generation":31} |
| 137 | + status-connector-timescaledb-connector {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":33} |
| 138 | + status-task-timescaledb-connector-0 {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":33} |
| 139 | + ``` |
0 commit comments