Introduction
This tutorial showcases the use of pgstream to replicate data from a PostgreSQL database to multiple targets (PostgreSQL and OpenSearch), leveraging Kafka. Kafka can also be used to replicate to a single target, taking advantage of the fan-out model that allows for parallel processing of the WAL events thanks to topic partitioning.
Requirements
- A source PostgreSQL database
- A target PostgreSQL database
- A target OpenSearch cluster
- A Kafka cluster
- pgstream (see installation instructions for more details)
Environment setup
The first step is to start the source PostgreSQL database, the target PostgreSQL database and the OpenSearch cluster used for replication, along with the Kafka cluster. The pgstream repository provides a docker setup that will be used for the purposes of this tutorial, but it can be replaced by any available PostgreSQL servers, as long as they have wal2json installed, and any Kafka and OpenSearch clusters.
To start the docker-provided PostgreSQL servers, OpenSearch cluster and Kafka cluster, run the following command:
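A sketch of the command, assuming the docker setup from the pgstream repository (the compose file path and profile names may differ between versions, so check the repository):

```sh
# Run from the root of the pgstream repository; the profile names below
# are assumptions — check the repository's docker-compose file.
docker compose -f build/docker/docker-compose.yml --profile pg2kafka --profile kafka2pg --profile kafka2os up
```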
This will start two PostgreSQL servers on ports 5432 and 7654, an OpenSearch cluster on port 9200 and a Kafka cluster on port 9092.
Database initialisation
Once all the resources are up and running, the next step is to initialise pgstream on the source database. This will create the pgstream schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See the Tracking schema changes section for more details. This step will also create a replication slot on the source database, which will be used by the pgstream service.
The initialisation step allows providing both the URL of the PostgreSQL database and the name of the replication slot to be created. The PostgreSQL URL is required, but the replication slot name is optional: if not provided, it defaults to pgstream_<dbname>_slot, where <dbname> is the name of the PostgreSQL database. The configuration can be provided either via the supported CLI parameters or via environment variables.
For this tutorial, we’ll create a replication slot with the name pgstream_tutorial_slot.
- Using CLI parameters:
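A minimal sketch, assuming the docker credentials from the environment setup; the flag names can be confirmed with pgstream init --help:

```sh
# --replication-slot is optional; it defaults to pgstream_<dbname>_slot
pgstream init --postgres-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --replication-slot pgstream_tutorial_slot
```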
- Using environment variables:
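A sketch using environment variables instead; the variable names follow pgstream's configuration convention but should be checked against the configuration reference:

```sh
export PGSTREAM_POSTGRES_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
export PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
pgstream init
```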
If the initialisation needs to be reverted at any point, all the state created by pgstream can be removed by running the destroy CLI command.
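For example, mirroring the init command above (same hedges on the flag names apply):

```sh
pgstream destroy --postgres-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --replication-slot pgstream_tutorial_slot
```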
Prepare pgstream configuration
For this tutorial, we need to prepare 3 different configuration files, since we’ll be running 3 separate instances of pgstream, each handling one of the flows described below.
PostgreSQL -> Kafka
Listener
In order to run pgstream, we need to provide the configuration required for the PostgreSQL replication. First, we configure the listener module that will be listening to the WAL on the source PostgreSQL database. This requires the PostgreSQL database URL, which will be the one for the docker PostgreSQL server we started and set up in the previous steps.
Processor
We will be using a Kafka processor, which writes the WAL events into a Kafka topic, partitioned by schema. The only required configuration is the server URLs and the topic settings. In our case, we'll use the Kafka cluster initialised in the previous steps and a topic named pgstream.
The processor also requires the URL of the PostgreSQL database where the pgstream.schema_log table is stored, which in our case is the source PostgreSQL database.
The full configuration for the pg2kafka step can be put into a pg2kafka_tutorial.env file to be used later on; an equivalent pg2kafka_tutorial.yaml configuration can be used interchangeably. There are two variants:
- Without initial snapshot: replicate only changes made after the replication slot was created.
- With initial snapshot: first capture the existing database contents, then continue with the WAL replication.
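A sketch of what the pg2kafka_tutorial.env file might look like. The variable names follow pgstream's configuration convention but should be verified against the configuration reference; the commented-out snapshot settings are only needed for the "with initial snapshot" variant:

```sh
# Listener: WAL replication on the source PostgreSQL database
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot

# Processor: Kafka writer, with the pgstream topic partitioned by schema
PGSTREAM_KAFKA_WRITER_SERVERS="localhost:9092"
PGSTREAM_KAFKA_TOPIC_NAME=pgstream
PGSTREAM_KAFKA_TOPIC_PARTITIONS=1
PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR=1
PGSTREAM_KAFKA_TOPIC_AUTO_CREATE=true

# URL of the database where the pgstream.schema_log table lives (the source)
PGSTREAM_KAFKA_POSTGRES_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"

# With initial snapshot only (variable names are assumptions — see the
# snapshot tutorial for the exact ones):
# PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_ENABLED=true
# PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES="*"
```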
Kafka -> PostgreSQL
Listener
In this case the listener will be a Kafka reader that listens for the WAL events on the Kafka topic. The only configuration required is the Kafka servers, the topic name and the consumer group id. We will use a separate consumer group for each of the targets, in this case pgstream-postgres-consumer-group.
The offset at which the consumer group starts consuming can also be configured, choosing between earliest or latest.
Processor
This configuration is the same as the one from the tutorial for PostgreSQL to PostgreSQL replication. More details here. The full configuration for the kafka2pg step can be put into a kafka2pg_tutorial.env file to be used later on; an equivalent kafka2pg_tutorial.yaml configuration can be used interchangeably.
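A sketch of kafka2pg_tutorial.env, under the same naming assumptions as before:

```sh
# Listener: Kafka reader consuming the pgstream topic
PGSTREAM_KAFKA_READER_SERVERS="localhost:9092"
PGSTREAM_KAFKA_TOPIC_NAME=pgstream
PGSTREAM_KAFKA_CONSUMER_GROUP_ID=pgstream-postgres-consumer-group
PGSTREAM_KAFKA_CONSUMER_GROUP_START_OFFSET=earliest

# Processor: PostgreSQL writer pointing at the target database on port 7654
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
```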
Kafka -> OpenSearch
Listener
The configuration for the Kafka listener is the same as the one in the previous step; the only difference is the consumer group id, to ensure it consumes the topic independently from the postgres one.
Processor
This configuration is the same as the one from the tutorial for PostgreSQL to OpenSearch replication. More details here. The full configuration for the kafka2opensearch step can be put into a kafka2os_tutorial.env file to be used later on; an equivalent kafka2os_tutorial.yaml configuration can be used interchangeably.
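A sketch of kafka2os_tutorial.env; the consumer group name is an arbitrary choice, and the variable names remain assumptions to verify against the configuration reference:

```sh
# Listener: Kafka reader with its own consumer group
PGSTREAM_KAFKA_READER_SERVERS="localhost:9092"
PGSTREAM_KAFKA_TOPIC_NAME=pgstream
PGSTREAM_KAFKA_CONSUMER_GROUP_ID=pgstream-opensearch-consumer-group
PGSTREAM_KAFKA_CONSUMER_GROUP_START_OFFSET=earliest

# Processor: search indexer pointing at the OpenSearch cluster
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
```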
Validate pgstream status
We can validate that the initialisation and the configuration are valid by running the status command before starting pgstream. This can be run for each of the configuration files:
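For example, assuming a -c/--config flag for selecting the configuration file:

```sh
pgstream status -c pg2kafka_tutorial.env
pgstream status -c kafka2pg_tutorial.env
pgstream status -c kafka2os_tutorial.env
```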
Run pgstream
With all configuration files ready, we can now run pgstream. In this case, we set the log level to trace to provide more context for debugging and more visibility into what pgstream is doing under the hood.
As mentioned above, we'll need to run 3 different pgstream services in separate terminals, one for each configuration file:
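For example, one command per terminal (the flag names mirror the status command above and should be confirmed with pgstream run --help):

```sh
pgstream run -c pg2kafka_tutorial.env --log-level trace
pgstream run -c kafka2pg_tutorial.env --log-level trace
pgstream run -c kafka2os_tutorial.env --log-level trace
```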
Verify replication
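First, create a table on the source database. The table name test matches the one referenced later in this section; its columns are illustrative:

```sql
-- Connect to the source database first, e.g.:
--   psql "postgres://postgres:postgres@localhost:5432"
CREATE TABLE test (
    id SERIAL PRIMARY KEY,
    name TEXT
);
```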
We should now be able to see the table created in the target PostgreSQL database, as well as the pgstream index on the OpenSearch cluster.
There should also be a public index, with no documents for now (since there's no data in the table):
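A sketch of how to check both targets, assuming the docker defaults from the environment setup:

```sh
# Target PostgreSQL database (port 7654): the test table should exist
psql "postgres://postgres:postgres@localhost:7654" -c '\d test'

# OpenSearch: the pgstream index should be listed
curl "http://localhost:9200/_cat/indices"

# The public index should exist but contain no documents yet
curl "http://localhost:9200/public/_search"
```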
Now we can insert some data into the test table, which should then appear in the target PostgreSQL database as well as the OpenSearch public index:
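For example:

```sql
-- Run against the source database (port 5432)
INSERT INTO test (name) VALUES ('alice'), ('bob');
```

Re-running the search against the public index should now return the inserted rows as documents, and the same rows should be visible in the target PostgreSQL database.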
Troubleshooting
Here are some common issues you might encounter while following this tutorial and how to resolve them:
1. Error: Connection refused
- Cause: The PostgreSQL database, Kafka cluster, or OpenSearch cluster is not running.
- Solution:
  - Ensure the Docker containers for all services are running.
  - Verify the database, Kafka, and OpenSearch URLs in the configuration files.
  - Test the connections using the following commands:
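For example, assuming the docker defaults:

```sh
# Source and target PostgreSQL databases
pg_isready -h localhost -p 5432
pg_isready -h localhost -p 7654

# OpenSearch cluster
curl "http://localhost:9200"

# Kafka broker: check that the port is reachable
nc -zv localhost 9092
```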
2. Error: Replication slot not found
- Cause: The replication slot was not created during initialization.
- Solution:
  - Reinitialize pgstream or manually create the replication slot.
  - Run the pgstream status command to validate the initialisation was successful.
  - Verify the replication slot exists by running the query below.
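For example, using psql against the source database:

```sql
-- The slot should exist and be active while pgstream is running
SELECT slot_name, active FROM pg_replication_slots
WHERE slot_name = 'pgstream_tutorial_slot';
```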
3. Error: Kafka topic not found
- Cause: The Kafka topic was not created automatically or the topic name is incorrect.
- Solution:
  - Ensure the PGSTREAM_KAFKA_TOPIC_AUTO_CREATE variable is set to true in the configuration.
  - Manually create the topic using the Kafka CLI, as shown below.
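For example, using the standard Kafka CLI (adjust the script path or container name to your Kafka installation):

```sh
kafka-topics.sh --create --topic pgstream \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1
```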
4. Error: Data not replicated to targets
- Cause: The Kafka consumer groups for PostgreSQL or OpenSearch are not configured correctly.
- Solution:
  - Verify the consumer group IDs in the configuration files.
  - Check the pgstream logs for errors, as shown below.
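Since pgstream logs to standard output, a simple filter works; for example:

```sh
pgstream run -c kafka2pg_tutorial.env --log-level trace 2>&1 | grep -i error
```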
5. Error: Permission denied
- Cause: The database user does not have sufficient privileges.
- Solution:
  - Grant the required privileges to the database user, for example:
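The role and database names below are illustrative; adjust them to your setup:

```sql
-- Logical replication requires the REPLICATION attribute on the role,
-- and pgstream needs to be able to create its own schema and objects.
ALTER ROLE pgstream_user WITH REPLICATION;
GRANT CREATE ON DATABASE postgres TO pgstream_user;
```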
6. Error: OpenSearch index not created
- Cause: The OpenSearch processor is not configured correctly.
- Solution:
  - Verify the OpenSearch URL in the configuration file.
  - Check the pgstream logs for errors (see the example in point 4).
7. Error: Kafka TLS connection failed
- Cause: The Kafka TLS certificates are not configured correctly.
- Solution:
  - Verify the paths to the certificate files in the configuration (see the sketch below).
  - Ensure the certificates are valid and match the Kafka server configuration.
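A sketch of the relevant settings; the variable names are assumptions based on pgstream's configuration convention and should be checked against the configuration reference:

```sh
PGSTREAM_KAFKA_TLS_ENABLED=true
PGSTREAM_KAFKA_TLS_CA_CERT_FILE=/path/to/ca.pem
PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE=/path/to/client.pem
PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE=/path/to/client-key.pem
```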
Summary
In this tutorial, we successfully configured pgstream to replicate data from a PostgreSQL database to multiple targets (PostgreSQL and OpenSearch) using Kafka as an intermediary. We:
- Set up the source PostgreSQL database, target PostgreSQL database, OpenSearch cluster, and Kafka cluster.
- Initialized pgstream on the source database, creating the necessary schema and replication slot.
- Configured three separate pgstream instances:
  - PostgreSQL to Kafka
  - Kafka to PostgreSQL
  - Kafka to OpenSearch
- Verified that schema changes and data changes were replicated correctly across all targets.
This setup demonstrates how pgstream can leverage Kafka for scalable, real-time replication to multiple targets. For more advanced use cases, refer to the pgstream tutorials.