⚠️ Be aware that exactly one source and one target must be provided, or the configuration validation will fail.

Yaml

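The pgstream configuration can be provided as a YAML configuration file, which encapsulates the transformation configuration. The following sample shows the format for all supported fields; it lists every source and target for reference, although a valid configuration contains only one of each.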
instrumentation:
  metrics:
    endpoint: "0.0.0.0:4317"
    collection_interval: 60 # collection interval for metrics in seconds. Defaults to 60s
  traces:
    endpoint: "0.0.0.0:4317"
    sample_ratio: 0.5 # ratio of traces that will be sampled. Must be between 0.0-1.0, where 0 is no traces sampled, and 1 is all traces sampled.

source:
  postgres:
    url: "postgresql://user:password@localhost:5432/mydatabase"
    mode: snapshot_and_replication # options are replication, snapshot or snapshot_and_replication
    snapshot: # when mode is snapshot or snapshot_and_replication
      mode: full # one of full, schema or data. Defaults to full.
      tables: ["test", "test_schema.Test", "another_schema.*"] # tables to snapshot, can be a list of table names or a pattern
      excluded_tables: ["test_schema.Test"] # tables to exclude for snapshot, wildcards are not supported
      recorder:
        repeatable_snapshots: true # whether to repeat snapshots that have already been taken. Defaults to false
        postgres_url: "postgresql://user:password@localhost:5432/mytargetdatabase" # URL of the database where the snapshot status is recorded
      snapshot_workers: 4 # number of schemas to be snapshotted in parallel. Defaults to 1
      data: # when mode is full or data
        schema_workers: 4 # number of schema tables to be snapshotted in parallel. Defaults to 4
        table_workers: 4 # number of workers to snapshot a table in parallel. Defaults to 4
        batch_bytes: 83886080 # bytes to read per batch (defaults to 80MiB)
        max_connections: 50 # maximum number of connections that the data snapshot can open to Postgres. Should be greater than or equal to the number of schema/table workers.
      schema: # when mode is full or schema
        mode: pgdump_pgrestore # options are pgdump_pgrestore or schemalog
        pgdump_pgrestore:
          clean_target_db: true # whether to clean the target database before restoring. Defaults to false
          create_target_db: true # whether to create the database on the target postgres. Defaults to false
          include_global_db_objects: true # whether to include database global objects, such as extensions or triggers, on the schema snapshot. Defaults to false
          no_owner: false # whether to remove ownership commands from the dump. Defaults to false
          no_privileges: false # whether to prevent dumping privilege commands (grant/revoke). Defaults to false
          role: postgres # role name to be used to create the dump
          roles_snapshot_mode: # no_passwords by default. Can be set to disabled to disable roles snapshotting, or can be set to enabled to include role passwords
          exclude_security_labels: ["anon"] # list of providers whose security labels will be excluded from the snapshot. Wildcard supported.
          dump_file: pg_dump.sql # name of the file where the contents of the schema pg_dump command and output will be written for debugging purposes.
      disable_progress_tracking: false # whether to disable progress tracking for the snapshot. Defaults to false
    replication: # when mode is replication or snapshot_and_replication
      replication_slot: "pgstream_mydatabase_slot"
    retry_policy: # retry policy for postgres connections, one of exponential or constant or disable_retries
      disable_retries: false
      exponential:
        initial_interval: 500 # initial interval in milliseconds
        max_interval: 10000 # maximum interval in milliseconds
      constant:
        max_retries: 5 # maximum number of retries
        interval: 1000 # interval in milliseconds
  kafka:
    servers: ["localhost:9092"]
    topic:
      name: "mytopic"
    consumer_group:
      id: "mygroup" # id for the kafka consumer group. Defaults to pgstream-consumer-group
      start_offset: "earliest" # options are earliest or latest. Defaults to earliest.
    tls:
      ca_cert: "/path/to/ca.crt" # path to CA certificate
      client_cert: "/path/to/client.crt" # path to client certificate
      client_key: "/path/to/client.key" # path to client key
    backoff:
      disable_retries: false
      exponential:
        max_retries: 5 # maximum number of retries
        initial_interval: 1000 # initial interval in milliseconds
        max_interval: 60000 # maximum interval in milliseconds
      constant:
        max_retries: 5 # maximum number of retries
        interval: 1000 # interval in milliseconds

target:
  postgres:
    url: "postgresql://user:password@localhost:5432/mytargetdatabase"
    batch:
      timeout: 1000 # batch timeout in milliseconds. Defaults to 30s
      size: 100 # number of messages in a batch. Defaults to 20000
      max_bytes: 1572864 # max size of batch in bytes (1.5MiB). Defaults to 1.5MiB without bulk enabled, 80MiB with bulk ingest.
      max_queue_bytes: 104857600 # max size of memory guard queue in bytes (100MiB). Defaults to 100MiB
      ignore_send_errors: false # if true, log and ignore errors during batch sending. Warning: can result in consistency errors.
    schema_log_store_url: "postgresql://user:password@localhost:5432/mydatabase" # url to the postgres database where the schema log is stored to be used when performing schema change diffs
    disable_triggers: false # whether to disable triggers on the target database. Defaults to false
    on_conflict_action: "nothing" # options are update, nothing or error. Defaults to error
    bulk_ingest:
      enabled: true # whether to enable bulk ingest on the target postgres, using COPY FROM (supported for insert only workloads)
    retry_policy: # retry policy for postgres connections, one of exponential or constant or disable_retries.
      disable_retries: false
      exponential:
        initial_interval: 500 # initial interval in milliseconds
        max_interval: 10000 # maximum interval in milliseconds
      constant:
        max_retries: 5 # maximum number of retries
        interval: 1000 # interval in milliseconds
  kafka:
    servers: ["localhost:9092"]
    topic:
      name: "mytopic" # name of the Kafka topic
      partitions: 1 # number of partitions for the topic. Defaults to 1
      replication_factor: 1 # replication factor for the topic. Defaults to 1
      auto_create: true # whether to automatically create the topic if it doesn't exist. Defaults to false
    tls:
      ca_cert: "/path/to/ca.crt" # path to CA certificate
      client_cert: "/path/to/client.crt" # path to client certificate
      client_key: "/path/to/client.key" # path to client key
    batch:
      timeout: 1000 # batch timeout in milliseconds. Defaults to 1s
      size: 100 # number of messages in a batch. Defaults to 100
      max_bytes: 1572864 # max size of batch in bytes (1.5MiB). Defaults to 1.5MiB
      max_queue_bytes: 104857600 # max size of memory guard queue in bytes (100MiB). Defaults to 100MiB
      ignore_send_errors: false # if true, log and ignore errors during batch sending. Warning: can result in consistency errors.
  search:
    engine: "elasticsearch" # options are elasticsearch or opensearch
    url: "http://localhost:9200" # URL of the search engine
    batch:
      timeout: 1000 # batch timeout in milliseconds. Defaults to 1s
      size: 100 # number of messages in a batch. Defaults to 100
      max_bytes: 1572864 # max size of batch in bytes (1.5MiB). Defaults to 1.5MiB
      max_queue_bytes: 104857600 # max size of memory guard queue in bytes (100MiB). Defaults to 100MiB
      ignore_send_errors: false # if true, log and ignore errors during batch sending. Warning: can result in consistency errors.
    backoff:
      disable_retries: false
      exponential:
        max_retries: 5 # maximum number of retries
        initial_interval: 1000 # initial interval in milliseconds
        max_interval: 60000 # maximum interval in milliseconds
      constant:
        max_retries: 5 # maximum number of retries
        interval: 1000 # interval in milliseconds
  webhooks:
    subscriptions:
      store:
        url: "postgresql://user:password@localhost:5432/mydatabase" # URL of the database where the webhook subscriptions are stored
        cache:
          enabled: true # whether to enable caching for the subscription store. Defaults to false
          refresh_interval: 60 # interval in seconds to refresh the cache
      server:
        address: "localhost:9090" # address of the subscription server
        read_timeout: 60 # read timeout in seconds. Defaults to 5s
        write_timeout: 60 # write timeout in seconds. Defaults to 10s
    notifier:
      worker_count: 4 # number of notifications to be processed in parallel. Defaults to 10
      client_timeout: 1000 # timeout for the webhook client in milliseconds. Defaults to 10s

modifiers:
  injector:
    enabled: true # whether to inject pgstream metadata into the WAL events. Defaults to false
    schemalog_url: "postgres://postgres:postgres@localhost:5432?sslmode=disable" # URL of the schemalog database, if different from the source database
  filter: # one of include_tables or exclude_tables
    include_tables: # list of tables for which events should be allowed. Tables should be schema qualified. If no schema is provided, the public schema will be assumed. Wildcards "*" are supported.
      - "test"
      - "test_schema.test"
      - "another_schema.*"
    exclude_tables: # list of tables for which events should be skipped. Tables should be schema qualified. If no schema is provided, the public schema will be assumed. Wildcards "*" are supported.
      - "excluded_test"
      - "excluded_schema.test"
      - "another_excluded_schema.*"
  transformations:
    validation_mode: relaxed
    table_transformers:
      - schema: public
        table: test
        column_transformers:
          name:
            name: greenmask_firstname
            dynamic_parameters:
              gender:
                column: sex
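
Because the sample above lists every supported field, it is not itself a valid configuration. As a rough sketch, a trimmed-down Postgres to Postgres setup with exactly one source and one target might look like the following. It uses only keys from the sample; the URLs, slot name and table pattern are placeholders, and it assumes that omitted optional fields fall back to their documented defaults (this has not been checked against the validator).

```yaml
# Sketch only: one source (postgres) and one target (postgres).
# All values are placeholders taken from the reference sample.
source:
  postgres:
    url: "postgresql://user:password@localhost:5432/mydatabase"
    mode: snapshot_and_replication # initial snapshot, then replication
    snapshot:
      mode: full # schema and data
      tables: ["another_schema.*"] # every table in another_schema
    replication:
      replication_slot: "pgstream_mydatabase_slot"

target:
  postgres:
    url: "postgresql://user:password@localhost:5432/mytargetdatabase"
    on_conflict_action: "nothing" # options are update, nothing or error
```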

Environment Variables

Here’s a list of all the environment variables that can be used to configure the individual modules, along with their descriptions and default values.

Sources

Postgres Listener

Default: N/A
Required: ✓ Required
Description: URL of the Postgres database to connect to for replication purposes.
Default: “pgstream_dbname_slot”
Required: Optional
Description: Name of the Postgres replication slot.
Default: “full”
Required: Optional
Description: Mode in which the snapshot will be run. It can be one of schema, data or full (both schema and data).
Default: ""
Required: Optional
Description: Tables for which an initial snapshot will be generated. The syntax supports wildcards. Tables without a schema are assumed to be in the public schema. Example: for public.test_table and all tables in the test_schema schema, the value would be "test_table test_schema.*"
Default: ""
Required: Optional
Description: Tables to exclude from the snapshot process. The syntax does not support wildcards. Tables without a schema are assumed to be in the public schema.
Default: 4
Required: Optional
Description: Number of tables per schema that will be processed in parallel by the snapshotting process.
Default: 4
Required: Optional
Description: Number of concurrent workers that will be used per table by the snapshotting process.
Default: 83886080 (80MiB)
Required: Optional
Description: Max batch size in bytes to be read and processed by each table worker at a time. The number of pages in the select queries will be based on this value.
Default: 1
Required: Optional
Description: Number of schemas that will be processed in parallel by the snapshotting process.
Default: 50
Required: Optional
Description: Maximum number of Postgres connections that will be opened by the snapshotting process. This value shouldn’t be lower than the number of schema/table workers selected.
Default: False
Required: Optional
Description: Forces the use of the pgstream.schema_log for the schema snapshot instead of using pg_dump/pg_restore for Postgres targets.
Default: False
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, option to issue commands to DROP all the objects that will be restored.
Default: False
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, option to snapshot all global database objects outside of the selected schema (such as extensions, triggers, etc).
Default: False
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, option to create the database being restored.
Default: False
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, do not output commands to set ownership of objects to match the original database.
Default: False
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, do not output privilege related commands (grant/revoke).
Default: []
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, list of providers whose security labels will be excluded.
Default: ""
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, role name to be used to create the dump.
Default: “no_passwords”
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, controls how roles are snapshotted. Possible values: “enabled” (snapshot all roles including passwords), “disabled” (do not snapshot roles), “no_passwords” (snapshot roles but exclude passwords).
Default: ""
Required: Optional
Description: When using pg_dump/pg_restore to snapshot schema for Postgres targets, file where the contents of the schema pg_dump command and output will be written for debugging purposes.
Default: ""
Required: Optional
Description: Postgres URL for the database where the snapshot requests and their status will be tracked. A table snapshot_requests will be created under a pgstream schema.
Default: False (run), True (snapshot)
Required: Optional
Description: Allows repeating snapshot requests that have already completed successfully. With the run command, initial snapshots are not repeatable by default; with the snapshot command, they are repeatable by default.
Default: False
Required: Optional
Description: Whether to disable progress tracking for the snapshot.
Default: 500ms
Required: Optional
Description: Initial interval for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 10s
Required: Optional
Description: Max interval for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 20
Required: Optional
Description: Max retries for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 0
Required: Optional
Description: Constant interval for the backoff policy to be applied to the Postgres connection retries.
Default: 0
Required: Optional
Description: Max retries for the backoff policy to be applied to the Postgres connection retries.
Default: False
Required: Optional
Description: Disable any retry policy.
One of exponential/constant/disable retries retry policies can be provided for the Postgres connection retry strategy. If none is provided, the exponential defaults apply.
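
In the YAML format, the same choice is expressed by keeping a single key under retry_policy. The fragment below is a sketch that selects the constant policy for a replication-only Postgres source; the values are the illustrative ones from the reference sample, not recommendations.

```yaml
# Sketch: Postgres source with only the constant retry policy set.
source:
  postgres:
    url: "postgresql://user:password@localhost:5432/mydatabase"
    mode: replication
    replication:
      replication_slot: "pgstream_mydatabase_slot"
    retry_policy: # only one of exponential, constant or disable_retries
      constant:
        max_retries: 5 # maximum number of retries
        interval: 1000 # interval in milliseconds
```

Leaving retry_policy out entirely would, per the note above, apply the exponential defaults (500ms initial interval, 10s max interval, 20 retries).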

Kafka Listener

Default: N/A
Required: ✓ Required
Description: URLs for the Kafka servers to connect to.
Default: N/A
Required: ✓ Required
Description: Name of the Kafka topic to read from.
Default: N/A
Required: ✓ Required
Description: Name of the Kafka consumer group for the WAL Kafka reader.
Default: Earliest
Required: Optional
Description: Kafka offset from which the consumer will start if there’s no offset available for the consumer group.
Default: False
Required: Optional
Description: Enable TLS connection to the Kafka servers.
Default: ""
Required: Optional
Description: Path to the CA PEM certificate to use for Kafka TLS authentication.
Default: ""
Required: Optional
Description: Path to the client PEM certificate to use for Kafka TLS client authentication.
Default: ""
Required: Optional
Description: Path to the client PEM private key to use for Kafka TLS client authentication.
Default: 0
Required: Optional
Description: Initial interval for the exponential backoff policy to be applied to the Kafka commit retries.
Default: 0
Required: Optional
Description: Max interval for the exponential backoff policy to be applied to the Kafka commit retries.
Default: 0
Required: Optional
Description: Max retries for the exponential backoff policy to be applied to the Kafka commit retries.
Default: 0
Required: Optional
Description: Constant interval for the backoff policy to be applied to the Kafka commit retries.
Default: 0
Required: Optional
Description: Max retries for the backoff policy to be applied to the Kafka commit retries.
Default: False
Required: Optional
Description: Disable any retry policy.
One of exponential/constant backoff policies can be provided for the Kafka committing retry strategy. If none is provided, no retries apply.

Targets

Kafka Batch Writer

Default: N/A
Required: ✓ Required
Description: URLs for the Kafka servers to connect to.
Default: N/A
Required: ✓ Required
Description: Name of the Kafka topic to write to.
Default: 1
Required: Optional
Description: Number of partitions created for the Kafka topic if auto create is enabled.
Default: 1
Required: Optional
Description: Replication factor used when creating the Kafka topic if auto create is enabled.
Default: False
Required: Optional
Description: Auto creation of configured Kafka topic if it doesn’t exist.
Default: False
Required: Optional
Description: Enable TLS connection to the Kafka servers.
Default: ""
Required: Optional
Description: Path to the CA PEM certificate to use for Kafka TLS authentication.
Default: ""
Required: Optional
Description: Path to the client PEM certificate to use for Kafka TLS client authentication.
Default: ""
Required: Optional
Description: Path to the client PEM private key to use for Kafka TLS client authentication.
Default: 1s
Required: Optional
Description: Max time interval at which the batch sending to Kafka is triggered.
Default: 1572864
Required: Optional
Description: Max size in bytes for a given batch. When this size is reached, the batch is sent to Kafka.
Default: 100
Required: Optional
Description: Max number of messages to be sent per batch. When this size is reached, the batch is sent to Kafka.
Default: False
Required: Optional
Description: Whether to ignore errors encountered while sending batches to the target.
Default: 100MiB
Required: Optional
Description: Max memory used by the Kafka batch writer for inflight batches.

Search Batch Indexer

Default: N/A
Required: ✓ Required
Description: URL for the opensearch store to connect to (at least one of the URLs must be provided).
Default: N/A
Required: ✓ Required
Description: URL for the elasticsearch store to connect to (at least one of the URLs must be provided).
Default: 1s
Required: Optional
Description: Max time interval at which the batch sending to the search store is triggered.
Default: 100
Required: Optional
Description: Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store.
Default: False
Required: Optional
Description: Whether to ignore errors encountered while sending batches to the target.
Default: 100MiB
Required: Optional
Description: Max memory used by the search batch indexer for inflight batches.
Default: 1s
Required: Optional
Description: Initial interval for the exponential backoff policy to be applied to the search store operation retries.
Default: 1min
Required: Optional
Description: Max interval for the exponential backoff policy to be applied to the search store operation retries.
Default: 0
Required: Optional
Description: Max retries for the exponential backoff policy to be applied to the search store operation retries.
Default: 0
Required: Optional
Description: Constant interval for the backoff policy to be applied to the search store operation retries.
Default: 0
Required: Optional
Description: Max retries for the backoff policy to be applied to the search store operation retries.
Default: False
Required: Optional
Description: Disable any retry policy.
One of exponential/constant backoff policies can be provided for the search indexer cleanup retry strategy. If none is provided, no retries apply.
One of exponential/constant/disable retries backoff policies can be provided for the search store retry strategy. If none is provided, a default exponential backoff policy applies.
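
For orientation, the Kafka listener and search indexer settings map onto the YAML format roughly as follows. This is a sketch of a Kafka source feeding an OpenSearch target with an explicit exponential backoff; it reuses keys and values from the reference sample, and the server addresses, topic and group names are placeholders.

```yaml
# Sketch: Kafka source, search target, explicit exponential backoff.
source:
  kafka:
    servers: ["localhost:9092"]
    topic:
      name: "mytopic"
    consumer_group:
      id: "mygroup"
      start_offset: "earliest" # options are earliest or latest

target:
  search:
    engine: "opensearch" # options are elasticsearch or opensearch
    url: "http://localhost:9200"
    batch:
      timeout: 1000 # milliseconds
      size: 100 # messages per batch
    backoff: # only one of exponential, constant or disable_retries
      exponential:
        max_retries: 5
        initial_interval: 1000 # milliseconds
        max_interval: 60000 # milliseconds
```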

Webhook Notifier

Default: N/A
Required: ✓ Required
Description: URL for the webhook subscription store to connect to.
Default: False
Required: Optional
Description: Caching applied to the subscription store retrieval queries.
Default: 60s
Required: Optional
Description: Interval at which the subscription store cache will be refreshed. Indicates max cache staleness.
Default: 100MiB
Required: Optional
Description: Max memory used by the webhook notifier for inflight notifications.
Default: 10
Required: Optional
Description: Max number of concurrent workers that will send webhook notifications for a given WAL event.
Default: 10s
Required: Optional
Description: Max time the notifier will wait for a response from a webhook URL before timing out.
Default: “:9900”
Required: Optional
Description: Address for the subscription server to listen on.
Default: 5s
Required: Optional
Description: Max duration for reading an entire server request, including the body before timing out.
Default: 10s
Required: Optional
Description: Max duration before timing out writes of the response. It is reset whenever a new request’s header is read.

Postgres Batch Writer

Default: N/A
Required: ✓ Required
Description: URL for the PostgreSQL store to connect to.
Default: 30s
Required: Optional
Description: Max time interval at which the batch sending to PostgreSQL is triggered.
Default: 20000
Required: Optional
Description: Max number of messages to be sent per batch. When this size is reached, the batch is sent to PostgreSQL.
Default: 100MiB
Required: Optional
Description: Max memory used by the postgres batch writer for inflight batches.
Default: 1.5MiB, 80MiB with bulk enabled
Required: Optional
Description: Max size in bytes for a given batch. When this size is reached, the batch is sent to PostgreSQL.
Default: False
Required: Optional
Description: Whether to ignore errors encountered while sending events to the target.
Default: N/A
Required: Optional
Description: URL of the store that holds the pgstream schemalog table, which keeps track of schema changes.
Default: False (run), True (snapshot)
Required: Optional
Description: Option to disable triggers on the target PostgreSQL database while performing the snapshot/replication streaming. It defaults to false when using the run command, and to true when using the snapshot command.
Default: error
Required: Optional
Description: Action to apply to inserts on conflict. Options are nothing, update or error.
Default: False (run), True (snapshot)
Required: Optional
Description: Whether to use COPY FROM on insert-only workloads. It defaults to false when using the run command, and to true when using the snapshot command.
Default: 500ms
Required: Optional
Description: Initial interval for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 10s
Required: Optional
Description: Max interval for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 20
Required: Optional
Description: Max retries for the exponential backoff policy to be applied to the Postgres connection retries.
Default: 0
Required: Optional
Description: Constant interval for the backoff policy to be applied to the Postgres connection retries.
Default: 0
Required: Optional
Description: Max retries for the backoff policy to be applied to the Postgres connection retries.
Default: False
Required: Optional
Description: Disable any retry policy.
One of exponential/constant/disable retries retry policies can be provided for the Postgres connection retry strategy. If none is provided, the exponential defaults apply.
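
Since bulk ingest relies on COPY FROM and is only supported for insert-only workloads, it pairs naturally with a snapshot-only source. The following sketch sets explicitly what the snapshot command is documented to default to (bulk ingest and disabled triggers); the URLs and table pattern are placeholders, and the worker and batch values simply restate the documented defaults.

```yaml
# Sketch: snapshot-only Postgres source with a bulk-ingest Postgres target.
source:
  postgres:
    url: "postgresql://user:password@localhost:5432/mydatabase"
    mode: snapshot
    snapshot:
      mode: full
      tables: ["another_schema.*"]
      data:
        schema_workers: 4 # tables per schema snapshotted in parallel
        table_workers: 4 # workers per table
        max_connections: 50 # keep at or above the number of workers

target:
  postgres:
    url: "postgresql://user:password@localhost:5432/mytargetdatabase"
    disable_triggers: true
    bulk_ingest:
      enabled: true # uses COPY FROM, insert-only workloads
    batch:
      max_bytes: 83886080 # 80MiB, the documented default with bulk ingest
```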

Modifiers

Injector

Default: N/A
Required: ✓ Required
Description: URL of the Postgres database where the schema log table is stored.

Transformer

Default: N/A
Required: Optional
Description: Filepath pointing to the yaml file containing the transformer rules.

Filter

Default: N/A
Required: Optional
Description: List of schema qualified tables for which the WAL events should be processed. If no schema is provided, public schema will be assumed. Wildcards are supported.
Default: N/A
Required: Optional
Description: List of schema qualified tables for which the WAL events should be skipped. If no schema is provided, public schema will be assumed. Wildcards are supported.
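
The reference sample shows both lists side by side, but its comment states that the filter takes one of include_tables or exclude_tables. A sketch of a filter that only allows events from selected tables, with placeholder table names taken from the sample:

```yaml
# Sketch: filter modifier using only include_tables.
modifiers:
  filter:
    include_tables:
      - "test" # no schema given, so the public schema is assumed
      - "test_schema.test" # a single schema-qualified table
      - "another_schema.*" # every table in another_schema
```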

Instrumentation

Metrics

Default: N/A
Required: Optional
Description: Endpoint where the pgstream metrics will be exported to.
Default: 60s
Required: Optional
Description: Interval at which the pgstream metrics will be collected and exported.

Traces

Default: N/A
Required: Optional
Description: Endpoint where the pgstream traces will be exported to.
Default: 0
Required: Optional
Description: Ratio for the trace sampling. Value must be between 0.0 and 1.0, where 0.0 is no traces sampled, and 1.0 is all traces sampled.