Reviers, le 26 février 2024

Depuis sa version 10, PostgreSQL propose de la réplication logique. Cette réplication se base sur un module de décodage logique appelé pgoutput mais ce n’est pas le seul module de décodage logique. Il en existe plus d’une dizaine actuellement, et il est possible de coder le sien. Dans cet article, nous allons faire un retour rapide sur la réplication logique, le décodage logique pour aborder à la fin la création d’un module de décodage logique et le client qui utilisera ce module.

Guillaume Lelarge

Réplication logique

La réplication existe dans PostgreSQL depuis la version 9.0 (donc depuis 2010, ce qui commence à être vieux). Cette réplication est excellente pour avoir un PRA mais elle souffre de limitations intrinsèques difficiles à corriger.

L’une des plus importantes est que le système de réplication ne sait pas ce qu’il réplique. Il sait qu’il doit écrire tel et tel octet à tel emplacement dans tel fichier, mais il ne sait pas à quelle opération cela correspond :

  • est-ce une écriture dans une table ? un index ? une vue matérialisée ?
  • est-ce le résultat d’une commande DDL ? ou d’un INSERT ? d’un DELETE ?
  • quelle base cela concerne-t-il ?

La conséquence directe est que cette réplication a une granularité inexistante : c’est toute l’instance ou rien.

Les données qui voyagent entre le primaire et le secondaire sont en binaire, et donc leur codage dépend fortement du serveur primaire. En conséquence, les serveurs secondaires doivent tous avoir la même version majeure, les mêmes options de compilation, être installés sur la même architecture matérielle (par exemple tous sur x86_64, ou tous sur ARMv8) et la même architecture logicielle (notamment la même librairie C).

Enfin, autre point limitant et c’est un peu une lapalissade, le secondaire est en lecture seule donc nous ne pouvons rien écrire dessus. Pas d’index supplémentaire pour l’utilisation spécifique du secondaire, même pas de tables temporaires, etc.

La réplication logique répond à ces besoins. Les données envoyées ne sont pas les requêtes mais le résultat des requêtes, dans un format texte décodable partout. Des informations supplémentaires sont envoyées comme le nom des objets SQL et celui des opérations SQL concernées.

Les deux serveurs sont disponibles en lecture/écriture. Ce sont deux serveurs autonomes. Ils peuvent être de versions différentes, installées sur des systèmes différents. Un serveur va proposer une « publication » des données, dans laquelle des tables seront sélectionnées, voire juste certaines lignes ou colonnes. Le serveur qui veut recevoir ces données va s’abonner à la publication.

Voici un petit exemple de mise en place sur deux instances, s1 (port 5432) et s2 (port 5433). Ils sont sur la même machine, et ils sont tous les deux en version

  1. Voici la configuration spécifique de s1 :
port = 5432
wal_level = logical

Et voici la configuration spécifique de s2 :

port = 5433

Sur s1, nous allons créer une base, y ajouter une table et la remplir. Voici le script SQL pour le faire à partir de psql :

CREATE DATABASE b1;
\c b1
CREATE TABLE t1 (c1 integer, c2 text);
ALTER TABLE t1 ADD PRIMARY KEY (c1);
INSERT INTO t1 SELECT i, 'Ligne '||i FROM generate_series(1, 1000) i;

Nous pouvons maintenant créer une publication sur la base b1. Nous allons le faire ici pour toutes les tables mais nous aurions pu n’en sélectionner que quelques-unes. La création de la publication se fait ainsi :

CREATE PUBLICATION pub_s1 FOR ALL TABLES;

Passons maintenant au serveur s2. Pour s’abonner, il faut déjà disposer des objets SQL. Nous allons créer une base b1, et sa table t1 mais sans les données. Le plus simple est d’utiliser la commande ci-dessous :

pg_dump --port 5432 --schema-only b1 | psql --port 5433 b1

Nous pouvons maintenant créer la souscription à l’abonnement pub_s1. Cela se fait ainsi :

CREATE SUBSCRIPTION sub_pub_s1
  CONNECTION 'port=5432 dbname=b1'
  PUBLICATION pub_s1;

