
How to set up PostgreSQL pglogical replication on the same database instance

I am trying to assess pglogical replication as an alternative to the built-in logical replication (particularly because it can filter by operation). I am using **a single** Azure PostgreSQL flexible server instance on which I have created two databases (main and replica). I have done the following setup:

### Main database
SELECT pglogical.create_node(
    node_name := 'main_node',
    dsn := 'host=localhost port=5432 dbname=main user=x password=***'
);

-- check that the node is there
SELECT * FROM pglogical.node;
SELECT pglogical.create_replication_set(
    set_name := 'custom_set',
    replicate_insert := true,
    replicate_update := true,
    replicate_delete := false
);

-- start with a single replicated table
SELECT pglogical.replication_set_add_table('custom_set', 'public."Message"');

-- check the configuration
select * from pglogical.tables;
select * from pglogical.replication_set;
select * from pglogical.replication_set_table;

-- or better with
SELECT rs.set_name, n.nspname AS schema_name, c.relname AS table_name
FROM pglogical.replication_set_table rst
	JOIN pg_class c ON rst.set_reloid = c.oid
	JOIN pg_namespace n ON c.relnamespace = n.oid
	JOIN pglogical.replication_set rs ON rst.set_id = rs.set_id
WHERE rs.set_name = 'custom_set';
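Since pglogical relies on logical decoding and background workers, the server settings it depends on can be confirmed from any session. This is only a sketch using the standard pg_settings view; the list of parameters is my assumption about what is relevant here, and on Azure the values are managed through the server parameters rather than postgresql.conf.

```sql
-- Settings that logical decoding and pglogical workers depend on.
-- wal_level is expected to be 'logical' and shared_preload_libraries
-- is expected to list pglogical.
SELECT name, setting
FROM pg_settings
WHERE name IN (
    'wal_level',
    'max_replication_slots',
    'max_wal_senders',
    'max_worker_processes',
    'shared_preload_libraries'
)
ORDER BY name;
```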
I have about 10K rows in public."Message" on the main database.

### Replica
SELECT pglogical.create_node(
    node_name := 'replica_node',
    dsn := 'host=localhost port=5432 dbname=replica user=x password=*'
);

SELECT pglogical.create_subscription(
    subscription_name := 'main_to_replica_subscription',
    provider_dsn := 'host=localhost port=5432 dbname=main user=x password=*',
    replication_sets := ARRAY['custom_set']
);

-- check the objects
SELECT * FROM pglogical.subscription;

-- this shows 'streaming'
SELECT * 
FROM pglogical.show_subscription_status();
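Alongside the subscription status, the slot that pglogical creates on the provider side can be inspected from the main database. The following is a sketch that uses only the built-in pg_replication_slots view; filtering on the pglogical_output plugin and the bytes_unconfirmed alias are my assumptions for illustration.

```sql
-- Run on the main (provider) database.
-- Lists pglogical slots, whether a walsender is attached to each one,
-- and how much WAL has not yet been confirmed by the subscriber.
SELECT slot_name,
       plugin,
       database,
       active,
       active_pid,
       restart_lsn,
       confirmed_flush_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS bytes_unconfirmed
FROM pg_replication_slots
WHERE plugin = 'pglogical_output';
```

A slot that stays inactive, or whose bytes_unconfirmed keeps growing while the subscription reports an active state, would point at the apply side not keeping up or reconnecting repeatedly.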
The initial data is replicated fine. However, when inserting more data into the main table, things begin to go astray:

- the replication status is sometimes catching up or replicating and never seems to return to streaming
- the replication seems to be very slow (I expect sub-100 ms delays for such low data volumes)
- the following query indicates a growing number of bytes still to be replicated:
SELECT application_name, 
       state, 
       pg_current_wal_lsn() AS current_wal_lsn,
       replay_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS bytes_pending
FROM pg_stat_replication;
- database connections seem to be randomly dropped (e.g. pgAdmin begins reporting that the connection to the server was lost). This issue disappears if the subscription is disabled.

**I assume there is something wrong with the setup when it is done on the same database instance. Is there any workaround?**

select version() reports: PostgreSQL 15.7 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0, 64-bit.

**Notes**: I know from setting up the built-in PostgreSQL replication that using the same instance required an explicit logical slot to avoid the replication getting stuck:
SELECT pg_create_logical_replication_slot('custom_slot', 'pgoutput');
However, I cannot find a similar workaround for the pglogical replication. On a very similar setup (the same database instance), the built-in replication worked with the workaround mentioned above.

### Relevant PostgreSQL server logs

The only error related to pglogical I could find in the server logs looks like this:

> ...-LOG: starting pglogical database manager for database replica
>
> ...-LOG: received replication command: CREATE_REPLICATION_SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL pglogical_output","statement": "CREATE_REPLICATION_SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL pglogical_output
>
> ...-LOG: logical decoding found consistent point at 314/E005A1F0","detail": "There are no running transactions.","statement": "CREATE_REPLICATION_SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL pglogical_output
>
> -LOG: exported logical decoding snapshot: \"0000001B-000006BE-1\" with 0 transaction IDs","statement": "CREATE_REPLICATION_SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL pglogical_output
>
> -LOG: received replication command: START_REPLICATION SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL 314/E005A228 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', \"binary.want_internal_basetypes\" '1', \"binary.want_binary_basetypes\" '1', \"binary.basetypes_major_version\" '1500', \"binary.sizeof_datum\" '8', \"binary.sizeof_int\" '4', \"binary.sizeof_long\" '8', \"binary.bigendian\" '0', \"binary.float4_byval\" '0', \"binary.float8_byval\" '1', \"binary.integer_datetimes\" '0', \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup', \"pglogical.forward_origins\" '\"all\"', \"pglogical.replication_set_names\" 'custom_set', \"relmeta_cache_size\" '-1', pg_version '150007', pglogical_version '2.4.2', pglogical_version_num '20402', pglogical_apply_pid '10812')","statement": "START_REPLICATION SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL 314/E005A228 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', \"binary.want_internal_basetypes\" '1', \"binary.want_binary_basetypes\" '1', \"binary.basetypes_major_version\" '1500', \"binary.sizeof_datum\" '8', \"binary.sizeof_int\" '4', \"binary.sizeof_long\" '8', \"binary.bigendian\" '0', \"binary.float4_byval\" '0', \"binary.float8_byval\" '1', \"binary.integer_datetimes\" '0', \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup', \"pglogical.forward_origins\" '\"all\"', \"pglogical.replication_set_names\" 'custom_set', \"relmeta_cache_size\" '-1', pg_version '150007', pglogical_version '2.4.2', pglogical_version_num '20402', pglogical_apply_pid '10812')
>
> -LOG: starting logical decoding for slot \"pgl_localdev3d6043b_main_node_main_to_0246a7b\"","detail": "Streaming transactions committing after 314/E005A228, reading WAL from 314/E005A1F0.","statement": "START_REPLICATION SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL 314/E005A228 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', \"binary.want_internal_basetypes\" '1', \"binary.want_binary_basetypes\" '1', \"binary.basetypes_major_version\" '1500', \"binary.sizeof_datum\" '8', \"binary.sizeof_int\" '4', \"binary.sizeof_long\" '8', \"binary.bigendian\" '0', \"binary.float4_byval\" '0', \"binary.float8_byval\" '1', \"binary.integer_datetimes\" '0', \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup', \"pglogical.forward_origins\" '\"all\"', \"pglogical.replication_set_names\" 'custom_set', \"relmeta_cache_size\" '-1', pg_version '150007', pglogical_version '2.4.2', pglogical_version_num '20402', pglogical_apply_pid '10812')
>
> ...-LOG: connection received: host=127.0.0.1 port=43598
>
> 2024-10-16 07:54:21 UTC-670f712d.3189-LOG: connection authenticated: identity=\"CN=azuresu.b32b9622594c.database.azure.com\" method=cert (/datadrive/pg/data/pg_hba.conf:9)
>
> ...-LOG: connection authorized: user=azuresu database=postgres application_name=psql SSL enabled (protocol=TLSv1.3, cipher=TLS_AES_256_GCM_SHA384, bits=256)
>
> ...-LOG: disconnection: session time: 0:00:00.165 user=azuresu database=postgres host=127.0.0.1 port=43598
>
> ...-LOG: disconnection: session time: 0:00:20.997 user=azuresu database=postgres host=127.0.0.1 port=51290
>
> ...-LOG: connection received: host=127.0.0.1 port=39900
>
> ...-LOG: connection authenticated: identity=\"CN=azuresu.b32b9622594c.database.azure.com\" method=cert (/datadrive/pg/data/pg_hba.conf:9)
>
> ...-LOG: connection authorized: user=azuresu database=postgres SSL enabled (protocol=TLSv1.3, cipher=TLS_AES_256_GCM_SHA384, bits=256)
>
> 2024-10-16 07:54:30 UTC-670f7122.313b-LOG: disconnection: session time: 0:00:20.044 user=azuresu database=azure_maintenance host=127.0.0.1 port=51300
>
> ...-LOG: could not send data to client: Connection reset by peer","context": "slot \"pgl_localdev3d6043b_main_node_main_to_0246a7b\", output plugin \"pglogical_output\", in the change callback, associated LSN 314/E43B3278","statement": "START_REPLICATION SLOT \"pgl_localdev3d6043b_main_node_main_to_0246a7b\" LOGICAL 314/E005A228 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', \"binary.want_internal_basetypes\" '1', \"binary.want_binary_basetypes\" '1', \"binary.basetypes_major_version\" '1500', \"binary.sizeof_datum\" '8', \"binary.sizeof_int\" '4', \"binary.sizeof_long\" '8', \"binary.bigendian\" '0', \"binary.float4_byval\" '0', \"binary.float8_byval\" '1', \"binary.integer_datetimes\" '0', \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup', \"pglogical.forward_origins\" '\"all\"', \"pglogical.replication_set_names\" 'custom_set', \"relmeta_cache_size\" '-1', pg_version '150007', pglogical_version '2.4.2', pglogical_version_num '20402', pglogical_apply_pid '13850')

I have several such errors. I assume that they might be related to my disabling the subscription (the first step to clean up the replication).

### Workers related information

The Azure configuration related to the workers is as follows:

[Image: Azure PostgreSQL workers related configuration]

In the logs I could only find worker-related information such as the following (not sure if it is related to the replication or not):

> pg_qs: qs bgworker: worker woke up and performed operations, with query data store as on
>
> pg_qs: qs bgworker: worker finished successfully in 85.484192 ms, with query store as on, default_transaction_read_only as off and having_schema as true
>
> manager worker at slot 1 generation 2 detaching cleanly

### Message table schema and indexes
CREATE TABLE IF NOT EXISTS public."Message"
(
    "Id" uuid NOT NULL,
    "RequestRef" uuid NOT NULL,
    "RefId" uuid,
    "ReasonCode" citext COLLATE pg_catalog."default" NOT NULL,
    "StatusCode" citext COLLATE pg_catalog."default" NOT NULL,
    "IsChargeable" boolean NOT NULL,
    "DestinationIdentifier" character varying(32) COLLATE pg_catalog."default" NOT NULL,
    "DestCode" character varying(2) COLLATE pg_catalog."default" NOT NULL,
    "Sender" citext COLLATE pg_catalog."default" NOT NULL,
    "SenderTypeCode" citext COLLATE pg_catalog."default" NOT NULL,
    "Sector" citext COLLATE pg_catalog."default",
    "Vars" citext COLLATE pg_catalog."default",
    "Network" citext COLLATE pg_catalog."default",
    "IsPiiRemoved" boolean NOT NULL,
    "SentTimestamp" timestamp with time zone,
    "CreateTimestamp" timestamp with time zone NOT NULL,
    "UpdatedAt" timestamp with time zone NOT NULL,
    "OrderNo" bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    "NormalizedDestinationIdentifier" character varying(32) COLLATE pg_catalog."default" NOT NULL DEFAULT ''::character varying,
    "MsgDirection" character varying(16) COLLATE pg_catalog."default" NOT NULL DEFAULT 'Unknown'::character varying,
    "CharSet" citext COLLATE pg_catalog."default" NOT NULL DEFAULT ''::citext,
    "ConvId" uuid,
    "IsFixedLine" boolean NOT NULL DEFAULT false,
    "IsViewed" boolean,
    "ViewedBy" uuid,
    CONSTRAINT "PK_Message" PRIMARY KEY ("Id")
)
TABLESPACE pg_default;

-- Index: IX_Message_ConvId

-- DROP INDEX IF EXISTS public."IX_Message_ConvId";

CREATE INDEX IF NOT EXISTS "IX_Message_ConvId"
    ON public."Message" USING btree
    ("ConvId" ASC NULLS LAST)
    TABLESPACE pg_default;

-- Index: IX_Message_NormalizedDestinationIdentifier_Sender

-- DROP INDEX IF EXISTS public."IX_Message_NormalizedDestinationIdentifier_Sender";

CREATE INDEX IF NOT EXISTS "IX_Message_NormalizedDestinationIdentifier_Sender"
    ON public."Message" USING btree
    ("NormalizedDestinationIdentifier" COLLATE pg_catalog."default" ASC NULLS LAST, "Sender" COLLATE pg_catalog."default" ASC NULLS LAST)
    TABLESPACE pg_default;

-- Index: IX_Message_RequestRef

-- DROP INDEX IF EXISTS public."IX_Message_RequestRef";

CREATE INDEX IF NOT EXISTS "IX_Message_RequestRef"
    ON public."Message" USING btree
    ("RequestRef" ASC NULLS LAST)
    TABLESPACE pg_default;
Besides the Datadog integration, I am measuring the replication delay for inserts using the following two columns added to the Message table in the replica:
"ReplicatedTimestamp" timestamp with time zone NOT NULL DEFAULT (now() AT TIME ZONE 'UTC'::text),
    "InsertReplicationDelay" bigint NOT NULL GENERATED ALWAYS AS (((EXTRACT(epoch FROM ("ReplicatedTimestamp" - "CreateTimestamp")) * (1000)::numeric))::bigint) STORED,
For the built-in replication, at about 150 inserts/second in the Message table, 99.9% of the values in InsertReplicationDelay are less than 80ms (databases on the same database instance for now).
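A figure like the one above can be derived from the two extra columns with an ordered-set aggregate. This is a sketch meant to be run on the replica; the 10-minute window and the column aliases are my assumptions for illustration (the window keeps rows copied during the initial sync from skewing the result).

```sql
-- Run on the replica database.
-- Summarizes the insert replication delay (in ms) for recently replicated rows.
SELECT count(*) AS replicated_rows,
       percentile_cont(0.5)   WITHIN GROUP (ORDER BY "InsertReplicationDelay") AS p50_ms,
       percentile_cont(0.999) WITHIN GROUP (ORDER BY "InsertReplicationDelay") AS p999_ms,
       round(100.0 * count(*) FILTER (WHERE "InsertReplicationDelay" < 80)
             / nullif(count(*), 0), 2) AS pct_under_80ms
FROM public."Message"
WHERE "ReplicatedTimestamp" >= now() - interval '10 minutes';
```

Running the same query under pglogical and under the built-in replication gives directly comparable numbers for the two setups.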
Asked by Alexei (1191 rep)
Oct 16, 2024, 09:52 AM
Last activity: Oct 24, 2024, 09:10 AM