A simple tutorial for calling webhooks on Postgres data and schema changes using pgstream.
Written by Tudor Golubenco. Published on August 27, 2024.
pgstream is a CDC (Change-Data-Capture) tool focused on PostgreSQL. Among other things, it can be used to call webhooks whenever there is a data (or schema) change in a Postgres database. This means that whenever a row is inserted, updated, or deleted, or a table is created, altered, truncated or deleted, a webhook is notified of the relevant event details.
In this article we're going to dive into the pgstream webhooks functionality, including more advanced topics, like sending the old and new values, TOAST support, and schema changes support.
For this tutorial we need a Postgres instance that has logical replication enabled and the wal2json output plugin installed. There are a number of ways to do this, but we're going to use the docker-compose file from the pgstream repo:
git clone https://github.com/xataio/pgstream.git
cd pgstream
And then start the Postgres instance:
docker compose -f build/docker/docker-compose.yml up db
Then, in a new terminal window, you can connect to it and create a simple table to play with:
psql "postgres://postgres:postgres@localhost:5432/postgres"
At the psql prompt:
CREATE TABLE employees(id SERIAL PRIMARY KEY, name TEXT, role TEXT);
Let's now install pgstream. One option is to build it from source, since we already have the repo cloned:
go build
The above assumes you have Go installed. If you don't, check out the other installation options.
The next step is to initialize pgstream, like this:
./pgstream init --pgurl "postgres://postgres:postgres@localhost/postgres?sslmode=disable"
This creates several tables managed by pgstream, which are grouped in a schema called pgstream.
Next, we need to configure pgstream for the webhooks use case. The pgstream repo comes with a pg2webhook.env configuration file for this purpose. Its contents are:
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"
# Processor config
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED=false
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL="60s"
PGSTREAM_POSTGRES_LISTENER_URL indicates which database the pgstream listener should connect to. PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL is used as a metadata db to store the registered subscriptions and their configuration. In this example, they are both set to the same database, but you can use different databases or instances if you prefer. More details can be found in the readme configuration section.
You are now ready to start pgstream like this:
./pgstream run -c pg2webhook.env
We now have pgstream ready to go, but we need something to receive the webhooks. This is normally your application, or a lambda function, or similar. pgstream comes with a sample webhooks listener written in Go. You can start it like this:
go run tools/webhook/webhook_server.go
This starts a server on port 9910 that simply logs any JSON payload received under the /webhook path.
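If you're curious what such a receiver looks like, here is a minimal sketch of your own listener. It is not the bundled webhook_server.go itself, just an illustration of the contract: accept a JSON POST on /webhook on port 9910 and log it.

package main

import (
    "io"
    "log"
    "net/http"
)

func main() {
    // Log any JSON payload POSTed to /webhook, mirroring what the bundled
    // sample server does. A real application would parse and act on the event.
    http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
        defer r.Body.Close()
        body, err := io.ReadAll(r.Body)
        if err != nil {
            http.Error(w, "cannot read body", http.StatusBadRequest)
            return
        }
        log.Printf("received webhook event: %s", body)
    })
    log.Fatal(http.ListenAndServe(":9910", nil))
}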
Now we have to create a subscription so that pgstream knows to send webhooks to localhost:9910/webhook, where the webhook_server.go program is waiting. We do this by calling an API served by the webhook module of pgstream:
curl localhost:9900/webhooks/subscribe \
-H "content-type: application/json" \
-d '{"url":"http://localhost:9910/webhook", "schema": "public", "table": "employees"}'
The above adds a row to the pgstream.webhook_subscriptions table, which is maintained by pgstream. You can verify this via psql:
postgres=# select * from pgstream.webhook_subscriptions;
url | schema_name | table_name | event_types
-------------------------------+-------------+------------+-------------
http://localhost:9910/webhook | public | employees |
(1 row)
Notes:
- To subscribe to all tables in a schema, leave the table key unset.
- To receive only particular operations, set the event_types key to an array containing the operations you want (I for inserts, U for updates, D for deletes, T for truncates); see the example below.
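For example, a subscription that only receives inserts and deletes for the employees table could look like this (the exact event_types payload format, an array of operation letters, is an assumption based on the event_types column in the subscriptions table above):

curl localhost:9900/webhooks/subscribe \
  -H "content-type: application/json" \
  -d '{"url":"http://localhost:9910/webhook", "schema": "public", "table": "employees", "event_types": ["I", "D"]}'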
With pgstream and the webhook_server.go listener started in different consoles, let's run the following SQL via psql:
INSERT INTO employees(name,role) VALUES('Dwight Schrute', 'Assistant Regional Manager');
You should see the following printed by the webhook server:
{
"Data": {
"action": "I",
"timestamp": "2024-08-20 13:57:16.806107+00",
"lsn": "0/15F1D60",
"schema": "public",
"table": "employees",
"columns": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
},
{
"id": "cr2a0enjc0j00h93rdj0-2",
"name": "name",
"type": "text",
"value": "Dwight Schrute"
},
{
"id": "cr2a0enjc0j00h93rdj0-3",
"name": "role",
"type": "text",
"value": "Assistant Regional Manager"
}
],
"identity": null,
"metadata": {
"schema_id": "cr2a0enjc0j00h93rdig",
"table_pgstream_id": "cr2a0enjc0j00h93rdj0",
"id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
"version_col_pgstream_id": ""
}
}
}
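Before we look at the details, here's a rough sketch of Go types you could use to deserialize this payload in your own receiver. The field names are inferred from the JSON above and should be treated as assumptions about this tutorial's output, not as an official pgstream client API.

package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// Column mirrors the entries under "columns" and "identity" in the payload.
type Column struct {
    ID    string `json:"id"`
    Name  string `json:"name"`
    Type  string `json:"type"`
    Value any    `json:"value"`
}

// WalEvent mirrors the "Data" object of the payload.
type WalEvent struct {
    Action    string   `json:"action"` // I, U, D or T
    Timestamp string   `json:"timestamp"`
    LSN       string   `json:"lsn"`
    Schema    string   `json:"schema"`
    Table     string   `json:"table"`
    Columns   []Column `json:"columns"`
    Identity  []Column `json:"identity"`
}

type WebhookPayload struct {
    Data WalEvent `json:"Data"`
}

func main() {
    // A trimmed example payload, for illustration only.
    raw := []byte(`{"Data": {"action": "I", "schema": "public", "table": "employees",
        "columns": [{"id": "c-1", "name": "id", "type": "integer", "value": 1}]}}`)

    var p WebhookPayload
    if err := json.Unmarshal(raw, &p); err != nil {
        log.Fatal(err)
    }
    fmt.Println(p.Data.Action, p.Data.Table, p.Data.Columns[0].Value)
}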
A couple of things to note in the event above:
- The action is I, for "insert".
- Each column comes not only with a name, type, and value, but also with an id (e.g. cr2a0enjc0j00h93rdj0-2 for the name column). This ID is unique for each column and can be used to follow columns across renames.

Now let's run an update statement:
UPDATE employees SET role='Assistant to the Regional Manager' WHERE name='Dwight Schrute';
The following webhook event is received:
{
"Data": {
"action": "U",
"timestamp": "2024-08-20 14:01:28.86832+00",
"lsn": "0/15F1FB8",
"schema": "public",
"table": "employees",
"columns": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
},
{
"id": "cr2a0enjc0j00h93rdj0-2",
"name": "name",
"type": "text",
"value": "Dwight Schrute"
},
{
"id": "cr2a0enjc0j00h93rdj0-3",
"name": "role",
"type": "text",
"value": "Assistant to the Regional Manager"
}
],
"identity": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
}
],
"metadata": {
"schema_id": "cr2a0enjc0j00h93rdig",
"table_pgstream_id": "cr2a0enjc0j00h93rdj0",
"id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
"version_col_pgstream_id": ""
}
}
}
The above is very similar to the INSERT event, with a couple of differences:
- The action is U, for "update".
- The identity sub-object is now filled in. It contains the primary key of the affected row. Notably, this is the value of the primary key before the update, in case the update has changed the value of the PK.

How about including the old values in addition to the new values for UPDATEs? Yes, that's possible, using a little trick.
See how in the above UPDATE event, the "identity" contains the PK value, and we noted that this is the PK value before the update? That is because the "replica identity" for the table is set to "primary key", which is the default. However, the replica identity can be set to any unique group of columns, or to "all columns" by setting it to FULL:
ALTER TABLE employees REPLICA IDENTITY FULL;
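As an aside, if FULL is more than you need, the replica identity can also point at a unique index instead. A sketch, assuming employees_pkey, the default name Postgres gives the primary key index of our table; there's no need to run it for this tutorial, which relies on FULL:

-- alternative: use an existing unique index as the replica identity
ALTER TABLE employees REPLICA IDENTITY USING INDEX employees_pkey;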
If we run the update again:
UPDATE employees SET role='Assistant Regional Manager' WHERE name='Dwight Schrute';
We now get all values after the update, but also before the update, under the identity field:
{
"Data": {
"action": "U",
"timestamp": "2024-08-20 17:50:50.503662+00",
"lsn": "0/15F65F0",
"schema": "public",
"table": "employees",
"columns": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
},
{
"id": "cr2a0enjc0j00h93rdj0-2",
"name": "name",
"type": "text",
"value": "Dwight Schrute"
},
{
"id": "cr2a0enjc0j00h93rdj0-3",
"name": "role",
"type": "text",
"value": "Assistant Regional Manager"
}
],
"identity": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
},
{
"id": "cr2a0enjc0j00h93rdj0-2",
"name": "name",
"type": "text",
"value": "Dwight Schrute"
},
{
"id": "cr2a0enjc0j00h93rdj0-3",
"name": "role",
"type": "text",
"value": "Assistant to the Regional Manager"
}
],
"metadata": {
"schema_id": "cr2ae77jc0j00h93rdjg",
"table_pgstream_id": "cr2a0enjc0j00h93rdj0",
"id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
"version_col_pgstream_id": ""
}
}
}
Let's try a simple delete as well:
DELETE FROM employees WHERE name='Dwight Schrute';
The generated event:
{
"Data": {
"action": "D",
"timestamp": "2024-08-21 15:31:55.992618+00",
"lsn": "0/163CEC8",
"schema": "public",
"table": "employees",
"columns": null,
"identity": [
{
"id": "cr2a0enjc0j00h93rdj0-1",
"name": "id",
"type": "integer",
"value": 1
},
{
"id": "cr2a0enjc0j00h93rdj0-2",
"name": "name",
"type": "text",
"value": "Dwight Schrute"
},
{
"id": "cr2a0enjc0j00h93rdj0-3",
"name": "role",
"type": "text",
"value": "Assistant Regional Manager"
},
{
"id": "cr2a0enjc0j00h93rdj0-4",
"name": "last_name",
"type": "text",
"value": null
}
],
"metadata": {
"schema_id": "cr2r0lvjc0j00h93rdn0",
"table_pgstream_id": "cr2a0enjc0j00h93rdj0",
"id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
"version_col_pgstream_id": ""
}
}
}
To note:
- The action is D, for "delete".
- The deleted row's values are included in the identity object. That is because we have the replica identity set to FULL; otherwise only the PK would have been included.

Setting the replica identity to FULL accomplishes one more thing: it includes large values in the webhook events.
Postgres values larger than the page size (commonly 8K, after compression) are stored separately from the main table data, in TOAST storage. This means they are also not included in the logical replication events by default, unless they are changed by the current UPDATE statement. This is a gotcha: you might expect all columns to always be present, but once values reach a certain size they go missing from the events.
However, setting the replica identity to FULL solves this. To test this, try the following:
-- set the identity back to be just PK
ALTER TABLE employees REPLICA IDENTITY DEFAULT;
-- set the role column to a large value.
-- This will be in the event because the role column is modified
UPDATE employees SET role=(
SELECT string_agg(generate_series::text, ' ') FROM generate_series(0, 8000)
) WHERE name='Dwight Schrute';
-- change another column.
-- This time the role column is not present because it's TOASTed
UPDATE employees SET name='Jim Halpert' WHERE name='Dwight Schrute';
-- set the identity to FULL
ALTER TABLE employees REPLICA IDENTITY FULL;
-- try the update again, this time the value is included in the event.
UPDATE employees SET name='Dwight Schrute' WHERE name='Jim Halpert';
Note, however, that the TOASTed value is included only in the identity part of the event, so your application needs to pick it up from there.
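If your application wants the full row on every update, one option is to look a column up in "columns" first and fall back to the "identity" values when the new value is missing. A minimal sketch, reusing the Column and WalEvent types from the earlier example; this helper is not part of pgstream itself:

// columnValue returns the value of the named column from the event, falling
// back to the old-row values under "identity" (e.g. for an unmodified TOASTed
// column when the replica identity is FULL).
func columnValue(ev WalEvent, name string) (any, bool) {
    for _, c := range ev.Columns {
        if c.Name == name && c.Value != nil {
            return c.Value, true
        }
    }
    for _, c := range ev.Identity {
        if c.Name == name && c.Value != nil {
            return c.Value, true
        }
    }
    return nil, false
}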
At this point, you might be wondering about the downsides of setting the replica identity to FULL. Luckily, we wrote a full blog post about that. In short: there are some performance impacts, which are more relevant for the Postgres-to-Postgres replication use case and less relevant for the webhooks use case.
Let's look at one more pgstream trick: receiving webhooks for schema changes, in addition to data changes.
Postgres replication events have the well-known limitation that they don't include any schema changes. This can be an issue if your application requires having an up-to-date view of the schema.
pgstream works around this limitation by installing DDL event triggers and maintaining the list of changes in a separate table, called pgstream.schema_log. You can then subscribe to this table just like any other table.
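You can also inspect this log directly from psql; for example (the column names are taken from the schema_log event shown later in this section):

SELECT version, schema_name, created_at
FROM pgstream.schema_log
ORDER BY version DESC
LIMIT 5;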
To get schema events, we create another subscription pointing to the same webhooks listener server as before:
curl localhost:9900/webhooks/subscribe \
-H "content-type: application/json" \
-d '{"url":"http://localhost:9910/webhook", "schema": "pgstream", "table": "schema_log"}'
Now if we do a schema change:
ALTER TABLE employees ADD COLUMN first_name TEXT;
We get an event like this:
{
"Data": {
"action": "I",
"timestamp": "2024-08-21 09:10:07.940957+00",
"lsn": "0/16240F8",
"schema": "pgstream",
"table": "schema_log",
"columns": [
{
"id": "",
"name": "id",
"type": "pgstream.xid",
"value": "cr2qtrvjc0j00h93rdmg"
},
{
"id": "",
"name": "version",
"type": "bigint",
"value": 11
},
{
"id": "",
"name": "schema_name",
"type": "text",
"value": "public"
},
{
"id": "",
"name": "schema",
"type": "jsonb",
"value": "{\"tables\": [{\"oid\": \"16480\", \"name\": \"employees\", \"columns\": [{\"name\": \"first_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-4\"}, {\"name\": \"name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-2\"}, {\"name\": \"role\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-3\"}, {\"name\": \"id\", \"type\": \"integer\", \"unique\": true, \"default\": \"nextval('public.employees_id_seq'::regclass)\", \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-1\"}], \"pgstream_id\": \"cr2a0enjc0j00h93rdj0\", \"primary_key_columns\": [\"id\"]}, {\"oid\": \"16472\", \"name\": \"webhook_subscriptions\", \"columns\": [{\"name\": \"url\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-1\"}, {\"name\": \"schema_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-2\"}, {\"name\": \"table_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-3\"}, {\"name\": \"event_types\", \"type\": \"text[]\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-4\"}], \"pgstream_id\": \"cr29p2vjc0j00i13rdhg\", \"primary_key_columns\": [\"url\", \"schema_name\", \"table_name\"]}]}"
},
{
"id": "",
"name": "created_at",
"type": "timestamp without time zone",
"value": "2024-08-21 09:10:07.909214"
},
{
"id": "",
"name": "acked",
"type": "boolean",
"value": false
}
],
"identity": null,
"metadata": {
"schema_id": null,
"table_pgstream_id": "",
"id_col_pgstream_id": null,
"version_col_pgstream_id": ""
}
}
}
The event is more complex, but if you look inside the schema column value, you see the newly added column:
{
"name": "first_name",
"type": "text",
"unique": false,
"default": null,
"metadata": null,
"nullable": true,
"pgstream_id": "cr2a0enjc0j00h93rdj0-4"
}
Note how you get not only the name and type of the column, but also metadata such as nullability, uniqueness, and the default value.
Also, the pgstream_id value is unique and fixed for this column, regardless of its name. You can verify this by performing a rename:
ALTER TABLE employees RENAME COLUMN first_name TO last_name;
In this guide, we demonstrated how to set up and trigger webhooks based on data changes in a PostgreSQL database using pgstream. We covered the initial setup, webhook configuration, and handling basic operations like inserts, updates, and deletes.
pgstream also offers advanced capabilities, such as sending both old and new values during updates and managing schema changes. By integrating pgstream into your PostgreSQL environment, you can automate workflows, trigger real-time notifications, and build more responsive applications, whether for data synchronization, auditing, or microservices communication.
If you have any questions or hit issues with this tutorial, please open an issue in the pgstream repo or join us on Discord.