À partir de là, une synchronisation initiale des données est effectuée pour peupler la table. Toute modification survenant sur cette table du serveur s1 sera automatiquement reproduite sur la table t1 du serveur s2. Le serveur s2 est disponible en écriture : nous pouvons y ajouter des objets et des données.

Les données de réplication passent toujours par les journaux de transactions disponibles sur le primaire. Un slot de réplication créé automatiquement permet de conserver les journaux jusqu’à consommation par le serveur s2. Voici ce qu’indique la vue pg_replication_slots sur le serveur s1 :

b1=# SELECT * FROM pg_replication_slots \gx

-[ RECORD 1 ]-------+-----------
slot_name           | sub_pub_s1
plugin              | pgoutput
slot_type           | logical
datoid              | 16388
database            | b1
temporary           | f
active              | t
active_pid          | 95464
xmin                |
catalog_xmin        | 746
restart_lsn         | 0/1C70DB8
confirmed_flush_lsn | 0/1C72E70
wal_status          | reserved
safe_wal_size       |
two_phase           | f
conflicting         | f

Nous avons donc bien un slot de réplication logique pour l’abonnement du serveur s2 à la publication pub_s1. Ce slot est actif et permanent (non temporaire). Il utilise le plugin pgoutput. Reste à savoir ce qu’est un plugin…

Plugin ou module de décodage logique

Un plugin (en VO) ou module de décodage logique (en VF) est une librairie C qui va récupérer les enregistrements des journaux de transactions, les décoder et traiter les informations utiles pour lui.

Plusieurs modules existent déjà. Le module pgoutput est celui conçu et utilisé pour la réplication logique native de PostgreSQL. Le module test_decoding est fourni avec le code de PostgreSQL comme exemple de module pour les développeurs. Et enfin, il en existe une dizaine listés sur la page « Logical Decoding Plugins » du wiki PostgreSQL, pour transformer les informations des journaux en un format cible : JSON, ordres Mongo, et même SQL.

Il est possible d’utiliser un module logique via l’interface SQL grâce à quelques fonctions :

  • pg_create_logical_replication_slot() pour créer un slot de réplication logique ;
  • pg_logical_slot_get_changes() et pg_logical_slot_peek_changes() pour récupérer les changements à partir de ce slot ;
  • pg_drop_replication_slot() pour supprimer le slot.

Voici une petite démonstration avec le module test_decoding qui fait partie des modules contrib de la distribution PostgreSQL :

-- Ces ordres peuvent être exécutés dans des sessions différentes sur la base b1

-- Création du slot de réplication logique

b1=# SELECT pg_create_logical_replication_slot('testslot', 'test_decoding');

 pg_create_logical_replication_slot
------------------------------------
 (testslot,0/1C73000)
(1 row)

-- Récupération des changements
-- (aucun car aucun INSERT/UPDATE/DELETE/TRUNCATE entre temps)

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

 lsn | xid | data
-----+-----+------
(0 rows)

-- Insertion de deux lignes

b1=# INSERT INTO t1 VALUES (-1, 'Ligne -1'), (-2, 'Ligne -2');
INSERT 0 2

-- Récupération des changements
-- on voit bien les 2 lignes avec les valeurs des deux colonnes

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

    lsn    | xid |                            data
-----------+-----+-------------------------------------------------------------
 0/1C73000 | 746 | BEGIN 746
 0/1C73000 | 746 | table public.t1: INSERT: c1[integer]:-1 c2[text]:'Ligne -1'
 0/1C73D78 | 746 | table public.t1: INSERT: c1[integer]:-2 c2[text]:'Ligne -2'
 0/1C73DF0 | 746 | COMMIT 746
(4 rows)

-- Insertion d'une ligne

b1=# INSERT INTO t1 VALUES (0, 'pgsession');
INSERT 0 1

-- Récupération des changements
-- on voit bien la nouvelle ligne avec les valeurs des deux colonnes

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
    lsn    | xid |                            data
-----------+-----+-------------------------------------------------------------
 0/1C73E28 | 747 | BEGIN 747
 0/1C73E28 | 747 | table public.t1: INSERT: c1[integer]:0 c2[text]:'pgsession'
 0/1C73EA0 | 747 | COMMIT 747
(3 rows)

-- Modification d'une ligne

