From cc20f2ed3f7bf65d0aef0218f098c4a1edaebad2 Mon Sep 17 00:00:00 2001 From: atovpeko Date: Thu, 6 Feb 2025 14:48:09 +0200 Subject: [PATCH 01/18] draft --- use-timescale/integrations/debezium.md | 150 +++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 use-timescale/integrations/debezium.md diff --git a/use-timescale/integrations/debezium.md b/use-timescale/integrations/debezium.md new file mode 100644 index 0000000000..33f3b69cc0 --- /dev/null +++ b/use-timescale/integrations/debezium.md @@ -0,0 +1,150 @@ +--- +title: Integrate Debezium with Timescale Cloud +excerpt: Integrate Debezium with Timescale Cloud to enable change data capture for your PostgreSQL workloads +products: [cloud, mst, self_hosted] +keywords: [Debezium, integrate] +--- + +import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; + +# Integrate Debezium with $CLOUD_LONG + +[Debezium][debezium] is an open-source distributed platform for change data capture (CDC). +It enables you to capture changes Win your $SERVICE_LONG and stream them to other systems in real time. + +This pages explains how to integrate Debezium with $CLOUD_LONG and Kafka + +## Prerequisites + + + +- Install [Debezium][debezium-install] + +## Connect your $SERVICE_LONG + + + + + +To connect to $CLOUD_LONG: + + + +1. **Enable logical replication for your $SERVICE_LONG** + + 1. Connect to your $SERVICE_SHORT using your [connection details][connection-info]. + + 1. Run the following command to enable logical replication: + + ```sql + ALTER SYSTEM SET wal_level = 'logical'; + ALTER SYSTEM SET max_replication_slots = 10; + ALTER SYSTEM SET max_wal_senders = 10; + ``` + + 1. Restart your $SERVICE_SHORT. + +1. **Create a replication slot** + + ```sql + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` + +1. 
**Configure Debezium** + + Modify the Debezium connector configuration to point to $CLOUD_LONG using your [connection details][connection-info]: + + ```json + { + "name": "timescale-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "", + "database.port": "5432", + "database.user": "", + "database.password": "", + "database.dbname": "", + "database.server.name": "timescale-server", + } + } + ``` + +1. **Test the connection** + + Start the Debezium connector and ensure it connects to your $SERVICE_LONG successfully. + + + + + + + +To connect to your $SELF_LONG database: + + + +1. **Enable logical replication in $TIMESCALE_DB** + + 1. Modify the following settings in `postgresql.conf`. It is usually located in `/var/lib/postgresql/data/postgresql.conf` or `/etc/postgresql/*/main/postgresql.conf`: + + ``` + wal_level = logical + max_replication_slots = 10 + max_wal_senders = 10 + ``` + + 1. Restart PostgreSQL. + +1. **Create a replication slot** + + 1. Connect to your database using your [connection details][connection-info]. + 1. Run the following command: + + ```sql + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` + + 1. Grant replication privileges to the user Debezium will use: + + ```sql + ALTER ROLE WITH REPLICATION; + ``` + +1. **Configure Debezium** + + Create a Debezium connector configuration file `debezium-postgres.json`: + + ```json + { + "name": "timescale-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "", + "database.port": "5432", + "database.user": "", + "database.password": "", + "database.dbname": "", + "database.server.name": "timescale-server", + "plugin.name": "pgoutput", + "slot.name": "debezium_slot", + "publication.name": "debezium_publication" + } + } + ``` + +1. 
**Test the connection** + + Start the Debezium connector and ensure it connects to your database successfully. + + + + + + + +You have successfully integrated Debezium with $CLOUD_LONG. + +[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ +[debezium]: https://debezium.io/ +[debezium-install]: https://debezium.io/documentation/reference/stable/install.html +[console]: https://console.cloud.timescale.com/dashboard/services \ No newline at end of file From e2d4457b2d7943d9a704194e772a21e637d50edc Mon Sep 17 00:00:00 2001 From: atovpeko Date: Tue, 18 Feb 2025 10:30:27 +0200 Subject: [PATCH 02/18] update --- _partials/_integration-prereqs.md | 2 +- use-timescale/integrations/debezium.md | 243 ++++++++++++++++++------- use-timescale/integrations/index.md | 9 +- use-timescale/page-index/page-index.js | 7 +- 4 files changed, 192 insertions(+), 69 deletions(-) diff --git a/_partials/_integration-prereqs.md b/_partials/_integration-prereqs.md index 204bcaf87e..7d6b5b4e35 100644 --- a/_partials/_integration-prereqs.md +++ b/_partials/_integration-prereqs.md @@ -1,6 +1,6 @@ Before integrating: -* Create a target [$SERVICE_LONG][create-service] +* Create a target [$SERVICE_LONG][create-service]. You need [your connection details][connection-info] to follow the steps in this page. This procedure also works for [self-hosted $TIMESCALE_DB][enable-timescaledb]. 
diff --git a/use-timescale/integrations/debezium.md b/use-timescale/integrations/debezium.md index 33f3b69cc0..ce6aa7b40c 100644 --- a/use-timescale/integrations/debezium.md +++ b/use-timescale/integrations/debezium.md @@ -1,6 +1,6 @@ --- title: Integrate Debezium with Timescale Cloud -excerpt: Integrate Debezium with Timescale Cloud to enable change data capture for your PostgreSQL workloads +excerpt: Integrate Debezium with Timescale Cloud to enable change data capture in your Timescale Cloud service and streaming to Redis Streams products: [cloud, mst, self_hosted] keywords: [Debezium, integrate] --- @@ -10,21 +10,23 @@ import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.md # Integrate Debezium with $CLOUD_LONG [Debezium][debezium] is an open-source distributed platform for change data capture (CDC). -It enables you to capture changes Win your $SERVICE_LONG and stream them to other systems in real time. +It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. -This pages explains how to integrate Debezium with $CLOUD_LONG and Kafka +This pages explains how to capture changes in your $SERVICE_LONG and stream them to Redis Streams. ## Prerequisites -- Install [Debezium][debezium-install] +- Install [Debezium Server][debezium-install]. +- Install [Redis][redis-local] or sign up for [Redis Cloud][redis-cloud]. +- For a self-hosted installation, ensure that your PostgreSQL database is accessible from the Debezium Server instance. ## Connect your $SERVICE_LONG - + To connect to $CLOUD_LONG: @@ -32,109 +34,217 @@ To connect to $CLOUD_LONG: 1. **Enable logical replication for your $SERVICE_LONG** - 1. Connect to your $SERVICE_SHORT using your [connection details][connection-info]. + 1. [Connect][connect] to your $SERVICE_SHORT using your [connection details][connection-info]. 1. 
Run the following command to enable logical replication: ```sql - ALTER SYSTEM SET wal_level = 'logical'; - ALTER SYSTEM SET max_replication_slots = 10; - ALTER SYSTEM SET max_wal_senders = 10; + ALTER SYSTEM SET wal_level = logical; + SELECT pg_reload_conf(); ``` 1. Restart your $SERVICE_SHORT. -1. **Create a replication slot** +1. **Create a table** - ```sql - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); - ``` + Create a table to test the integration. For example: -1. **Configure Debezium** + ```sql + CREATE TABLE sensor_data ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL, + temperature FLOAT NOT NULL, + recorded_at TIMESTAMPTZ DEFAULT now() + ); + ``` - Modify the Debezium connector configuration to point to $CLOUD_LONG using your [connection details][connection-info]: +1. **Configure Debezium** - ```json - { - "name": "timescale-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "", - "database.port": "5432", - "database.user": "", - "database.password": "", - "database.dbname": "", - "database.server.name": "timescale-server", - } - } + 1. Navigate to the `conf` directory in your Debezium files. + + 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. 
+ + ```properties + debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector + debezium.source.database.hostname= + debezium.source.database.port= + debezium.source.database.user= + debezium.source.database.password= + debezium.source.database.dbname= + debezium.source.plugin.name=pgoutput + debezium.source.slot.name=debezium_slot + debezium.source.publication.autocreate.mode=filtered + + # Define Redis Streams as the sink + debezium.sink.type=redis + debezium.sink.redis.address=localhost:6379 + debezium.sink.redis.stream.name=debezium_stream + debezium.format.value=json ``` +1. **Start Debezium Server** + + ```sh + bin/debezium-server-start.sh conf/application.properties + ``` + 1. **Test the connection** - Start the Debezium connector and ensure it connects to your $SERVICE_LONG successfully. + 1. Insert data into the table you created in $CLOUD_LONG: + + ```sql + INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); + ``` + + 1. Run the following command to check Redis Streams for incoming CDC events: + + ```sh + redis-cli XREAD STREAMS debezium_stream 0 + ``` + + You should see something like this: + + ```json + { + "op": "c", + "ts_ms": 1708000000000, + "source": { + "table": "sensor_data", + "db": "your_database", + "schema": "public" + }, + "after": { + "id": 1, + "device_id": "sensor-001", + "temperature": 22.5, + "recorded_at": "2024-02-15T12:00:00Z" + } + } + ``` + +You have successfully integrated Debezium with $CLOUD_LONG. - + -To connect to your $SELF_LONG database: +To connect to your database: -1. **Enable logical replication in $TIMESCALE_DB** +1. **Enable logical replication in PostgreSQL** - 1. Modify the following settings in `postgresql.conf`. It is usually located in `/var/lib/postgresql/data/postgresql.conf` or `/etc/postgresql/*/main/postgresql.conf`: + 1. 
Modify the following settings in `postgresql.conf`: - ``` + ```ini wal_level = logical max_replication_slots = 10 max_wal_senders = 10 ``` 1. Restart PostgreSQL. + + 1. Add the following to `pg_hba.conf` to allow replication connections: -1. **Create a replication slot** + ``` + host replication debezium 0.0.0.0/0 md5 + ``` - 1. Connect to your database using your [connection details][connection-info]. - 1. Run the following command: + 1. Restart PostgreSQL. - ```sql - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); +1. **Create a Debezium user in PostgreSQL** + + Create a user with the `LOGIN` and `REPLICATION` permissions: + + ```sql + CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'your_password'; + ``` + +1. **Create a table** + + Debezium will stream changes from this table to Redis. For example: + + ```sql + CREATE TABLE sensor_data ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL, + temperature FLOAT NOT NULL, + recorded_at TIMESTAMPTZ DEFAULT now() + ); + ``` + +1. **Create a replication slot and a publication** + + ```sql + CREATE PUBLICATION debezium_pub FOR TABLE sensor_data; + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` +1. **Configure Debezium** + + 1. Navigate to the `conf` directory in your Debezium files. + + 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. 
+ + ```properties + debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector + debezium.source.database.hostname= + debezium.source.database.port= + debezium.source.database.user= + debezium.source.database.password= + debezium.source.database.dbname= + debezium.source.plugin.name=pgoutput + debezium.source.slot.name=debezium_slot + + # Define Redis Streams as the sink + debezium.sink.type=redis + debezium.sink.redis.address=localhost:6379 + debezium.sink.redis.stream.name=debezium_stream + debezium.format.value=json ``` - - 1. Grant replication privileges to the user Debezium will use: + +1. **Start Debezium Server** + + ```sh + bin/debezium-server-start.sh conf/application.properties + ``` + +1. **Test the connection** + + 1. Insert data into the table you created earlier: ```sql - ALTER ROLE WITH REPLICATION; + INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); ``` -1. **Configure Debezium** + 1. Run the following command to check Redis Streams for incoming CDC events: + + ```sh + redis-cli XREAD STREAMS debezium_stream 0 + ``` - Create a Debezium connector configuration file `debezium-postgres.json`: + You should see something like this: ```json - { - "name": "timescale-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "", - "database.port": "5432", - "database.user": "", - "database.password": "", - "database.dbname": "", - "database.server.name": "timescale-server", - "plugin.name": "pgoutput", - "slot.name": "debezium_slot", - "publication.name": "debezium_publication" - } - } + { + "op": "c", + "ts_ms": 1708000000000, + "source": { + "table": "sensor_data", + "db": "your_database", + "schema": "public" + }, + "after": { + "id": 1, + "device_id": "sensor-001", + "temperature": 22.5, + "recorded_at": "2024-02-15T12:00:00Z" + } + } ``` -1. 
**Test the connection** - - Start the Debezium connector and ensure it connects to your database successfully. +You have successfully integrated Debezium with self-hosted $TIMESCALE_DB. @@ -142,9 +252,10 @@ To connect to your $SELF_LONG database: -You have successfully integrated Debezium with $CLOUD_LONG. - [connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ [debezium]: https://debezium.io/ -[debezium-install]: https://debezium.io/documentation/reference/stable/install.html -[console]: https://console.cloud.timescale.com/dashboard/services \ No newline at end of file +[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html +[console]: https://console.cloud.timescale.com/dashboard/services +[redis-local]: https://redis.io/docs/getting-started/ +[redis-cloud]: https://redis.com/try-free/ +[connect]: /getting-started/:currentVersion:/run-queries-from-console/ \ No newline at end of file diff --git a/use-timescale/integrations/index.md b/use-timescale/integrations/index.md index aeb3027b01..254dcf0b8e 100644 --- a/use-timescale/integrations/index.md +++ b/use-timescale/integrations/index.md @@ -53,6 +53,12 @@ Some of the most in-demand integrations for $CLOUD_LONG are listed below, with l | [Apache Airflow][apache-airflow] | Programmatically author, schedule, and monitor workflows. | | [AWS Lambda][aws-lambda]| Run code without provisioning or managing servers, scaling automatically as needed. | +## Change data capture + +| Name | Description | +|:--------------------:|---------------------------------------------------------------------------------| +| [Debezium][debezium] | Capture changes in your database and stream them to other systems in real time. 
| + [psql]: /use-timescale/:currentVersion:/integrations/psql/ [qstudio]: /use-timescale/:currentVersion:/integrations/qstudio/ @@ -69,4 +75,5 @@ Some of the most in-demand integrations for $CLOUD_LONG are listed below, with l [postgresql-integrations]: https://slashdot.org/software/p/PostgreSQL/integrations/ [prometheus]: /use-timescale/:currentVersion:/integrations/prometheus [amazon-sagemaker]: /use-timescale/:currentVersion:/integrations/amazon-sagemaker -[postgresql]: /use-timescale/:currentVersion:/integrations/postgresql \ No newline at end of file +[postgresql]: /use-timescale/:currentVersion:/integrations/postgresql +[debezium]: /use-timescale/:currentVersion:/integrations/debezium \ No newline at end of file diff --git a/use-timescale/page-index/page-index.js b/use-timescale/page-index/page-index.js index 21e3991a98..9c4b2198e7 100644 --- a/use-timescale/page-index/page-index.js +++ b/use-timescale/page-index/page-index.js @@ -813,6 +813,11 @@ module.exports = [ href: "dbeaver", excerpt: "Integrate DBeaver with Timescale products", }, + { + title: "Debezium", + href: "debezium", + excerpt: "Integrate Debezium with Timescale products", + }, { title: "Grafana", href: "grafana", @@ -826,7 +831,7 @@ module.exports = [ { title: "PostgreSQL", href: "postgresql", - excerpt: "Integrate PostgreSQL with Timescale Cloud", + excerpt: "Integrate PostgreSQL with Timescale products", }, { title: "Prometheus", From 6ec3dc867fd20f742ac1d104ab90d4ab3e24cbbc Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 16 Apr 2025 18:58:00 +0200 Subject: [PATCH 03/18] chore: more updates. 
--- .../_integration-apache-kafka-install.md | 58 ++++ ...tegration-debezium-cloud-config-service.md | 29 ++ ...on-debezium-self-hosted-config-database.md | 82 +++++ integrations/apache-kafka.md | 58 +--- integrations/debezium-server.md | 264 ++++++++++++++++ integrations/debezium.md | 292 ++++++------------ 6 files changed, 537 insertions(+), 246 deletions(-) create mode 100644 _partials/_integration-apache-kafka-install.md create mode 100644 _partials/_integration-debezium-cloud-config-service.md create mode 100644 _partials/_integration-debezium-self-hosted-config-database.md create mode 100644 integrations/debezium-server.md diff --git a/_partials/_integration-apache-kafka-install.md b/_partials/_integration-apache-kafka-install.md new file mode 100644 index 0000000000..b77be84072 --- /dev/null +++ b/_partials/_integration-apache-kafka-install.md @@ -0,0 +1,58 @@ +1. **Extract the Kafka binaries to a local folder** + + ```bash + curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf - + cd kafka_2.13-3.9.0 + ``` + From now on, the folder where you extracted the Kafka binaries is called ``. + +1. **Configure and run Apache Kafka** + + ```bash + KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" + ./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties + ./bin/kafka-server-start.sh config/kraft/reconfig-server.properties + ``` + Use the `-daemon` flag to run this process in the background. + +1. **Create Kafka topics** + + In another Terminal window, navigate to , then call `kafka-topics.sh` and create the following topics: + - `accounts`: publishes JSON messages that are consumed by the timescale-sink connector and inserted into your $SERVICE_LONG. + - `deadletter`: stores messages that cause errors and that Kafka Connect workers cannot process. 
+ + ```bash + ./bin/kafka-topics.sh \ + --create \ + --topic accounts \ + --bootstrap-server localhost:9092 \ + --partitions 10 + + ./bin/kafka-topics.sh \ + --create \ + --topic deadletter \ + --bootstrap-server localhost:9092 \ + --partitions 10 + ``` + +1. **Test that your topics are working correctly** + 1. Run `kafka-console-producer` to send messages to the `accounts` topic: + ```bash + bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092 + ``` + 1. Send some events. For example, type the following: + ```bash + >Timescale Cloud + >How Cool + ``` + 1. In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: + ```bash + bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 + ``` + You see + ```bash + Timescale Cloud + How Cool + ``` + + diff --git a/_partials/_integration-debezium-cloud-config-service.md b/_partials/_integration-debezium-cloud-config-service.md new file mode 100644 index 0000000000..1baf8e9c47 --- /dev/null +++ b/_partials/_integration-debezium-cloud-config-service.md @@ -0,0 +1,29 @@ + + +1. **Connect to your $SERVICE_LONG** + + For $CLOUD_LONG, open an [SQL editor][run-queries] in [$CONSOLE][open-console]. For self-hosted, use [`psql`][psql]. + +1. **Enable logical replication for your $SERVICE_LONG** + + 1. Run the following command to enable logical replication: + + ```sql + ALTER SYSTEM SET wal_level = logical; + SELECT pg_reload_conf(); + ``` + + 1. Restart your $SERVICE_SHORT. + +1. **Create a table** + + Create a table to test the integration. 
For example: + + ```sql + CREATE TABLE sensor_data ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL, + temperature FLOAT NOT NULL, + recorded_at TIMESTAMPTZ DEFAULT now() + ); + ``` diff --git a/_partials/_integration-debezium-self-hosted-config-database.md b/_partials/_integration-debezium-self-hosted-config-database.md new file mode 100644 index 0000000000..c0ae59ecd0 --- /dev/null +++ b/_partials/_integration-debezium-self-hosted-config-database.md @@ -0,0 +1,82 @@ + +1. **Configure your self-hosted $PG deployment** + + 1. Open `postgresql.conf`. + + The $PG configuration files are usually located in: + + - Docker: `/home/postgres/pgdata/data/` + - Linux: `/etc/postgresql//main/` or `/var/lib/pgsql//data/` + - macOS: `/opt/homebrew/var/postgresql@/` + - Windows: `C:\Program Files\PostgreSQL\\data\` + + 1. Enable logical replication + + Modify the following settings in `postgresql.conf`: + + ```ini + wal_level = logical + max_replication_slots = 10 + max_wal_senders = 10 + ``` + + 1. Open `pg_hba.conf` and enable host replication: + + To allow replication connections, add the following: + + ``` + local replication debezium trust + ``` + This permission is for the `debezium` $PG user running on a local or Docker deployment. For more about replication + permissions, see [Configuring PostgreSQL to allow replication with the Debezium connector host][debezium-replication-permissions]. + + 1. Restart $PG. + + + +1. **Connect to your $SELF_LONG instance** + + Use [`psql`][psql-connect]. + +1. **Create a Debezium user in PostgreSQL** + + Create a user with the `LOGIN` and `REPLICATION` permissions: + + ```sql + CREATE ROLE WITH LOGIN CREATE REPLICATION PASSWORD ; + ``` + +1. **Enable a replication slot for Debezium** + + 1. Create a table for Debezium to listen to + + ```sql + CREATE TABLE accounts (created_at TIMESTAMPTZ DEFAULT NOW(), + name TEXT, + city TEXT); + ``` + + 1. 
**Turn the table into a hypertable** + + ```sql + SELECT create_hypertable('accounts', 'created_at'); + ``` + + Debezium also works with [$CAGGs][caggs]. + + 1. Create a publication and enable a replication slot + + ```sql + CREATE PUBLICATION debezium_pub FOR TABLE ACCOUNTS WITH (publish = 'insert, update'); + ``` + ```sql + -- TODO(Iain): confirm whether this step is needed; it may be redundant + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` + The replication slot must match the Kafka topics you are streaming to. + +[caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ +[run-queries]: /getting-started/:currentVersion:/run-queries-from-console/ +[open-console]: https://console.cloud.timescale.com/dashboard/services +[psql-connect]: /integrations/:currentVersion:/psql/#connect-to-your-service +[debezium-replication-permissions]: https://debezium.io/documentation/reference/3.1/connectors/postgresql.html#postgresql-host-replication-permissions diff --git a/integrations/apache-kafka.md b/integrations/apache-kafka.md index 014a8f344c..da8500cc40 100644 --- a/integrations/apache-kafka.md +++ b/integrations/apache-kafka.md @@ -6,6 +6,7 @@ keywords: [Apache Kafka, integrations] --- import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; +import IntegrationApacheKafka from "versionContent/_partials/_integration-apache-kafka-install.mdx"; # Integrate Apache Kafka with $CLOUD_LONG @@ -29,62 +30,7 @@ To install and configure Apache Kafka: -1. **Extract the Kafka binaries to a local folder** - - ```bash - curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf - - cd kafka_2.13-3.9.0 - ``` - From now on, the folder where you extracted the Kafka binaries is called ``. - -1. 
**Configure and run Apache Kafka** - - ```bash - KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" - ./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties - ./bin/kafka-server-start.sh config/kraft/reconfig-server.properties - ``` - Use the `-daemon` flag to run this process in the background. - -1. **Create Kafka topics** - - In another Terminal window, navigate to , then call `kafka-topics.sh` and create the following topics: - - `accounts`: publishes JSON messages that are consumed by the timescale-sink connector and inserted into your $SERVICE_LONG. - - `deadletter`: stores messages that cause errors and that Kafka Connect workers cannot process. - - ```bash - ./bin/kafka-topics.sh \ - --create \ - --topic accounts \ - --bootstrap-server localhost:9092 \ - --partitions 10 - - ./bin/kafka-topics.sh \ - --create \ - --topic deadletter \ - --bootstrap-server localhost:9092 \ - --partitions 10 - ``` - -1. **Test that your topics are working correctly** - 1. Run `kafka-console-producer` to send messages to the `accounts` topic: - ```bash - bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092 - ``` - 1. Send some events. For example, type the following: - ```bash - >Timescale Cloud - >How Cool - ``` - 2. 
In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: - ```bash - bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 - ``` - You see - ```bash - Timescale Cloud - How Cool - ``` + diff --git a/integrations/debezium-server.md b/integrations/debezium-server.md new file mode 100644 index 0000000000..ee0473e6b2 --- /dev/null +++ b/integrations/debezium-server.md @@ -0,0 +1,264 @@ +--- +title: Integrate Debezium with Timescale Cloud +excerpt: Integrate Debezium with Timescale Cloud to enable change data capture in your Timescale Cloud service and streaming to Redis Streams +products: [cloud, mst, self_hosted] +keywords: [Debezium, integrate] +--- + +import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; + +# Integrate Debezium with $CLOUD_LONG + +[Debezium][debezium] is an open-source distributed platform for change data capture (CDC). +It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. + +This page explains how to capture changes in your $SERVICE_LONG and stream them using Debezium. + +## Prerequisites + + + +- [Java8 or higher][java-installers] to run Debezium Server. +- Install [Debezium Server][debezium-install]. +- +- Install [Redis][redis-local] or sign up for [Redis Cloud][redis-cloud]. +- For a self-hosted installation, ensure that your PostgreSQL database is accessible from the Debezium Server instance. + +## Connect your $SERVICE_LONG + + + + + +To connect to $CLOUD_LONG: + + + +1. **Enable logical replication for your $SERVICE_LONG** + + 1. [Connect][connect] to your $SERVICE_SHORT using your [connection details][connection-info]. + + 1. Run the following command to enable logical replication: + + ```sql + ALTER SYSTEM SET wal_level = logical; + SELECT pg_reload_conf(); + ``` + + 1. Restart your $SERVICE_SHORT. + +1. **Create a table** + + Create a table to test the integration. 
For example: + + ```sql + CREATE TABLE sensor_data ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL, + temperature FLOAT NOT NULL, + recorded_at TIMESTAMPTZ DEFAULT now() + ); + ``` + +1. **Configure Debezium** + + 1. Navigate to the `conf` directory in your Debezium files. + + 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. + + ```properties + debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector + debezium.source.database.hostname= + debezium.source.database.port= + debezium.source.database.user= + debezium.source.database.password= + debezium.source.database.dbname= + debezium.source.plugin.name=pgoutput + debezium.source.slot.name=debezium_slot + debezium.source.publication.autocreate.mode=filtered + + # Define Redis Streams as the sink + debezium.sink.type=redis + debezium.sink.redis.address=localhost:6379 + debezium.sink.redis.stream.name=debezium_stream + debezium.format.value=json + ``` + +1. **Start Debezium Server** + + ```sh + bin/debezium-server-start.sh conf/application.properties + ``` + +1. **Test the connection** + + 1. Insert data into the table you created in $CLOUD_LONG: + + ```sql + INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); + ``` + + 1. Run the following command to check Redis Streams for incoming CDC events: + + ```sh + redis-cli XREAD STREAMS debezium_stream 0 + ``` + + You should see something like this: + + ```json + { + "op": "c", + "ts_ms": 1708000000000, + "source": { + "table": "sensor_data", + "db": "your_database", + "schema": "public" + }, + "after": { + "id": 1, + "device_id": "sensor-001", + "temperature": 22.5, + "recorded_at": "2024-02-15T12:00:00Z" + } + } + ``` + +You have successfully integrated Debezium with $CLOUD_LONG. + + + + + + + +To connect to your database: + + + +1. **Enable logical replication in PostgreSQL** + + 1. 
Modify the following settings in `postgresql.conf`: + + ```ini + wal_level = logical + max_replication_slots = 10 + max_wal_senders = 10 + ``` + + 1. Restart PostgreSQL. + + 1. Add the following to `pg_hba.conf` to allow replication connections: + + ``` + host replication debezium 0.0.0.0/0 md5 + ``` + + 1. Restart PostgreSQL. + +1. **Create a Debezium user in PostgreSQL** + + Create a user with the `LOGIN` and `REPLICATION` permissions: + + ```sql + CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'your_password'; + ``` + +1. **Create a table** + + Debezium will stream changes from this table to Redis. For example: + + ```sql + CREATE TABLE sensor_data ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL, + temperature FLOAT NOT NULL, + recorded_at TIMESTAMPTZ DEFAULT now() + ); + ``` + +1. **Create a replication slot and a publication** + + ```sql + CREATE PUBLICATION debezium_pub FOR TABLE sensor_data; + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` +1. **Configure Debezium** + + 1. Navigate to the `conf` directory in your Debezium files. + + 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. + + ```properties + debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector + debezium.source.database.hostname= + debezium.source.database.port= + debezium.source.database.user= + debezium.source.database.password= + debezium.source.database.dbname= + debezium.source.plugin.name=pgoutput + debezium.source.slot.name=debezium_slot + + # Define Redis Streams as the sink + debezium.sink.type=redis + debezium.sink.redis.address=localhost:6379 + debezium.sink.redis.stream.name=debezium_stream + debezium.format.value=json + ``` + +1. **Start Debezium Server** + + ```sh + bin/debezium-server-start.sh conf/application.properties + ``` + +1. **Test the connection** + + 1. 
Insert data into the table you created earlier: + + ```sql + INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); + ``` + + 1. Run the following command to check Redis Streams for incoming CDC events: + + ```sh + redis-cli XREAD STREAMS debezium_stream 0 + ``` + + You should see something like this: + + ```json + { + "op": "c", + "ts_ms": 1708000000000, + "source": { + "table": "sensor_data", + "db": "your_database", + "schema": "public" + }, + "after": { + "id": 1, + "device_id": "sensor-001", + "temperature": 22.5, + "recorded_at": "2024-02-15T12:00:00Z" + } + } + ``` + +You have successfully integrated Debezium with self-hosted $TIMESCALE_DB. + + + + + + + +[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ +[debezium]: https://debezium.io/ +[java-installers]: https://www.oracle.com/java/technologies/downloads/ +[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_installation +[console]: https://console.cloud.timescale.com/dashboard/services +[redis-local]: https://redis.io/docs/getting-started/ +[redis-cloud]: https://redis.com/try-free/ +[connect]: /getting-started/:currentVersion:/run-queries-from-console/ diff --git a/integrations/debezium.md b/integrations/debezium.md index ce6aa7b40c..7b25f2772d 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -6,256 +6,168 @@ keywords: [Debezium, integrate] --- import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; +import IntegrationApacheKafka from "versionContent/_partials/_integration-apache-kafka-install.mdx"; +import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integration-debezium-self-hosted-config-database.mdx"; # Integrate Debezium with $CLOUD_LONG [Debezium][debezium] is an open-source distributed platform for change data capture (CDC). 
It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. -This pages explains how to capture changes in your $SERVICE_LONG and stream them to Redis Streams. + -## Prerequisites - - - -- Install [Debezium Server][debezium-install]. -- Install [Redis][redis-local] or sign up for [Redis Cloud][redis-cloud]. -- For a self-hosted installation, ensure that your PostgreSQL database is accessible from the Debezium Server instance. - -## Connect your $SERVICE_LONG - - +If you enable hypercore, the Debezium $TIMESCALE_DB connector does not apply any special processing to data in the +columnstore. Compressed chunks are forwarded unchanged to the next downstream job in the pipeline for further processing +as needed. Typically, messages with compressed chunks are dropped, and are not processed by subsequent jobs in the pipeline. - - -To connect to $CLOUD_LONG: - - +This limitation only affects changes to chunks in the columnstore. Changes to data in the rowstore work correctly. -1. **Enable logical replication for your $SERVICE_LONG** + - 1. [Connect][connect] to your $SERVICE_SHORT using your [connection details][connection-info]. +This page explains how to capture changes in your $SERVICE_LONG and stream them using Debezium +on Apache Kafka. - 1. Run the following command to enable logical replication: - - ```sql - ALTER SYSTEM SET wal_level = logical; - SELECT pg_reload_conf(); - ``` - - 1. Restart your $SERVICE_SHORT. +## Prerequisites -1. **Create a table** + - Create a table to test the integration. For example: +- [Java 8 or higher][java-installers] to run Apache Kafka. - ```sql - CREATE TABLE sensor_data ( - id SERIAL PRIMARY KEY, - device_id TEXT NOT NULL, - temperature FLOAT NOT NULL, - recorded_at TIMESTAMPTZ DEFAULT now() - ); - ``` +## Configure your database to work with Debezium -1. **Configure Debezium** + - 1. Navigate to the `conf` directory in your Debezium files. + - 1.
Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. +To setup $SELF_LONG_LC to communicate with Debezium: - ```properties - debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector - debezium.source.database.hostname= - debezium.source.database.port= - debezium.source.database.user= - debezium.source.database.password= - debezium.source.database.dbname= - debezium.source.plugin.name=pgoutput - debezium.source.slot.name=debezium_slot - debezium.source.publication.autocreate.mode=filtered + - # Define Redis Streams as the sink - debezium.sink.type=redis - debezium.sink.redis.address=localhost:6379 - debezium.sink.redis.stream.name=debezium_stream - debezium.format.value=json - ``` + -1. **Start Debezium Server** + - ```sh - bin/debezium-server-start.sh conf/application.properties - ``` + -1. **Test the connection** + - 1. Insert data into the table you created in $CLOUD_LONG: +Debezium requires logical replication to be enabled. Currently, this is not enabled by default on $SERVICE_LONGs. +We are working on enabling this feature as you read. As soon as it is live, these docs will be updated. - ```sql - INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); - ``` + - 1. Run the following command to check Redis Streams for incoming CDC events: + - ```sh - redis-cli XREAD STREAMS debezium_stream 0 - ``` +## Install and configure Apache Kafka - You should see something like this: +To install and configure Apache Kafka: - ```json - { - "op": "c", - "ts_ms": 1708000000000, - "source": { - "table": "sensor_data", - "db": "your_database", - "schema": "public" - }, - "after": { - "id": 1, - "device_id": "sensor-001", - "temperature": 22.5, - "recorded_at": "2024-02-15T12:00:00Z" - } - } - ``` + -You have successfully integrated Debezium with $CLOUD_LONG. 
+ - +Keep these terminals open, you use them to test the integration later. - +## Configure Debezium to listen to your database -To connect to your database: +Set up Kafka Connect server, plugins, drivers, and connectors: -1. **Enable logical replication in PostgreSQL** - - 1. Modify the following settings in `postgresql.conf`: - - ```ini - wal_level = logical - max_replication_slots = 10 - max_wal_senders = 10 - ``` - - 1. Restart PostgreSQL. - - 1. Add the following to `pg_hba.conf` to allow replication connections: - - ``` - host replication debezium 0.0.0.0/0 md5 - ``` - - 1. Restart PostgreSQL. +1. **Install the Debezium connector** -1. **Create a Debezium user in PostgreSQL** - - Create a user with the `LOGIN` and `REPLICATION` permissions: + In another Terminal window, navigate to , then download and configure the PostgreSQL sink and driver. + ```bash + mkdir -p "plugins/debezium-connector-postgres" + curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.1.0.Final/debezium-connector-postgres-3.1.0.Final-plugin.tar.gz \ + | tar -xzf - -C "plugins/debezium-connector-postgres" --strip-components=1 + echo "plugin.path=`pwd`/plugins/debezium-connector-postgres" >> "config/connect-distributed.properties" + echo "plugin.path=`pwd`/plugins/debezium-connector-postgres" >> "config/connect-standalone.properties" + ``` - ```sql - CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'your_password'; +1. **Configure Debezium to poll your database** + + Write the following configuration to `/config/timescale-debezium-sink.properties`, then update the + `` with your [connection details][connection-info]. 
+ + ```properties + name=timescale-debezium-sink + connector.class=io.debezium.connector.postgresql.PostgresConnector + database.hostname= + database.port= + database.user= + database.password= + database.dbname= + topic.prefix=accounts + plugin.name=pgoutput + schema.include.list=_timescaledb_internal + transforms=timescaledb + transforms.timescaledb.type=io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb + transforms.timescaledb.database.hostname= + transforms.timescaledb.database.port= + transforms.timescaledb.database.user= + transforms.timescaledb.database.password= + transforms.timescaledb.database.dbname= ``` -1. **Create a table** + - The values for the `*.hostname`, `*.port`, `*.user`, `*.password`, and `*.dbname` properties must match. You + created `` in [Configure your database to work with Debezium][debezium-configure-database]. + - `topic.prefix` is the name of the Kafka topic you created in [Install and configure Apache Kafka][kafka-install-configure]. - Debezium will stream changes from this table to Redis. For example: +1. **Start Kafka Connect** - ```sql - CREATE TABLE sensor_data ( - id SERIAL PRIMARY KEY, - device_id TEXT NOT NULL, - temperature FLOAT NOT NULL, - recorded_at TIMESTAMPTZ DEFAULT now() - ); - ``` + ```bash + export CLASSPATH=`pwd`/plugins/debezium-connector-postgres/* + ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-debezium-sink.properties + ``` -1. **Create a replication slot and a publication** + Use the `-daemon` flag to run this process in the background. - ```sql - CREATE PUBLICATION debezium_pub FOR TABLE sensor_data; - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); - ``` -1. **Configure Debezium** - - 1. Navigate to the `conf` directory in your Debezium files. - - 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint.
- - ```properties - debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector - debezium.source.database.hostname= - debezium.source.database.port= - debezium.source.database.user= - debezium.source.database.password= - debezium.source.database.dbname= - debezium.source.plugin.name=pgoutput - debezium.source.slot.name=debezium_slot - - # Define Redis Streams as the sink - debezium.sink.type=redis - debezium.sink.redis.address=localhost:6379 - debezium.sink.redis.stream.name=debezium_stream - debezium.format.value=json - ``` +1. **Verify Kafka Connect is running** -1. **Start Debezium Server** + In yet another Terminal window, run the following command: - ```sh - bin/debezium-server-start.sh conf/application.properties - ``` + ```bash + curl http://localhost:8083 + ``` + You see something like: + ```bash + {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"} + ``` 1. **Test the connection** - 1. Insert data into the table you created earlier: - - ```sql - INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); - ``` + 1. Connect to your $SELF_LONG instance. - 1. Run the following command to check Redis Streams for incoming CDC events: + Use [`psql`][psql-connect]. + 1. Insert data into the table you created: - ```sh - redis-cli XREAD STREAMS debezium_stream 0 + ```sql + INSERT INTO accounts (name,city) VALUES ('Lola','Copacabana'); ``` - You should see something like this: - - ```json - { - "op": "c", - "ts_ms": 1708000000000, - "source": { - "table": "sensor_data", - "db": "your_database", - "schema": "public" - }, - "after": { - "id": 1, - "device_id": "sensor-001", - "temperature": 22.5, - "recorded_at": "2024-02-15T12:00:00Z" - } - } + 2.
In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: + ```bash + bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 ``` - -You have successfully integrated Debezium with self-hosted $TIMESCALE_DB. + You see + ```bash + Have to get this working + ``` - - - +You have successfully integrated Debezium. -[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ +[connection-info]: /integrations/:currentVersion:/find-connection-details/ [debezium]: https://debezium.io/ -[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html +[java-installers]: https://www.oracle.com/java/technologies/downloads/ +[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_installation [console]: https://console.cloud.timescale.com/dashboard/services [redis-local]: https://redis.io/docs/getting-started/ [redis-cloud]: https://redis.com/try-free/ -[connect]: /getting-started/:currentVersion:/run-queries-from-console/ \ No newline at end of file +[connect]: /getting-started/:currentVersion:/run-queries-from-console/ +[kafka-install-configure]: /integrations/:currentVersion:/debezium#install-and-configure-apache-kafka +[debezium-configure-database]: /integrations/:currentVersion:/debezium##configure-your-database-to-work-with-debezium From a6266bfbf8ddd4d0db6359c11334e2599fb46e5e Mon Sep 17 00:00:00 2001 From: Iain Date: Thu, 17 Apr 2025 12:07:05 +0200 Subject: [PATCH 04/18] chore: more updates. 
--- ...on-debezium-self-hosted-config-database.md | 19 ++++++++------- integrations/debezium.md | 24 ++++++++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/_partials/_integration-debezium-self-hosted-config-database.md b/_partials/_integration-debezium-self-hosted-config-database.md index c0ae59ecd0..21e0e14d23 100644 --- a/_partials/_integration-debezium-self-hosted-config-database.md +++ b/_partials/_integration-debezium-self-hosted-config-database.md @@ -43,7 +43,7 @@ Create a user with the `LOGIN` and `REPLICATION` permissions: ```sql - CREATE ROLE WITH LOGIN CREATE REPLICATION PASSWORD ; + CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD ; ``` 1. **Enable a replication spot for Debezium** @@ -56,7 +56,7 @@ city TEXT); ``` - 1. **Turn the table into a hypertable** + 1. Turn the table into a hypertable ```sql SELECT create_hypertable('accounts', 'created_at'); @@ -64,17 +64,18 @@ Debezium also works with [$CAGGs][caggs]. - 1. Create a publication and enable a replication slot - + 1. Make the + ```sql - CREATE PUBLICATION debezium_pub FOR TABLE ACCOUNTS WITH (publish = 'insert, update'); + ALTER TABLE accounts OWNER TO debezium; ``` + + 1. Create a publication and enable a replication slot + ```sql - IAIN, don't think we need this' - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'accounts'); + CREATE PUBLICATION dbz_publication FOR TABLE public.accounts WITH (publish = 'insert, update'); ``` - The replication slot must match the kafka topics you are streaming to. 
- + [caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ [run-queries]: /getting-started/:currentVersion:/run-queries-from-console/ [open-console]: https://console.cloud.timescale.com/dashboard/services diff --git a/integrations/debezium.md b/integrations/debezium.md index 7b25f2772d..348b59a407 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -14,18 +14,21 @@ import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integ [Debezium][debezium] is an open-source distributed platform for change data capture (CDC). It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. - +Debezium can capture events about: -If you enable hypercore, the Debezium $TIMESCALE_DB connector does not apply any special processing to data in the -columnstore. Compressed chunks are forwarded unchanged to the next downstream job in the pipeline for further processing -as needed. Typically, messages with compressed chunks are dropped, and are not processed by subsequent jobs in the pipeline. +- [Hypertables][hypertables]: captured events are rerouted from their chunk-specific topics to a single logical topic + named according to the following pattern: `..` +- [Continuous aggregates][caggs]: captured events are rerouted from their chunk-specific topics to a single logical topic + named according to the following pattern: `..` +- [Hypercore][hypercore]: If you enable hypercore, the Debezium $TIMESCALE_DB connector does not apply any special + processing to data in the columnstore. Compressed chunks are forwarded unchanged to the next downstream job in the + pipeline for further processing as needed. Typically, messages with compressed chunks are dropped, and are not + processed by subsequent jobs in the pipeline. -This limitation only affects changes to chunks in the columnstore. Changes to data in the rowstore work correctly. + This limitation only affects changes to chunks in the columnstore. 
Changes to data in the rowstore work correctly. - -This page explains how to capture changes in your $SERVICE_LONG and stream them using Debezium -on Apache Kafka. +This page explains how to capture changes in your database and stream them using Debezium on Apache Kafka. ## Prerequisites @@ -110,6 +113,7 @@ Set up Kafka Connect server, plugins, drivers, and connectors: transforms.timescaledb.database.user= transforms.timescaledb.database.password= transforms.timescaledb.database.dbname= + publication.autocreate.mode=filtered ``` - The values for the `*.hostname`, `*.port`, `*.user`, `*.password`, and `*.dbname` properties must match. You @@ -161,6 +165,9 @@ Set up Kafka Connect server, plugins, drivers, and connectors: You have successfully integrated Debezium. +[hypertables]: /use-timescale/:currentVersion:/hypertables/ +[hypercore]: /use-timescale/:currentVersion:/hypercore/ +[caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ [connection-info]: /integrations/:currentVersion:/find-connection-details/ [debezium]: https://debezium.io/ [java-installers]: https://www.oracle.com/java/technologies/downloads/ @@ -171,3 +178,4 @@ You have successfully integrated Debezium. [connect]: /getting-started/:currentVersion:/run-queries-from-console/ [kafka-install-configure]: /integrations/:currentVersion:/debezium#install-and-configure-apache-kafka [debezium-configure-database]: /integrations/:currentVersion:/debezium##configure-your-database-to-work-with-debezium +[psql-connect]: /integrations/:currentVersion:/psql/#connect-to-your-service From ede3998e6a7013753cfda59724e2ad43b20b1f89 Mon Sep 17 00:00:00 2001 From: Iain Date: Tue, 22 Apr 2025 11:25:25 +0200 Subject: [PATCH 05/18] chore: more updates. 
--- integrations/debezium.md | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 348b59a407..de4d2798e7 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -105,7 +105,6 @@ Set up Kafka Connect server, plugins, drivers, and connectors: database.dbname= topic.prefix=accounts plugin.name=pgoutput - schema.include.list=_timescaledb_internal transforms=timescaledb transforms.timescaledb.type=io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb transforms.timescaledb.database.hostname= From 6571bcc202ecf43a80acc6f13c123581811e9710 Mon Sep 17 00:00:00 2001 From: Iain Date: Tue, 22 Apr 2025 16:53:47 +0200 Subject: [PATCH 06/18] chore: more updates. --- .../_integration-debezium-self-hosted-config-database.md | 9 +-------- integrations/debezium.md | 3 ++- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/_partials/_integration-debezium-self-hosted-config-database.md b/_partials/_integration-debezium-self-hosted-config-database.md index 21e0e14d23..c9e35d28cc 100644 --- a/_partials/_integration-debezium-self-hosted-config-database.md +++ b/_partials/_integration-debezium-self-hosted-config-database.md @@ -33,7 +33,6 @@ 1. Restart $PG. - 1. **Connect to your $SELF_LONG instance** Use [`psql`][psql-connect]. @@ -64,16 +63,10 @@ Debezium also works with [$CAGGs][caggs]. - 1. Make the - - ```sql - ALTER TABLE accounts OWNER TO debezium; - ``` - 1. 
Create a publication and enable a replication slot ```sql - CREATE PUBLICATION dbz_publication FOR TABLE public.accounts WITH (publish = 'insert, update'); + CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update'); ``` [caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ diff --git a/integrations/debezium.md b/integrations/debezium.md index de4d2798e7..188bcb956e 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -112,7 +112,8 @@ Set up Kafka Connect server, plugins, drivers, and connectors: transforms.timescaledb.database.user= transforms.timescaledb.database.password= transforms.timescaledb.database.dbname= - publication.autocreate.mode=filtered + publication.autocreate.mode=all_tables + schema.include.list=public,_timescaledb_internal ``` - The values for the `*.hostname`, `*.port`, `*.user`, `*.password`, and `*.dbname` properties must match. You From 480ad769e1e5d6690f6c28cd271653af715741c3 Mon Sep 17 00:00:00 2001 From: Iain Date: Tue, 22 Apr 2025 17:31:27 +0200 Subject: [PATCH 07/18] chore: more updates. --- integrations/debezium.md | 54 ++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 188bcb956e..70f33280b3 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -141,25 +141,41 @@ Set up Kafka Connect server, plugins, drivers, and connectors: {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"} ``` -1. **Test the connection** - - 1. Connect to your $SELF_LONG instance. - - Use [`psql`][psql-connect]. - 1. Insert data into the table you created: - - ```sql - INSERT INTO accounts (name,city) VALUES ('Lola','Copacabana'); - ``` - - 2. 
In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: - ```bash - bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 - ``` - You see - ```bash - Have to get this working - ``` + 1. **Test the connection** + + 1. Connect to your $SELF_LONG instance. + + Use [`psql`][psql-connect]. + 1. Insert data into the table you created: + + ```sql + INSERT INTO accounts (name,city) VALUES ('Lola','Copacabana'); + ``` + + 2. In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: + ```bash + bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 + ``` + You see something like: + ```bash + { + "topic": "timescaledb.public.accounts", + "value": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"1970-01-01T00:00:00.000000Z\",\"field\":\"created_at\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"city\"}],\"optional\":true,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"1970-01-01T00:00:00.000000Z\",\"field\":\"created_at\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"city\"}],\"optional\":true,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":fa
lse,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"version\":1,\"field\":\"source\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"}],\"optional\":false,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Envelope\",\"version\":2},\"payload\":{\"before\":null,\"after\":{\"created_at\":\"2025-04-22T15:25:29.681517Z\",\"name\":\"sdfg\",\"city\":\"asdf\"},\"source\":{\"version\":\"3.1.0.Final\",\"connector\":\"postgresql\",\"name\":\"accounts\",\"ts_ms\":1745335529692,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"29187960\\\"]\",\"ts_us\":1745335529692108,\"ts_ns\":1745335529692108000,\"schema\":\"public\",\"table\":\"accounts\
",\"txId\":769,\"lsn\":29187960,\"xmin\":null},\"transaction\":null,\"op\":\"c\",\"ts_ms\":1745335530172,\"ts_us\":1745335530172473,\"ts_ns\":1745335530172473672}}", + "headers": [ + { + "key": "__debezium_timescaledb_chunk_table", + "value": "_hyper_1_1_chunk" + }, + { + "key": "__debezium_timescaledb_chunk_schema", + "value": "_timescaledb_internal" + } + ], + "timestamp": 1745335530762, + "partition": 0, + "offset": 0 + } + ``` From f26c546c54b8416889799313fbea835f0caf7a24 Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 23 Apr 2025 17:55:57 +0200 Subject: [PATCH 08/18] chore: fix variable and make whole doc self-hosted only. --- integrations/debezium.md | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 70f33280b3..9108f0e198 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -42,7 +42,7 @@ This page explains how to capture changes in your database and stream them using -To setup $SELF_LONG_LC to communicate with Debezium: +To setup $SELF_LONG to communicate with Debezium: @@ -50,17 +50,6 @@ To setup $SELF_LONG_LC to communicate with Debezium: - - - - -Debezium requires logical replication to be enabled. Currently, this is not enabled by default on $SERVICE_LONGs. -We are working on enabling this feature as you read. As soon as it is live, these docs will be updated. - - - - - ## Install and configure Apache Kafka To install and configure Apache Kafka: @@ -181,6 +170,19 @@ Set up Kafka Connect server, plugins, drivers, and connectors: You have successfully integrated Debezium. + + + + + +Debezium requires logical replication to be enabled. Currently, this is not enabled by default on $SERVICE_LONGs. +We are working on enabling this feature as you read. As soon as it is live, these docs will be updated. 
+ + + + + + [hypertables]: /use-timescale/:currentVersion:/hypertables/ [hypercore]: /use-timescale/:currentVersion:/hypercore/ [caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ From d7cfc66738c22ea78e123ed9ee02a843dba352d1 Mon Sep 17 00:00:00 2001 From: Iain Date: Thu, 24 Apr 2025 11:32:23 +0200 Subject: [PATCH 09/18] chore: small fix. --- _partials/_integration-debezium-self-hosted-config-database.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_partials/_integration-debezium-self-hosted-config-database.md b/_partials/_integration-debezium-self-hosted-config-database.md index c9e35d28cc..32e14875cd 100644 --- a/_partials/_integration-debezium-self-hosted-config-database.md +++ b/_partials/_integration-debezium-self-hosted-config-database.md @@ -42,7 +42,7 @@ Create a user with the `LOGIN` and `REPLICATION` permissions: ```sql - CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD ; + CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD ''; ``` 1. **Enable a replication spot for Debezium** From e2b5e551efb22813e3ddd147ab527cb5d82400c0 Mon Sep 17 00:00:00 2001 From: Iain Date: Tue, 29 Apr 2025 18:00:06 +0200 Subject: [PATCH 10/18] chore: update classpath --- integrations/debezium.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 9108f0e198..08e8521077 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -73,10 +73,10 @@ Set up Kafka Connect server, plugins, drivers, and connectors: In another Terminal window, navigate to , then download and configure the PostgreSQL sink and driver. 
```bash mkdir -p "plugins/debezium-connector-postgres" - curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.1.0.Final/debezium-connector-postgres-3.1.0.Final-plugin.tar.gz \ + curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.1.1.Final/debezium-connector-postgres-3.1.1.Final-plugin.tar.gz \ | tar -xzf - -C "plugins/debezium-connector-postgres" --strip-components=1 - echo "plugin.path=`pwd`/plugins/debezium-connector-postgres" >> "config/connect-distributed.properties" - echo "plugin.path=`pwd`/plugins/debezium-connector-postgres" >> "config/connect-standalone.properties" + echo "plugin.path=`pwd`/libs,`pwd`/plugins/debezium-connector-postgres" >> "config/connect-distributed.properties" + echo "plugin.path=`pwd`/libs,`pwd`/plugins/debezium-connector-postgres" >> "config/connect-standalone.properties" ``` 1. **Configure Debezium to poll your database** @@ -112,7 +112,7 @@ Set up Kafka Connect server, plugins, drivers, and connectors: 1. **Start Kafka Connect** ```bash - export CLASSPATH=`pwd`/plugins/debezium-connector-postgres/* + export CLASSPATH=`pwd`/plugins/debezium-connector-postgres/*:`pwd`/libs/* ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-debezium-sink.properties ``` From ec7452ce81aaa9f18f7e15c1f531c68aef4f41c8 Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 30 Apr 2025 12:59:24 +0200 Subject: [PATCH 11/18] chore: update for docker. --- _partials/_integration-debezium-docker.md | 139 ++++++++++++++++++++++ integrations/debezium.md | 113 +----------------- 2 files changed, 142 insertions(+), 110 deletions(-) create mode 100644 _partials/_integration-debezium-docker.md diff --git a/_partials/_integration-debezium-docker.md b/_partials/_integration-debezium-docker.md new file mode 100644 index 0000000000..76362afbc5 --- /dev/null +++ b/_partials/_integration-debezium-docker.md @@ -0,0 +1,139 @@ + +1. 
**Run Zookeeper in Docker** + + In another Terminal window, run the following command: + ```bash + docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:3.0 + ``` + Check the output log to see that Zookeeper is running. + +1. **Run Kafka in Docker** + + In another Terminal window, run the following command: + ```bash + docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:3.0 + ``` + Check the output log to see that Kafka is running. + + +1. **Run Kafka Connect in Docker** + + In another Terminal window, run the following command: + ```bash + docker run -it --rm --name connect \ + -p 8083:8083 \ + -e GROUP_ID=1 \ + -e CONFIG_STORAGE_TOPIC=accounts \ + -e OFFSET_STORAGE_TOPIC=offsets \ + -e STATUS_STORAGE_TOPIC=storage \ + --link kafka:kafka \ + --link timescaledb:timescaledb \ + quay.io/debezium/connect:3.0 + ``` + Check the output log to see that Kafka Connect is running. + + +1. **Register the Debezium PostgreSQL source connector** + + Update the `` for the `` you created in your $SELF_LONG instance in the following command.
+ Then run the command in another Terminal window: + ```bash + curl -X POST http://localhost:8083/connectors \ + -H "Content-Type: application/json" \ + -d '{ + "name": "timescaledb-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "timescaledb", + "database.port": "5432", + "database.user": "", + "database.password": "", + "database.dbname" : "postgres", + "topic.prefix": "accounts", + "plugin.name": "pgoutput", + "schema.include.list": "public,_timescaledb_internal", + "transforms": "timescaledb", + "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", + "transforms.timescaledb.database.hostname": "timescaledb", + "transforms.timescaledb.database.port": "5432", + "transforms.timescaledb.database.user": "", + "transforms.timescaledb.database.password": "", + "transforms.timescaledb.database.dbname": "postgres" + } + }' + ``` + +1. **Verify `timescaledb-connector` is included in the connector list** + + 1.
Check the tasks associated with `timescaledb-connector`: + ```bash + curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/timescaledb-connector + ``` + You see something like: + ```bash + {"name":"timescaledb-connector","config": + { "connector.class":"io.debezium.connector.postgresql.PostgresConnector", + "transforms.timescaledb.database.hostname":"timescaledb", + "transforms.timescaledb.database.password":"debeziumpassword","database.user":"debezium", + "database.dbname":"postgres","transforms.timescaledb.database.dbname":"postgres", + "transforms.timescaledb.database.user":"debezium", + "transforms.timescaledb.type":"io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", + "transforms.timescaledb.database.port":"5432","transforms":"timescaledb", + "schema.include.list":"public,_timescaledb_internal","database.port":"5432","plugin.name":"pgoutput", + "topic.prefix":"accounts","database.hostname":"timescaledb","database.password":"debeziumpassword", + "name":"timescaledb-connector"},"tasks":[{"connector":"timescaledb-connector","task":0}],"type":"source"} + ``` + +1. **Verify `timescaledb-connector` is running** + + 1. Open the Terminal window running Kafka connect. When the connector is active, you see something like the following: + + ```bash + 2025-04-30 10:40:15,168 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal._hyper_1_1_chunk' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,168 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming SignalProcessor started. 
Scheduling it every 5000ms [io.debezium.pipeline.signal.SignalProcessor] + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming Creating thread debezium-postgresconnector-accounts-SignalProcessor [io.debezium.util.Threads] + 2025-04-30 10:40:15,175 INFO Postgres|accounts|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator] + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Retrieved latest position from stored offset 'LSN{0/1FCE570}' [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/1FCE570}' [io.debezium.connector.postgresql.connection.WalPositionLocator] + 2025-04-30 10:40:15,176 INFO Postgres|accounts|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] + 2025-04-30 10:40:15,189 INFO Postgres|accounts|streaming Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1FCCFF0}, catalogXmin=884] [io.debezium.connector.postgresql.connection.PostgresConnection] + 2025-04-30 10:40:15,189 INFO Postgres|accounts|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection] + 2025-04-30 10:40:15,204 INFO Postgres|accounts|streaming Requested thread factory for component PostgresConnector, id = accounts named = keep-alive [io.debezium.util.Threads] + 2025-04-30 10:40:15,204 INFO Postgres|accounts|streaming Creating thread debezium-postgresconnector-accounts-keep-alive [io.debezium.util.Threads] + 2025-04-30 10:40:15,216 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_policy_chunk_stats' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,216 INFO Postgres|accounts|streaming REPLICA IDENTITY for 
'public.accounts' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat_history' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal._hyper_1_1_chunk' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,217 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal.bgw_job_stat' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] + 2025-04-30 10:40:15,219 INFO Postgres|accounts|streaming Processing messages [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] + ``` + + 1. Watch the events in the accounts topic on your $SELF_LONG instance. + + In another Terminal instance, run the following command: + + ```bash + docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:3.0 watch-topic -a -k accounts + ``` + + You see the topics being streamed. 
For example: + + ```bash + status-task-timescaledb-connector-0 {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":31} + status-topic-timescaledb.public.accounts:connector-timescaledb-connector {"topic":{"name":"timescaledb.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009337985}} + status-topic-accounts._timescaledb_internal.bgw_job_stat:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338118}} + status-topic-accounts._timescaledb_internal.bgw_job_stat:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338120}} + status-topic-accounts._timescaledb_internal.bgw_job_stat_history:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat_history","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338243}} + status-topic-accounts._timescaledb_internal.bgw_job_stat_history:connector-timescaledb-connector {"topic":{"name":"accounts._timescaledb_internal.bgw_job_stat_history","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338245}} + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338250}} + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} + status-topic-accounts.public.accounts:connector-timescaledb-connector 
{"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} + status-topic-accounts.public.accounts:connector-timescaledb-connector {"topic":{"name":"accounts.public.accounts","connector":"timescaledb-connector","task":0,"discoverTimestamp":1746009338251}} + ["timescaledb-connector",{"server":"accounts"}] {"last_snapshot_record":true,"lsn":33351024,"txId":893,"ts_usec":1746009337290783,"snapshot":"INITIAL","snapshot_completed":true} + status-connector-timescaledb-connector {"state":"UNASSIGNED","trace":null,"worker_id":"172.17.0.5:8083","generation":31} + status-task-timescaledb-connector-0 {"state":"UNASSIGNED","trace":null,"worker_id":"172.17.0.5:8083","generation":31} + status-connector-timescaledb-connector {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":33} + status-task-timescaledb-connector-0 {"state":"RUNNING","trace":null,"worker_id":"172.17.0.5:8083","generation":33} + ``` diff --git a/integrations/debezium.md b/integrations/debezium.md index 08e8521077..508ad7bea1 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -6,7 +6,7 @@ keywords: [Debezium, integrate] --- import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; -import IntegrationApacheKafka from "versionContent/_partials/_integration-apache-kafka-install.mdx"; +import IntegrationDebeziumDocker from "versionContent/_partials/_integration-debezium-docker.mdx"; import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integration-debezium-self-hosted-config-database.mdx"; # Integrate Debezium with $CLOUD_LONG @@ -50,121 +50,13 @@ To setup $SELF_LONG to communicate with Debezium: -## Install and configure Apache Kafka - -To install and configure Apache Kafka: - - - - - - - -Keep these terminals open, you use them to test the integration later. 
- ## Configure Debezium to listen to your database Set up Kafka Connect server, plugins, drivers, and connectors: -1. **Install the Debezium connector** - - In another Terminal window, navigate to , then download and configure the PostgreSQL sink and driver. - ```bash - mkdir -p "plugins/debezium-connector-postgres" - curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.1.1.Final/debezium-connector-postgres-3.1.1.Final-plugin.tar.gz \ - | tar -xzf - -C "plugins/debezium-connector-postgres" --strip-components=1 - echo "plugin.path=`pwd`/libs,`pwd`/plugins/debezium-connector-postgres" >> "config/connect-distributed.properties" - echo "plugin.path=`pwd`/libs,`pwd`/plugins/debezium-connector-postgres" >> "config/connect-standalone.properties" - ``` - -1. **Configure Debezium to poll your database** - - Write the following configuration to `/config/timescale-debezium-sink.properties`, then update the - `` with your [connection details][connection-info]. - - ```properties - name=timescale-debezium-sink - connector.class=io.debezium.connector.postgresql.PostgresConnector - database.hostname= - database.port= - database.user= - database.password= - database.dbname= - topic.prefix=accounts - plugin.name=pgoutput - transforms=timescaledb - transforms.timescaledb.type=io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb - transforms.timescaledb.database.hostname= - transforms.timescaledb.database.port= - transforms.timescaledb.database.user= - transforms.timescaledb.database.password= - transforms.timescaledb.database.dbname= - publication.autocreate.mode=all_tables - schema.include.list=public,_timescaledb_internal - ``` - - - The values for the `*.hostname`, `*.port`, `*.user`, `*.password`, and `*.dbname` properties must match. 
You - created `` in [Configure your database to work with Debezium][debezium-configure-database] - - `topic.prefix` is the name of the kafka topic you created in [Install and configure Apache Kafka][kafka-install-configure]. - -1. **Start Kafka Connect** - - ```bash - export CLASSPATH=`pwd`/plugins/debezium-connector-postgres/*:`pwd`/libs/* - ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-debezium-sink.properties - ``` - - Use the `-daemon` flag to run this process in the background. - -1. **Verify Kafka Connect is running** - - In yet another another Terminal window, run the following command: - - ```bash - curl http://localhost:8083 - ``` - You see something like: - ```bash - {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"} - ``` - - 1. **Test the connection** - - 1. Connect to your $SELF_LONG instance. - - Use [`psql`][psql-connect]. - 1. Insert data into the table you created: - - ```sql - INSERT INTO accounts (name,city) VALUES ('Lola','Copacabana'); - ``` - - 2. 
In another Terminal window, navigate to , then run `kafka-console-consumer` to consume the events you just sent: - ```bash - bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092 - ``` - You see something like: - ```bash - { - "topic": "timescaledb.public.accounts", - "value": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"1970-01-01T00:00:00.000000Z\",\"field\":\"created_at\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"city\"}],\"optional\":true,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"1970-01-01T00:00:00.000000Z\",\"field\":\"created_at\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"city\"}],\"optional\":true,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"},{\"type
\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"version\":1,\"field\":\"source\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"}],\"optional\":false,\"name\":\"accounts._timescaledb_internal._hyper_1_1_chunk.Envelope\",\"version\":2},\"payload\":{\"before\":null,\"after\":{\"created_at\":\"2025-04-22T15:25:29.681517Z\",\"name\":\"sdfg\",\"city\":\"asdf\"},\"source\":{\"version\":\"3.1.0.Final\",\"connector\":\"postgresql\",\"name\":\"accounts\",\"ts_ms\":1745335529692,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"29187960\\\"]\",\"ts_us\":1745335529692108,\"ts_ns\":1745335529692108000,\"schema\":\"public\",\"table\":\"accounts\",\"txId\":769,\"lsn\":29187960,\"xmin\":null},\"transaction\":null,\"op\":\"c\",\"ts_ms\":1745335530172,\"ts_us\":1745335530172473,\"ts_ns\":1745335530172473672}}", - "headers": [ - { - "key": "__debezium_timescaledb_chunk_table", - "value": "_hyper_1_1_chunk" - }, - { - "key": "__debezium_timescaledb_chunk_schema", - "value": "_timescaledb_internal" - } - ], - "timestamp": 1745335530762, - "partition": 0, - "offset": 0 - } - ``` + @@ -182,6 +74,7 @@ We are working on enabling this feature as you read. 
As soon as it is live, thes +And that is it, you have configured debezium to interact with $ [hypertables]: /use-timescale/:currentVersion:/hypertables/ [hypercore]: /use-timescale/:currentVersion:/hypercore/ From c85081f58c5decc90c3a2fd066d82d2d55809cbf Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 30 Apr 2025 13:07:44 +0200 Subject: [PATCH 12/18] chore: update for docker. --- integrations/debezium.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 508ad7bea1..710536a350 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -50,7 +50,7 @@ To setup $SELF_LONG to communicate with Debezium: -## Configure Debezium to listen to your database +## Configure Debezium to work with your database Set up Kafka Connect server, plugins, drivers, and connectors: @@ -60,9 +60,6 @@ Set up Kafka Connect server, plugins, drivers, and connectors: -You have successfully integrated Debezium. - - @@ -74,7 +71,7 @@ We are working on enabling this feature as you read. As soon as it is live, thes -And that is it, you have configured debezium to interact with $ +And that is it, you have configured Debezium to interact with $COMPANY products. 
[hypertables]: /use-timescale/:currentVersion:/hypertables/ [hypercore]: /use-timescale/:currentVersion:/hypercore/ From c82dfd11f9ea8fd20f8987c9131112c39b806c3c Mon Sep 17 00:00:00 2001 From: Iain Cox Date: Wed, 30 Apr 2025 14:09:49 +0200 Subject: [PATCH 13/18] Apply suggestions from code review Co-authored-by: Anastasiia Tovpeko <114177030+atovpeko@users.noreply.github.com> Signed-off-by: Iain Cox --- _partials/_integration-debezium-docker.md | 6 +++--- ...integration-debezium-self-hosted-config-database.md | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/_partials/_integration-debezium-docker.md b/_partials/_integration-debezium-docker.md index 76362afbc5..359b86afa2 100644 --- a/_partials/_integration-debezium-docker.md +++ b/_partials/_integration-debezium-docker.md @@ -13,7 +13,7 @@ ```bash docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:3.0 ``` - Check the output log to see that kafka is running. + Check the output log to see that Kafka is running. 1. **Run Kafka Connect in Docker** @@ -30,7 +30,7 @@ --link timescaledb:timescaledb \ quay.io/debezium/connect:3.0 ``` - Check the output log to see that kafka connect is running. + Check the output log to see that Kafka Connect is running. 1. **Register the Debezium PostgreSQL source connector** @@ -86,7 +86,7 @@ 1. **Verify `timescaledb-connector` is running** - 1. Open the Terminal window running Kafka connect. When the connector is active, you see something like the following: + 1. Open the Terminal window running Kafka Connect. 
When the connector is active, you see something like the following: ```bash 2025-04-30 10:40:15,168 INFO Postgres|accounts|streaming REPLICA IDENTITY for '_timescaledb_internal._hyper_1_1_chunk' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema] diff --git a/_partials/_integration-debezium-self-hosted-config-database.md b/_partials/_integration-debezium-self-hosted-config-database.md index 32e14875cd..861a634872 100644 --- a/_partials/_integration-debezium-self-hosted-config-database.md +++ b/_partials/_integration-debezium-self-hosted-config-database.md @@ -10,7 +10,7 @@ - MacOS: `/opt/homebrew/var/postgresql@/` - Windows: `C:\Program Files\PostgreSQL\\data\` - 1. Enable logical replication + 1. Enable logical replication. Modify the following settings in `postgresql.conf`: @@ -20,7 +20,7 @@ max_wal_senders = 10 ``` - 1. Open `pg_hba.conf` and enable host replication: + 1. Open `pg_hba.conf` and enable host replication. To allow replication connections, add the following: @@ -47,7 +47,7 @@ 1. **Enable a replication spot for Debezium** - 1. Create a table for Debezium to listen to + 1. Create a table for Debezium to listen to: ```sql CREATE TABLE accounts (created_at TIMESTAMPTZ DEFAULT NOW(), @@ -55,7 +55,7 @@ city TEXT); ``` - 1. Turn the table into a hypertable + 1. Turn the table into a hypertable: ```sql SELECT create_hypertable('accounts', 'created_at'); @@ -63,7 +63,7 @@ Debezium also works with [$CAGGs][caggs]. - 1. Create a publication and enable a replication slot + 1. Create a publication and enable a replication slot: ```sql CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update'); From 67b1721577f07dd57c43fcf2292d2b9b1c889dd1 Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 30 Apr 2025 14:15:26 +0200 Subject: [PATCH 14/18] chore: update for docker. 
--- _partials/_integration-prereqs-self-only.md | 7 +++++++ integrations/debezium.md | 7 ++++--- 2 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 _partials/_integration-prereqs-self-only.md diff --git a/_partials/_integration-prereqs-self-only.md b/_partials/_integration-prereqs-self-only.md new file mode 100644 index 0000000000..4165f8fc2e --- /dev/null +++ b/_partials/_integration-prereqs-self-only.md @@ -0,0 +1,7 @@ + +To follow the steps on this page: + +* Create a target [self-hosted $TIMESCALE_DB][enable-timescaledb] instance. + + +[enable-timescaledb]: /self-hosted/:currentVersion:/install/ diff --git a/integrations/debezium.md b/integrations/debezium.md index 710536a350..664db472d4 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -5,7 +5,7 @@ products: [cloud, mst, self_hosted] keywords: [Debezium, integrate] --- -import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; +import IntegrationPrereqsSelfOnly from "versionContent/_partials/_integration-prereqs-self-only.mdx"; import IntegrationDebeziumDocker from "versionContent/_partials/_integration-debezium-docker.mdx"; import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integration-debezium-self-hosted-config-database.mdx"; @@ -32,9 +32,9 @@ This page explains how to capture changes in your database and stream them using ## Prerequisites - + -- [Java8 or higher][java-installers] to run Apache Kafka. +- [Install Docker][install-docker] on your development machine. 
## Configure your database to work with Debezium @@ -87,3 +87,4 @@ And that is it, you have configured Debezium to interact with $COMPANY products [kafka-install-configure]: /integrations/:currentVersion:/debezium#install-and-configure-apache-kafka [debezium-configure-database]: /integrations/:currentVersion:/debezium##configure-your-database-to-work-with-debezium [psql-connect]: /integrations/:currentVersion:/psql/#connect-to-your-service +[install-docker]: https://docs.docker.com/engine/install/ From b46ee56a8013a343b605587d4f9dfbed51e16d4f Mon Sep 17 00:00:00 2001 From: atovpeko Date: Thu, 6 Feb 2025 14:48:09 +0200 Subject: [PATCH 15/18] draft --- integrations/debezium.md | 154 +++++++++++++++++++++++++++------------ 1 file changed, 107 insertions(+), 47 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 664db472d4..360207e343 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -1,90 +1,150 @@ --- title: Integrate Debezium with Timescale Cloud -excerpt: Integrate Debezium with Timescale Cloud to enable change data capture in your Timescale Cloud service and streaming to Redis Streams +excerpt: Integrate Debezium with Timescale Cloud to enable change data capture for your PostgreSQL workloads products: [cloud, mst, self_hosted] keywords: [Debezium, integrate] --- -import IntegrationPrereqsSelfOnly from "versionContent/_partials/_integration-prereqs-self-only.mdx"; -import IntegrationDebeziumDocker from "versionContent/_partials/_integration-debezium-docker.mdx"; -import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integration-debezium-self-hosted-config-database.mdx"; +import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; # Integrate Debezium with $CLOUD_LONG [Debezium][debezium] is an open-source distributed platform for change data capture (CDC). -It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. 
+It enables you to capture changes in your $SERVICE_LONG and stream them to other systems in real time. -Debezium can capture events about: +This page explains how to integrate Debezium with $CLOUD_LONG and Kafka. -- [Hypertables][hypertables]: captured events are rerouted from their chunk-specific topics to a single logical topic - named according to the following pattern: `..` -- [Continuous aggregates][caggs]: captured events are rerouted from their chunk-specific topics to a single logical topic - named according to the following pattern: `..` -- [Hypercore][hypercore]: If you enable hypercore, the Debezium $TIMESCALE_DB connector does not apply any special - processing to data in the columnstore. Compressed chunks are forwarded unchanged to the next downstream job in the - pipeline for further processing as needed. Typically, messages with compressed chunks are dropped, and are not - processed by subsequent jobs in the pipeline. +## Prerequisites - This limitation only affects changes to chunks in the columnstore. Changes to data in the rowstore work correctly. + +- Install [Debezium][debezium-install] -This page explains how to capture changes in your database and stream them using Debezium on Apache Kafka. +## Connect your $SERVICE_LONG -## Prerequisites + - + -- [Install Docker][install-docker] on your development machine. +To connect to $CLOUD_LONG: -## Configure your database to work with Debezium + - +1. **Enable logical replication for your $SERVICE_LONG** - + 1. Connect to your $SERVICE_SHORT using your [connection details][connection-info]. -To setup $SELF_LONG to communicate with Debezium: + 1. Run the following command to enable logical replication: - + ```sql + ALTER SYSTEM SET wal_level = 'logical'; + ALTER SYSTEM SET max_replication_slots = 10; + ALTER SYSTEM SET max_wal_senders = 10; + ``` + + 1. Restart your $SERVICE_SHORT. - +1. 
**Create a replication slot** - + ```sql + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` -## Configure Debezium to work with your database +1. **Configure Debezium** -Set up Kafka Connect server, plugins, drivers, and connectors: + Modify the Debezium connector configuration to point to $CLOUD_LONG using your [connection details][connection-info]: - + ```json + { + "name": "timescale-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "", + "database.port": "5432", + "database.user": "", + "database.password": "", + "database.dbname": "", + "database.server.name": "timescale-server" + } + } + ``` + +1. **Test the connection** - + Start the Debezium connector and ensure it connects to your $SERVICE_LONG successfully. - + + +To connect to your $SELF_LONG database: + + + +1. **Enable logical replication in $TIMESCALE_DB** -Debezium requires logical replication to be enabled. Currently, this is not enabled by default on $SERVICE_LONGs. -We are working on enabling this feature as you read. As soon as it is live, these docs will be updated. + 1. Modify the following settings in `postgresql.conf`. It is usually located in `/var/lib/postgresql/data/postgresql.conf` or `/etc/postgresql/*/main/postgresql.conf`: + + ``` + wal_level = logical + max_replication_slots = 10 + max_wal_senders = 10 + ``` + + 1. Restart PostgreSQL. + +1. **Create a replication slot** + + 1. Connect to your database using your [connection details][connection-info]. + 1. Run the following command: + + ```sql + SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); + ``` + + 1. Grant replication privileges to the user Debezium will use: + + ```sql + ALTER ROLE WITH REPLICATION; + ``` + +1. 
**Configure Debezium** + + Create a Debezium connector configuration file `debezium-postgres.json`: + + ```json + { + "name": "timescale-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "", + "database.port": "5432", + "database.user": "", + "database.password": "", + "database.dbname": "", + "database.server.name": "timescale-server", + "plugin.name": "pgoutput", + "slot.name": "debezium_slot", + "publication.name": "debezium_publication" + } + } + ``` + +1. **Test the connection** + + Start the Debezium connector and ensure it connects to your database successfully. + + -And that is it, you have configured Debezium to interact with $COMPANY products. +You have successfully integrated Debezium with $CLOUD_LONG. -[hypertables]: /use-timescale/:currentVersion:/hypertables/ -[hypercore]: /use-timescale/:currentVersion:/hypercore/ -[caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ -[connection-info]: /integrations/:currentVersion:/find-connection-details/ +[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ [debezium]: https://debezium.io/ -[java-installers]: https://www.oracle.com/java/technologies/downloads/ -[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_installation +[debezium-install]: https://debezium.io/documentation/reference/stable/install.html [console]: https://console.cloud.timescale.com/dashboard/services -[redis-local]: https://redis.io/docs/getting-started/ -[redis-cloud]: https://redis.com/try-free/ -[connect]: /getting-started/:currentVersion:/run-queries-from-console/ -[kafka-install-configure]: /integrations/:currentVersion:/debezium#install-and-configure-apache-kafka -[debezium-configure-database]: /integrations/:currentVersion:/debezium##configure-your-database-to-work-with-debezium -[psql-connect]: /integrations/:currentVersion:/psql/#connect-to-your-service 
-[install-docker]: https://docs.docker.com/engine/install/ From 25b3f8dd55129e5593e5877b3d6f0c6644e2a3e1 Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 30 Apr 2025 14:26:43 +0200 Subject: [PATCH 16/18] chore: cleanup. --- integrations/debezium-server.md | 264 -------------------------------- 1 file changed, 264 deletions(-) delete mode 100644 integrations/debezium-server.md diff --git a/integrations/debezium-server.md b/integrations/debezium-server.md deleted file mode 100644 index ee0473e6b2..0000000000 --- a/integrations/debezium-server.md +++ /dev/null @@ -1,264 +0,0 @@ ---- -title: Integrate Debezium with Timescale Cloud -excerpt: Integrate Debezium with Timescale Cloud to enable change data capture in your Timescale Cloud service and streaming to Redis Streams -products: [cloud, mst, self_hosted] -keywords: [Debezium, integrate] ---- - -import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; - -# Integrate Debezium with $CLOUD_LONG - -[Debezium][debezium] is an open-source distributed platform for change data capture (CDC). -It enables you to capture changes in $CLOUD_LONG and stream them to other systems in real time. - -This page explains how to capture changes in your $SERVICE_LONG and stream them using Debezium. - -## Prerequisites - - - -- [Java8 or higher][java-installers] to run Debezium Server. -- Install [Debezium Server][debezium-install]. -- -- Install [Redis][redis-local] or sign up for [Redis Cloud][redis-cloud]. -- For a self-hosted installation, ensure that your PostgreSQL database is accessible from the Debezium Server instance. - -## Connect your $SERVICE_LONG - - - - - -To connect to $CLOUD_LONG: - - - -1. **Enable logical replication for your $SERVICE_LONG** - - 1. [Connect][connect] to your $SERVICE_SHORT using your [connection details][connection-info]. - - 1. Run the following command to enable logical replication: - - ```sql - ALTER SYSTEM SET wal_level = logical; - SELECT pg_reload_conf(); - ``` - - 1. 
Restart your $SERVICE_SHORT. - -1. **Create a table** - - Create a table to test the integration. For example: - - ```sql - CREATE TABLE sensor_data ( - id SERIAL PRIMARY KEY, - device_id TEXT NOT NULL, - temperature FLOAT NOT NULL, - recorded_at TIMESTAMPTZ DEFAULT now() - ); - ``` - -1. **Configure Debezium** - - 1. Navigate to the `conf` directory in your Debezium files. - - 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. - - ```properties - debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector - debezium.source.database.hostname= - debezium.source.database.port= - debezium.source.database.user= - debezium.source.database.password= - debezium.source.database.dbname= - debezium.source.plugin.name=pgoutput - debezium.source.slot.name=debezium_slot - debezium.source.publication.autocreate.mode=filtered - - # Define Redis Streams as the sink - debezium.sink.type=redis - debezium.sink.redis.address=localhost:6379 - debezium.sink.redis.stream.name=debezium_stream - debezium.format.value=json - ``` - -1. **Start Debezium Server** - - ```sh - bin/debezium-server-start.sh conf/application.properties - ``` - -1. **Test the connection** - - 1. Insert data into the table you created in $CLOUD_LONG: - - ```sql - INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); - ``` - - 1. Run the following command to check Redis Streams for incoming CDC events: - - ```sh - redis-cli XREAD STREAMS debezium_stream 0 - ``` - - You should see something like this: - - ```json - { - "op": "c", - "ts_ms": 1708000000000, - "source": { - "table": "sensor_data", - "db": "your_database", - "schema": "public" - }, - "after": { - "id": 1, - "device_id": "sensor-001", - "temperature": 22.5, - "recorded_at": "2024-02-15T12:00:00Z" - } - } - ``` - -You have successfully integrated Debezium with $CLOUD_LONG. 
- - - - - - - -To connect to your database: - - - -1. **Enable logical replication in PostgreSQL** - - 1. Modify the following settings in `postgresql.conf`: - - ```ini - wal_level = logical - max_replication_slots = 10 - max_wal_senders = 10 - ``` - - 1. Restart PostgreSQL. - - 1. Add the following to `pg_hba.conf` to allow replication connections: - - ``` - host replication debezium 0.0.0.0/0 md5 - ``` - - 1. Restart PostgreSQL. - -1. **Create a Debezium user in PostgreSQL** - - Create a user with the `LOGIN` and `REPLICATION` permissions: - - ```sql - CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'your_password'; - ``` - -1. **Create a table** - - Debezium will stream changes from this table to Redis. For example: - - ```sql - CREATE TABLE sensor_data ( - id SERIAL PRIMARY KEY, - device_id TEXT NOT NULL, - temperature FLOAT NOT NULL, - recorded_at TIMESTAMPTZ DEFAULT now() - ); - ``` - -1. **Create a replication slot and a publication** - - ```sql - CREATE PUBLICATION debezium_pub FOR TABLE sensor_data; - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); - ``` -1. **Configure Debezium** - - 1. Navigate to the `conf` directory in your Debezium files. - - 1. Edit `application.properties` using your [connection details][connection-info]. If using Redis Cloud, replace `localhost:6379` with the Redis Cloud endpoint. - - ```properties - debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector - debezium.source.database.hostname= - debezium.source.database.port= - debezium.source.database.user= - debezium.source.database.password= - debezium.source.database.dbname= - debezium.source.plugin.name=pgoutput - debezium.source.slot.name=debezium_slot - - # Define Redis Streams as the sink - debezium.sink.type=redis - debezium.sink.redis.address=localhost:6379 - debezium.sink.redis.stream.name=debezium_stream - debezium.format.value=json - ``` - -1. 
**Start Debezium Server** - - ```sh - bin/debezium-server-start.sh conf/application.properties - ``` - -1. **Test the connection** - - 1. Insert data into the table you created earlier: - - ```sql - INSERT INTO sensor_data (device_id, temperature) VALUES ('sensor-001', 22.5); - ``` - - 1. Run the following command to check Redis Streams for incoming CDC events: - - ```sh - redis-cli XREAD STREAMS debezium_stream 0 - ``` - - You should see something like this: - - ```json - { - "op": "c", - "ts_ms": 1708000000000, - "source": { - "table": "sensor_data", - "db": "your_database", - "schema": "public" - }, - "after": { - "id": 1, - "device_id": "sensor-001", - "temperature": 22.5, - "recorded_at": "2024-02-15T12:00:00Z" - } - } - ``` - -You have successfully integrated Debezium with self-hosted $TIMESCALE_DB. - - - - - - - -[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ -[debezium]: https://debezium.io/ -[java-installers]: https://www.oracle.com/java/technologies/downloads/ -[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_installation -[console]: https://console.cloud.timescale.com/dashboard/services -[redis-local]: https://redis.io/docs/getting-started/ -[redis-cloud]: https://redis.com/try-free/ -[connect]: /getting-started/:currentVersion:/run-queries-from-console/ From 11f5cf665fce6880d8f885fc732f238ef6a69dd1 Mon Sep 17 00:00:00 2001 From: Iain Date: Wed, 30 Apr 2025 14:38:42 +0200 Subject: [PATCH 17/18] chore: the merge from another dimension. 
--- integrations/debezium.md | 154 ++++++++++++--------------------------- 1 file changed, 47 insertions(+), 107 deletions(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index 360207e343..dcf51306de 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -1,150 +1,90 @@ --- title: Integrate Debezium with Timescale Cloud -excerpt: Integrate Debezium with Timescale Cloud to enable change data capture for your PostgreSQL workloads +excerpt: Integrate Debezium with Timescale Cloud to enable change data capture in your Timescale Cloud service and streaming to Redis Streams products: [cloud, mst, self_hosted] keywords: [Debezium, integrate] --- -import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx"; +import IntegrationPrereqsSelfOnly from "versionContent/_partials/_integration-prereqs-self-only.mdx"; +import IntegrationDebeziumDocker from "versionContent/_partials/_integration-debezium-docker.mdx"; +import IntegrationDebeziumSelfHostedConfig from "versionContent/_partials/_integration-debezium-self-hosted-config-database.mdx"; # Integrate Debezium with $CLOUD_LONG [Debezium][debezium] is an open-source distributed platform for change data capture (CDC). -It enables you to capture changes Win your $SERVICE_LONG and stream them to other systems in real time. +It enables you to capture changes in a $SELF_LONG instance and stream them to other systems in real time. 
-This pages explains how to integrate Debezium with $CLOUD_LONG and Kafka +Debezium can capture events about: -## Prerequisites - - +- [Hypertables][hypertables]: captured events are rerouted from their chunk-specific topics to a single logical topic + named according to the following pattern: `<topic.prefix>.<hypertable-schema-name>.<hypertable-name>` +- [Continuous aggregates][caggs]: captured events are rerouted from their chunk-specific topics to a single logical topic + named according to the following pattern: `<topic.prefix>.<aggregate-schema-name>.<aggregate-name>` +- [Hypercore][hypercore]: If you enable hypercore, the Debezium $TIMESCALE_DB connector does not apply any special + processing to data in the columnstore. Compressed chunks are forwarded unchanged to the next downstream job in the + pipeline for further processing as needed. Typically, messages with compressed chunks are dropped, and are not + processed by subsequent jobs in the pipeline. -- Install [Debezium][debezium-install] + This limitation only affects changes to chunks in the columnstore. Changes to data in the rowstore work correctly. -## Connect your $SERVICE_LONG - - - - -To connect to $CLOUD_LONG: - - +This page explains how to capture changes in your database and stream them using Debezium on Apache Kafka. -1. **Enable logical replication for your $SERVICE_LONG** - - 1. Connect to your $SERVICE_SHORT using your [connection details][connection-info]. - - 1. Run the following command to enable logical replication: +## Prerequisites - ```sql - ALTER SYSTEM SET wal_level = 'logical'; - ALTER SYSTEM SET max_replication_slots = 10; - ALTER SYSTEM SET max_wal_senders = 10; - ``` - - 1. Restart your $SERVICE_SHORT. + -1. **Create a replication slot** +- [Install Docker][install-docker] on your development machine. - ```sql - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); - ``` +## Configure your database to work with Debezium -1.
**Configure Debezium** + - Modify the Debezium connector configuration to point to $CLOUD_LONG using your [connection details][connection-info]: + - ```json - { - "name": "timescale-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "", - "database.port": "5432", - "database.user": "", - "database.password": "", - "database.dbname": "", - "database.server.name": "timescale-server", - } - } - ``` +To setup $SELF_LONG to communicate with Debezium: -1. **Test the connection** + - Start the Debezium connector and ensure it connects to your $SERVICE_LONG successfully. + - - - +## Configure Debezium to work with your database -To connect to your $SELF_LONG database: +Set up Kafka Connect server, plugins, drivers, and connectors: -1. **Enable logical replication in $TIMESCALE_DB** - - 1. Modify the following settings in `postgresql.conf`. It is usually located in `/var/lib/postgresql/data/postgresql.conf` or `/etc/postgresql/*/main/postgresql.conf`: - - ``` - wal_level = logical - max_replication_slots = 10 - max_wal_senders = 10 - ``` - - 1. Restart PostgreSQL. - -1. **Create a replication slot** - - 1. Connect to your database using your [connection details][connection-info]. - 1. Run the following command: - - ```sql - SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput'); - ``` - - 1. Grant replication privileges to the user Debezium will use: + - ```sql - ALTER ROLE WITH REPLICATION; - ``` - -1. 
**Configure Debezium** - - Create a Debezium connector configuration file `debezium-postgres.json`: - - ```json - { - "name": "timescale-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "", - "database.port": "5432", - "database.user": "", - "database.password": "", - "database.dbname": "", - "database.server.name": "timescale-server", - "plugin.name": "pgoutput", - "slot.name": "debezium_slot", - "publication.name": "debezium_publication" - } - } - ``` + -1. **Test the connection** + - Start the Debezium connector and ensure it connects to your database successfully. + - +Debezium requires logical replication to be enabled. Currently, this is not enabled by default on $SERVICE_LONGs. +We are working on enabling this feature as you read. As soon as it is live, these docs will be updated. -You have successfully integrated Debezium with $CLOUD_LONG. +And that is it, you have configured Debezium to interact with $COMPANY products. 
-[connection-info]: /use-timescale/:currentVersion:/integrations/find-connection-details/ +[hypertables]: /use-timescale/:currentVersion:/hypertables/ +[hypercore]: /use-timescale/:currentVersion:/hypercore/ +[caggs]: /use-timescale/:currentVersion:/continuous-aggregates/ +[connection-info]: /integrations/:currentVersion:/find-connection-details/ [debezium]: https://debezium.io/ -[debezium-install]: https://debezium.io/documentation/reference/stable/install.html +[java-installers]: https://www.oracle.com/java/technologies/downloads/ +[debezium-install]: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_installation [console]: https://console.cloud.timescale.com/dashboard/services +[redis-local]: https://redis.io/docs/getting-started/ +[redis-cloud]: https://redis.com/try-free/ +[connect]: /getting-started/:currentVersion:/run-queries-from-console/ +[kafka-install-configure]: /integrations/:currentVersion:/debezium#install-and-configure-apache-kafka +[debezium-configure-database]: /integrations/:currentVersion:/debezium#configure-your-database-to-work-with-debezium +[psql-connect]: /integrations/:currentVersion:/psql/#connect-to-your-service +[install-docker]: https://docs.docker.com/engine/install/ From eb63aa882e28833afffb389711db0bc304b46e0c Mon Sep 17 00:00:00 2001 From: Iain Cox Date: Wed, 30 Apr 2025 14:45:09 +0200 Subject: [PATCH 18/18] Update integrations/debezium.md Co-authored-by: Anastasiia Tovpeko <114177030+atovpeko@users.noreply.github.com> Signed-off-by: Iain Cox --- integrations/debezium.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/debezium.md b/integrations/debezium.md index dcf51306de..2b8df525c0 100644 --- a/integrations/debezium.md +++ b/integrations/debezium.md @@ -42,7 +42,7 @@ This page explains how to capture changes in your database and stream them using -To setup $SELF_LONG to communicate with Debezium: +To set up $SELF_LONG to communicate with Debezium: