Postgres webhooks with pgstream

A simple tutorial for calling webhooks on Postgres data and schema changes using pgstream.

Written by Tudor Golubenco

Published on August 27, 2024

pgstream is a CDC (Change-Data-Capture) tool focused on PostgreSQL. Among other things, it can be used to call webhooks whenever there is a data (or schema) change in a Postgres database. This means that whenever a row is inserted, updated, or deleted, or a table is created, altered, truncated or deleted, a webhook is notified of the relevant event details.

In this article we're going to dive into the pgstream webhooks functionality, including more advanced topics, like sending the old and new values, TOAST support, and schema changes support.

Preparation

For this tutorial we need a Postgres instance that has logical replication enabled and the wal2json output plugin loaded. There are a number of ways to do this, but we're going to use the docker-compose file from the pgstream repo:

git clone https://github.com/xataio/pgstream.git
cd pgstream

And then start the Postgres instance:

docker compose -f build/docker/docker-compose.yml up db

Then, in a new terminal window, you can connect to it and create a simple table to play with:

psql "postgres://postgres:postgres@localhost:5432/postgres"

At the psql prompt:

CREATE TABLE employees(id SERIAL PRIMARY KEY, name TEXT, role TEXT);

Let's now install pgstream. One option is to build it from source, since we already have the repo cloned:

go build

The above assumes you have Go installed. If you don't, check out the other installation options.

The next step is to initialize pgstream:

./pgstream init --pgurl "postgres://postgres:postgres@localhost/postgres?sslmode=disable"

This creates several tables managed by pgstream, which are grouped in a schema called pgstream.

To run pgstream with the webhooks output, we need a configuration file. The pgstream repo comes with a pg2webhook.env file for this purpose. Its contents are:

# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"

# Processor config
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED=false
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL="60s"

PGSTREAM_POSTGRES_LISTENER_URL indicates which database the pgstream listener should connect to. PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL is used as a metadata db to store the registered subscriptions and their configuration. In this example, they are both set to the same database, but you can use different databases or instances if you prefer. More details can be found in the readme configuration section.

You are now ready to start pgstream like this:

./pgstream run -c pg2webhook.env

We now have pgstream ready to go, but we need something to receive the webhooks. This is normally your application, or a lambda function, or similar. pgstream comes with a sample webhooks listener written in Go. You can start it like this:

go run tools/webhook/webhook_server.go

This starts a server on port 9910 that simply logs any JSON payload that is received under the /webhook path.
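If you would rather write your own receiver, the following is a minimal sketch of what such a listener could look like. It is not the code of webhook_server.go; the function names prettyJSON and webhookHandler are made up for illustration, and the only things taken from the tutorial are the /webhook path and port 9910. To keep the example self-terminating, main calls the handler in-process rather than opening a port.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

// prettyJSON re-indents a raw JSON payload so it is easier to read in logs.
// It returns an error if the payload is not valid JSON.
func prettyJSON(raw []byte) (string, error) {
	var buf bytes.Buffer
	if err := json.Indent(&buf, raw, "", "  "); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// webhookHandler (hypothetical name) logs every JSON body it receives
// and replies with 200 OK, or 400 if the body is not valid JSON.
func webhookHandler(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	pretty, err := prettyJSON(body)
	if err != nil {
		http.Error(w, "invalid JSON", http.StatusBadRequest)
		return
	}
	log.Printf("received webhook:\n%s", pretty)
	w.WriteHeader(http.StatusOK)
}

func main() {
	// Exercise the handler in-process so the example terminates.
	// A long-running receiver would instead do:
	//   http.HandleFunc("/webhook", webhookHandler)
	//   log.Fatal(http.ListenAndServe(":9910", nil))
	req := httptest.NewRequest(http.MethodPost, "/webhook",
		strings.NewReader(`{"Data":{"action":"I","table":"employees"}}`))
	rec := httptest.NewRecorder()
	webhookHandler(rec, req)
	fmt.Println("status:", rec.Code)
}
```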

Now we have to create a subscription so that pgstream knows to send the webhook to localhost:9910/webhook where the webhook_server.go program is waiting. We do this by calling an API which the webhook module from pgstream serves:

curl localhost:9900/webhooks/subscribe \
    -H "content-type: application/json" \
    -d '{"url":"http://localhost:9910/webhook", "schema": "public", "table": "employees"}'

The above adds a row to the pgstream.webhook_subscriptions table, which is maintained by pgstream. You can verify this in psql:

postgres=# select * from pgstream.webhook_subscriptions;
              url              | schema_name | table_name | event_types
-------------------------------+-------------+------------+-------------
 http://localhost:9910/webhook | public      | employees  |
(1 row)

Notes:

  • In the API call above, we subscribe to a particular table. If you'd like to subscribe to all tables in a schema, leave the table key unset.
  • You can also subscribe to only some event types. To do this, set the event_types key to an array containing the operations you want (I for inserts, U for updates, D for deletes, T for truncates).
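As an illustration of the two notes above, here is a small sketch that builds the JSON body for a subscription. The JSON keys (url, schema, table, event_types) are the ones used in this tutorial; the Subscription type and the subscriptionBody helper are hypothetical names, and the client-side check of event types is our own addition, not something pgstream requires.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subscription mirrors the JSON body sent to /webhooks/subscribe.
// Table and EventTypes are left out of the JSON when empty, which
// corresponds to "all tables in the schema" and "all event types".
type Subscription struct {
	URL        string   `json:"url"`
	Schema     string   `json:"schema"`
	Table      string   `json:"table,omitempty"`
	EventTypes []string `json:"event_types,omitempty"`
}

// subscriptionBody (hypothetical helper) rejects unknown event types
// and returns the JSON-encoded request body.
func subscriptionBody(s Subscription) ([]byte, error) {
	valid := map[string]bool{"I": true, "U": true, "D": true, "T": true}
	for _, t := range s.EventTypes {
		if !valid[t] {
			return nil, fmt.Errorf("unknown event type %q", t)
		}
	}
	return json.Marshal(s)
}

func main() {
	// Subscribe to inserts and deletes on every table of the public schema.
	body, err := subscriptionBody(Subscription{
		URL:        "http://localhost:9910/webhook",
		Schema:     "public",
		EventTypes: []string{"I", "D"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

The printed string can be passed as the -d argument of the curl command shown earlier.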

With pgstream and the webhook_server.go program running in separate terminals, let's run the following SQL via psql:

INSERT INTO employees(name,role) VALUES('Dwight Schrute', 'Assistant Regional Manager');

You should see the following printed by the webhook server:

{
  "Data": {
    "action": "I",
    "timestamp": "2024-08-20 13:57:16.806107+00",
    "lsn": "0/15F1D60",
    "schema": "public",
    "table": "employees",
    "columns": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-2",
        "name": "name",
        "type": "text",
        "value": "Dwight Schrute"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-3",
        "name": "role",
        "type": "text",
        "value": "Assistant Regional Manager"
      }
    ],
    "identity": null,
    "metadata": {
      "schema_id": "cr2a0enjc0j00h93rdig",
      "table_pgstream_id": "cr2a0enjc0j00h93rdj0",
      "id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
      "version_col_pgstream_id": ""
    }
  }
}

Things to note in the above:

  • The "action" is I for "insert".
  • The LSN (Log Sequence Number) is included. This is a position in the Postgres write-ahead log that only ever increases, so you can use it to order the events in case they are processed out of order.
  • Each column is included with its name, type, new value, and an ID generated by pgstream (e.g. cr2a0enjc0j00h93rdj0-2 for the name column). This ID is unique for each column and can be used to follow columns across renames.
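One practical detail when ordering by LSN: the value arrives as text in the form X/Y, where both parts are hexadecimal, so comparing the strings directly gives the wrong order (for example, "0/A" sorts after "0/15F1D60" as a string). The sketch below converts an LSN to a 64-bit number before sorting. parseLSN and sortLSNs are illustrative helper names, not part of pgstream.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// parseLSN converts the textual form "X/Y" (two hexadecimal numbers)
// into one 64-bit value: X is the high 32 bits, Y the low 32 bits.
func parseLSN(s string) (uint64, error) {
	hi, lo, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q: missing '/'", s)
	}
	h, err := strconv.ParseUint(hi, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	l, err := strconv.ParseUint(lo, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return h<<32 | l, nil
}

// sortLSNs returns the given LSNs ordered by their numeric value.
func sortLSNs(lsns []string) ([]string, error) {
	type item struct {
		text string
		val  uint64
	}
	items := make([]item, 0, len(lsns))
	for _, s := range lsns {
		v, err := parseLSN(s)
		if err != nil {
			return nil, err
		}
		items = append(items, item{text: s, val: v})
	}
	sort.Slice(items, func(i, j int) bool { return items[i].val < items[j].val })
	out := make([]string, len(items))
	for i, it := range items {
		out[i] = it.text
	}
	return out, nil
}

func main() {
	sorted, err := sortLSNs([]string{"0/15F1FB8", "0/A", "0/15F1D60"})
	if err != nil {
		panic(err)
	}
	fmt.Println(sorted)
}
```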

Now let's run an update statement:

UPDATE employees SET role='Assistant to the Regional Manager' WHERE name='Dwight Schrute';

The following webhook event is received:

{
  "Data": {
    "action": "U",
    "timestamp": "2024-08-20 14:01:28.86832+00",
    "lsn": "0/15F1FB8",
    "schema": "public",
    "table": "employees",
    "columns": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-2",
        "name": "name",
        "type": "text",
        "value": "Dwight Schrute"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-3",
        "name": "role",
        "type": "text",
        "value": "Assistant to the Regional Manager"
      }
    ],
    "identity": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      }
    ],
    "metadata": {
      "schema_id": "cr2a0enjc0j00h93rdig",
      "table_pgstream_id": "cr2a0enjc0j00h93rdj0",
      "id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
      "version_col_pgstream_id": ""
    }
  }
}

The above is very similar to the INSERT event, with a couple of differences:

  • The "action" is set to U, for "update".
  • The identity sub-object is now filled. It contains the primary key of the affected row. Notably, this is the value of the primary key before the update, which matters if the update changed the PK itself.

How about including the old values in addition to the new values for UPDATEs? Yes, that's possible, using a little trick.

See how in the above UPDATE event, the "identity" contains the PK value, and we noted that this is the PK value before the update? That is because the "replica identity" for the table is set to "primary key", which is the default. However, the replica identity can also be set to another unique index (REPLICA IDENTITY USING INDEX), or to "all columns" by setting it to FULL:

ALTER TABLE employees REPLICA IDENTITY FULL;

If we run the update again:

UPDATE employees SET role='Assistant Regional Manager' WHERE name='Dwight Schrute';

We now get all values after the update, but also before the update, under the identity field:

{
  "Data": {
    "action": "U",
    "timestamp": "2024-08-20 17:50:50.503662+00",
    "lsn": "0/15F65F0",
    "schema": "public",
    "table": "employees",
    "columns": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-2",
        "name": "name",
        "type": "text",
        "value": "Dwight Schrute"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-3",
        "name": "role",
        "type": "text",
        "value": "Assistant Regional Manager"
      }
    ],
    "identity": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-2",
        "name": "name",
        "type": "text",
        "value": "Dwight Schrute"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-3",
        "name": "role",
        "type": "text",
        "value": "Assistant to the Regional Manager"
      }
    ],
    "metadata": {
      "schema_id": "cr2ae77jc0j00h93rdjg",
      "table_pgstream_id": "cr2a0enjc0j00h93rdj0",
      "id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
      "version_col_pgstream_id": ""
    }
  }
}
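With both old and new values in the payload, a receiver can work out which columns actually changed. Here is a minimal sketch that compares identity (old values) with columns (new values) by column name. The struct definitions cover only the fields used here, and changedColumns is a hypothetical helper rather than anything pgstream provides.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Column holds the subset of per-column fields used in this example.
type Column struct {
	Name  string `json:"name"`
	Value any    `json:"value"`
}

// Event holds the subset of the webhook payload used in this example.
type Event struct {
	Data struct {
		Action   string   `json:"action"`
		Columns  []Column `json:"columns"`
		Identity []Column `json:"identity"`
	} `json:"Data"`
}

// Change describes one column whose value differs before and after.
type Change struct {
	Name string
	Old  any
	New  any
}

// changedColumns returns the columns whose new value differs from the
// old value found under "identity". Columns missing from "identity"
// are skipped, since there is nothing to compare against.
func changedColumns(payload []byte) ([]Change, error) {
	var ev Event
	if err := json.Unmarshal(payload, &ev); err != nil {
		return nil, err
	}
	old := make(map[string]any, len(ev.Data.Identity))
	for _, c := range ev.Data.Identity {
		old[c.Name] = c.Value
	}
	var changes []Change
	for _, c := range ev.Data.Columns {
		prev, ok := old[c.Name]
		if ok && !reflect.DeepEqual(prev, c.Value) {
			changes = append(changes, Change{Name: c.Name, Old: prev, New: c.Value})
		}
	}
	return changes, nil
}

func main() {
	// Abbreviated version of the UPDATE event shown above.
	payload := []byte(`{"Data":{"action":"U",
	  "columns":[{"name":"id","value":1},{"name":"role","value":"Assistant Regional Manager"}],
	  "identity":[{"name":"id","value":1},{"name":"role","value":"Assistant to the Regional Manager"}]}}`)
	changes, err := changedColumns(payload)
	if err != nil {
		panic(err)
	}
	for _, ch := range changes {
		fmt.Printf("%s: %v -> %v\n", ch.Name, ch.Old, ch.New)
	}
}
```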

Let's try a simple delete as well:

DELETE FROM employees WHERE name='Dwight Schrute';

The generated event:

{
  "Data": {
    "action": "D",
    "timestamp": "2024-08-21 15:31:55.992618+00",
    "lsn": "0/163CEC8",
    "schema": "public",
    "table": "employees",
    "columns": null,
    "identity": [
      {
        "id": "cr2a0enjc0j00h93rdj0-1",
        "name": "id",
        "type": "integer",
        "value": 1
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-2",
        "name": "name",
        "type": "text",
        "value": "Dwight Schrute"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-3",
        "name": "role",
        "type": "text",
        "value": "Assistant Regional Manager"
      },
      {
        "id": "cr2a0enjc0j00h93rdj0-4",
        "name": "last_name",
        "type": "text",
        "value": null
      }
    ],
    "metadata": {
      "schema_id": "cr2r0lvjc0j00h93rdn0",
      "table_pgstream_id": "cr2a0enjc0j00h93rdj0",
      "id_col_pgstream_id": ["cr2a0enjc0j00h93rdj0-1"],
      "version_col_pgstream_id": ""
    }
  }
}

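Things to note:

  • The "action" key is set to D, for "delete".
  • The full row is included in the identity object. That is because the replica identity is set to FULL; otherwise only the PK would have been included.
  • The last_name column appears because this event was captured after the schema changes covered later in this article.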

Setting the replica identity to full accomplishes one more thing: it includes the large values in the webhook events.

Postgres stores large values separately from the main table data, in TOAST storage. This kicks in once a row grows beyond roughly 2 kB (a quarter of the default 8 kB page) and compression alone cannot shrink it enough. TOASTed values are not included in the logical replication events by default, unless they are changed by the current UPDATE statement. This is a gotcha: you are expecting all columns to always be present, but once they reach a certain size they will be missing.

However, setting the replica identity to full solves this. To test this, try the following:

-- set the identity back to be just PK
ALTER TABLE employees REPLICA IDENTITY DEFAULT;

-- set the role column to a large value.
-- This will be in the event because the role column is modified
UPDATE employees SET role=(
    SELECT string_agg(generate_series::text, ' ') FROM generate_series(0, 8000)
    ) WHERE name='Dwight Schrute';

-- change another column.
-- This time the role column is not present because it's TOASTed
UPDATE employees SET name='Jim Halpert' WHERE name='Dwight Schrute';

-- set the identity to FULL
ALTER TABLE employees REPLICA IDENTITY FULL;

-- try the update again, this time the value is included in the event.
UPDATE employees SET name='Dwight Schrute' WHERE name='Jim Halpert';

Note, however, that the TOASTed value is included only in the identity part of the event, so your application needs to pick it up from there.
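One way for an application to pick it up is to rebuild the current row by starting from the identity values and overlaying whatever is present in columns. The sketch below assumes that an unchanged TOASTed column is simply absent from columns, as described above; if your events represent it differently, the overlay rule needs adjusting. Column and currentRow are illustrative names.

```go
package main

import "fmt"

// Column holds the subset of per-column fields used in this example.
type Column struct {
	Name  string
	Value any
}

// currentRow rebuilds the row after an UPDATE. It starts from the old
// values under "identity" and overlays the new values under "columns".
// Assumption: an unchanged TOASTed column is absent from columns, so
// its value is taken from identity.
func currentRow(columns, identity []Column) map[string]any {
	row := make(map[string]any, len(identity))
	for _, c := range identity {
		row[c.Name] = c.Value
	}
	for _, c := range columns {
		row[c.Name] = c.Value
	}
	return row
}

func main() {
	// The role value is a short placeholder standing in for the large
	// TOASTed string from the SQL above.
	identity := []Column{
		{Name: "id", Value: 1},
		{Name: "name", Value: "Jim Halpert"},
		{Name: "role", Value: "placeholder for a large value"},
	}
	columns := []Column{
		{Name: "id", Value: 1},
		{Name: "name", Value: "Dwight Schrute"},
	}
	row := currentRow(columns, identity)
	fmt.Println(row["name"], "|", row["role"])
}
```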

At this point, you might be wondering what the downsides of setting the replica identity to full are. Luckily, we wrote a full blog post about that. In short: there is some performance impact, which matters more for Postgres-to-Postgres replication and less for calling webhooks.

Let's look at one more pgstream trick: receiving webhooks for schema changes, in addition to data changes.

Postgres replication events have the well-known limitation that they don't include any schema changes. This can be an issue if your application requires having an up-to-date view of the schema.

pgstream works around this limitation by installing DDL event triggers and maintaining the list of changes in a separate table, called pgstream.schema_log. You can then subscribe to this table just like any other table.

To get schema events, we create another subscription pointing to the same webhooks listener server as before:

curl localhost:9900/webhooks/subscribe \
  -H "content-type: application/json" \
  -d '{"url":"http://localhost:9910/webhook", "schema": "pgstream", "table": "schema_log"}'

Now if we do a schema change:

ALTER TABLE employees ADD COLUMN first_name TEXT;

We get an event like this:

{
  "Data": {
    "action": "I",
    "timestamp": "2024-08-21 09:10:07.940957+00",
    "lsn": "0/16240F8",
    "schema": "pgstream",
    "table": "schema_log",
    "columns": [
      {
        "id": "",
        "name": "id",
        "type": "pgstream.xid",
        "value": "cr2qtrvjc0j00h93rdmg"
      },
      {
        "id": "",
        "name": "version",
        "type": "bigint",
        "value": 11
      },
      {
        "id": "",
        "name": "schema_name",
        "type": "text",
        "value": "public"
      },
      {
        "id": "",
        "name": "schema",
        "type": "jsonb",
        "value": "{\"tables\": [{\"oid\": \"16480\", \"name\": \"employees\", \"columns\": [{\"name\": \"first_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-4\"}, {\"name\": \"name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-2\"}, {\"name\": \"role\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-3\"}, {\"name\": \"id\", \"type\": \"integer\", \"unique\": true, \"default\": \"nextval('public.employees_id_seq'::regclass)\", \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr2a0enjc0j00h93rdj0-1\"}], \"pgstream_id\": \"cr2a0enjc0j00h93rdj0\", \"primary_key_columns\": [\"id\"]}, {\"oid\": \"16472\", \"name\": \"webhook_subscriptions\", \"columns\": [{\"name\": \"url\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-1\"}, {\"name\": \"schema_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-2\"}, {\"name\": \"table_name\", \"type\": \"text\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": false, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-3\"}, {\"name\": \"event_types\", \"type\": \"text[]\", \"unique\": false, \"default\": null, \"metadata\": null, \"nullable\": true, \"pgstream_id\": \"cr29p2vjc0j00i13rdhg-4\"}], \"pgstream_id\": \"cr29p2vjc0j00i13rdhg\", \"primary_key_columns\": [\"url\", \"schema_name\", \"table_name\"]}]}"
      },
      {
        "id": "",
        "name": "created_at",
        "type": "timestamp without time zone",
        "value": "2024-08-21 09:10:07.909214"
      },
      {
        "id": "",
        "name": "acked",
        "type": "boolean",
        "value": false
      }
    ],
    "identity": null,
    "metadata": {
      "schema_id": null,
      "table_pgstream_id": "",
      "id_col_pgstream_id": null,
      "version_col_pgstream_id": ""
    }
  }
}

The event is more complex, but if you look under the schema column value, you see the newly added column:

{
  "name": "first_name",
  "type": "text",
  "unique": false,
  "default": null,
  "metadata": null,
  "nullable": true,
  "pgstream_id": "cr2a0enjc0j00h93rdj0-4"
}

Note how you get not only the name and the type of the column, but also metadata information (is nullable, is unique, default value).

Also, the pgstream_id value is unique and fixed for this column, regardless of the name. You can verify this by performing a rename:

ALTER TABLE employees RENAME COLUMN first_name TO last_name;
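To make use of this in an application, you can decode the schema value from two consecutive schema_log events and compare them by pgstream_id. The following sketch does that. It assumes the event after the rename has the same shape as the one shown above, with the same pgstream_id and the new name; the type names and the parseSchema and renamedColumns helpers are made up for illustration. Note that the schema value arrives as a JSON string, so it needs to be decoded separately from the surrounding event.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SchemaColumn, SchemaTable and Schema cover only the fields used here.
type SchemaColumn struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	PgstreamID string `json:"pgstream_id"`
}

type SchemaTable struct {
	Name    string         `json:"name"`
	Columns []SchemaColumn `json:"columns"`
}

type Schema struct {
	Tables []SchemaTable `json:"tables"`
}

// Rename describes a column whose name changed between two snapshots.
type Rename struct {
	PgstreamID string
	From       string
	To         string
}

// parseSchema decodes the text found in the "schema" column value.
func parseSchema(value string) (Schema, error) {
	var s Schema
	err := json.Unmarshal([]byte(value), &s)
	return s, err
}

// renamedColumns matches columns by pgstream_id and reports those
// whose name differs between the two snapshots.
func renamedColumns(before, after Schema) []Rename {
	oldNames := make(map[string]string)
	for _, t := range before.Tables {
		for _, c := range t.Columns {
			oldNames[c.PgstreamID] = c.Name
		}
	}
	var out []Rename
	for _, t := range after.Tables {
		for _, c := range t.Columns {
			if prev, ok := oldNames[c.PgstreamID]; ok && prev != c.Name {
				out = append(out, Rename{PgstreamID: c.PgstreamID, From: prev, To: c.Name})
			}
		}
	}
	return out
}

func main() {
	// Abbreviated schema values, before and after the rename.
	before, err := parseSchema(`{"tables":[{"name":"employees","columns":[
	  {"name":"first_name","type":"text","pgstream_id":"cr2a0enjc0j00h93rdj0-4"}]}]}`)
	if err != nil {
		panic(err)
	}
	after, err := parseSchema(`{"tables":[{"name":"employees","columns":[
	  {"name":"last_name","type":"text","pgstream_id":"cr2a0enjc0j00h93rdj0-4"}]}]}`)
	if err != nil {
		panic(err)
	}
	for _, r := range renamedColumns(before, after) {
		fmt.Printf("%s: %s -> %s\n", r.PgstreamID, r.From, r.To)
	}
}
```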

In this guide, we demonstrated how to set up and trigger webhooks based on data changes in a PostgreSQL database using pgstream. We covered the initial setup, webhook configuration, and handling basic operations like inserts, updates, and deletes.

pgstream also offers advanced capabilities, such as sending both old and new values during updates and managing schema changes. By integrating pgstream into your PostgreSQL environment, you can automate workflows, trigger real-time notifications, and build more responsive applications, whether for data synchronization, auditing, or microservices communication.

If you have any questions or hit issues with this tutorial, please open an issue in the pgstream repo or join us in Discord.