b1=# UPDATE t1 SET c2=upper(c2) WHERE c1<0;
UPDATE 2

-- Récupération des changements
-- on voit bien 2 lignes (celles modifiées par l'ordre)
-- avec les nouvelles valeurs calculées

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

    lsn    | xid |                            data
-----------+-----+-------------------------------------------------------------
 0/1C97180 | 748 | BEGIN 748
 0/1C97180 | 748 | table public.t1: UPDATE: c1[integer]:-2 c2[text]:'LIGNE -2'
 0/1C971D0 | 748 | table public.t1: UPDATE: c1[integer]:-1 c2[text]:'LIGNE -1'
 0/1C97250 | 748 | COMMIT 748
(6 rows)

-- Suppression du slot

b1=# SELECT pg_drop_replication_slot('testslot');
 pg_drop_replication_slot
--------------------------

(1 row)

Le module décode bien toutes les informations logiques.

Codage de notre module

Si les modules actuellement disponibles ne satisfont pas notre besoin, il est tout à fait possible de créer son propre module. Le document sur le décodage logique intègre un chapitre sur les fonctions callback utilisables pour créer un module de décodage logique.

Cette documentation indique que quatre fonctions sont requises :

  • _PG_output_plugin_init pour initialiser les pointeurs vers les fonctions callback ;
  • begin_cb() pour traiter un début de transaction ;
  • change_cb() pour traiter un changement (donc une instruction DML en écriture) ;
  • commit_cb() pour traiter une fin de transaction.

Comme d’habitude, nous avons placé le code sur notre dépôt GitHub et mis en place des tags pour suivre plus facilement.

Le tag j5e0 correspond à notre squelette de module de décodage logique. Il contient un fichier Makefile identique aux autres modules que nous avons déjà codés et un fichier plugin_audit.c contenant le code de base de notre plugin.

En ligne 55 se trouve le code d’initialisation des pointeurs vers les fonctions callback. Cela ressemble là aussi beaucoup au code de nos autres modules.

En ligne 65, nous avons le code de la fonction callback startup_cb(). Cette fonction va allouer de la mémoire pour une structure qui contiendra notre contexte mémoire pour le jour où on en aura besoin. Et en ligne 94, la fonction callback shutdown_cb() désalloue cette mémoire.

Les fonctions callback begin_cb() et commit_cb sont requises donc elles existent, mais sont vides.

La fonction qui nous intéresse principalement pour ce module est la fonction change_cb(). Cette fonction est appelée pour chaque ligne insérée, supprimée ou modifiée. La structure LogicalDecodingContext nous permet de renvoyer un texte qui sera affiché par la fonction SQL pg_logical_slot_get_changes. Dans ce squelette, le texte est une constante mais nous allons y apporter des changements dans les exemples suivants. Son contenu est disponible en ligne 107.

La compilation se fait comme d’habitude :

$ cd journee5/audit
$ make
$ make install

Avec cette dernière commande, la bibliothèque partagée est copiée dans le répertoire lib de PostgreSQL. L’emplacement exact peut se retrouver en utilisant la commande suivante :

$ pg_config --libdir

Pour tester l’utilisation de notre module de décodage logique, nous allons reprendre l’exemple précédent :

b1=# SELECT pg_create_logical_replication_slot('testslot', 'plugin_audit');
 pg_create_logical_replication_slot
------------------------------------
 (testslot,0/1C9C640)
(1 row)

b1=# INSERT INTO t1 VALUES (-3, 'Ligne -3'), (-4, 'Ligne -4');
INSERT 0 2

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

    lsn    | xid | data
-----------+-----+-------
 0/1C9C640 | 755 | yeah!
 0/1C9C6C8 | 755 | yeah!
(2 rows)

b1=# SELECT pg_drop_replication_slot('testslot');

 pg_drop_replication_slot
--------------------------

(1 row)

Nous avons donc bien deux lignes affichées correspondant à l’action pour chaque ligne insérée, et la colonne data contient bien notre texte.

Notre module fonctionne mais il n’est pas très informatif. Essayons maintenant de récupérer le nom de la table visée par l’opération DML.

Le code se trouve sur le tag j5e1. L’identifiant de la table fait partie du troisième argument de la fonction callback change_cb. Récupérer son nom qualifié du nom du schéma demande à lire le contenu de la table pg_class au moyen de quelques fonctions.

Après compilation et tests, voici le résultat suite à une écriture dans la table t1 :

b1=# INSERT INTO t1 VALUES (-5, 'Ligne -5');
INSERT 0 1

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

    lsn    | xid |   data
-----------+-----+-----------
 0/1C9C8D8 | 756 | public.t1
(1 row)

La colonne data indique bien maintenant le nom de la table.

On pourrait aussi vouloir récupérer l’opération réalisée. Le code résultant fait partie du tag j5e2. Cette fois, nous utilisons l’argument change pour récupération l’opération effectuée. Et voici le résultat :

b1=# INSERT INTO t1 VALUES (-6, 'Ligne -6');
INSERT 0 1

b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);

    lsn    | xid |       data
-----------+-----+------------------
 0/1C9F6F0 | 757 | public.t1 INSERT
(1 row)

Plein d’autres informations sont récupérables, comme notamment les noms des colonnes et leurs nouvelles valeurs. Le code du module d’exemple test_decoding est très intéressant à ce regard.

Maintenant que nous avons un module de décodage logique capable de reconnaître les tables ciblées par des opérations DML, plutôt que de passer par psql, nous allons coder un client qui va l’utiliser.

Codage d’un client qui utilise notre module

Le tag j5e3 ajoute le code squelette d’un client ressemblant beaucoup à ce que vous avions écrit dans Hack’PG #4.

Le fichier Makefile est un peu modifié pour générer l’application cliente et le module de décodage logique. Le code du client se trouve dans le fichier audit.c. Ce code ne fait que se connecter à une base, indique le nom de la table donnée en argument de la commande, puis se déconnecte.

Le code précédent n’est qu’un squelette de client. Maintenant, il nous faut ajouter notre code spécifique pour la gestion du module de décodage logique. En effet, cet outil doit tout d’abord créer le slot de réplication. Puis, une boucle va permettre de récupérer les changements au fur et à mesure. Une fois la boucle terminée, le slot est supprimé. Le code correspondant à cette description est disponible sur le tag j5e4.

Pour tester l’application, il faut deux terminaux : un avec psql pour lancer des requêtes DML et l’autre avec l’application pour voir si elle récupère bien le nom des tables impactées. Par exemple, avec l’option -e qui permet d’afficher les requêtes exécutées par l’outil, et donc de suivre son exécution.

$ ./audit --echo -d b1 -p5432  t1
SELECT pg_catalog.set_config('search_path', '', false);
audit: Auditing table "t1"...
SELECT * FROM pg_create_logical_replication_slot('audit_235578', 'plugin_audit', false, true);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
public.t1 INSERT
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
public.t1 INSERT
public.t1 INSERT
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_drop_replication_slot('audit_235578');

Faire une boucle de 60 secondes, c’est un peu bête. Mieux vaudrait une boucle infinie qu’un Control-C arrêterait. Par contre, il faut récupérer l’interruption pour nettoyer le slot de réplication créé au début. Une solution facile et intelligente serait d’utiliser un slot temporaire, mais il est intéressant de voir la gestion d’une interruption. Le tag j5e5 montre justement comment intercepter l’interruption pour exécuter son propre code.

Cette application demande la table à tracer en paramètre, mais pour l’instant, elle trace les requêtes DML pour toutes les tables. Il reste donc à ajouter le code qui vérifie que la requête concerne bien la table ciblée. Cela correspond au code disponible sur le tag j5e6.

Un tel outil peut servir de base à un outil d’audit des changements sur une base.

Conclusion

Voici notre tour sur la réplication et le décodage logique terminé. La création de modules de décodage logique ouvre la porte à de nombreuses nouvelles applications, que ce soit pour de la réplication, de l’audit ou du CDC.

À noter que notre VIP tech de la pgsession de cette année a été Bertrand Drouvot, qui nous a justement parlé des dernières nouveautés sur la réplication logique dans PostgreSQL.


DALIBO

DALIBO est le spécialiste français de PostgreSQL®. Nous proposons du support, de la formation et du conseil depuis 2005.