PostgreSQL复制槽源码分析

  • 源码版本:PG 13.3
  • 源码文件:slot.c slotfuncs.c

1. 什么是 PG 复制槽?

PG 复制槽用于记录主备流复制的状态,主要目的是防止 wal 日志被过早的删除,导致备库流复制中断。复制槽是有状态的,能够持久化到磁盘上,允许宕机、重启场景下进行恢复。在有复制槽的场景下,即使备库关闭很长时间,主库也会为其保留足够的 wal 日志,直到备库恢复接收完这些 wal 日志,主库才会将其删除。当然这也带来了新的问题,即如果备库永远不恢复,那么主库的 wal 日志就会永远保留,导致磁盘空间耗尽,这时需要人工介入处理。

2. PG 复制槽相关参数

  • max_replication_slots,最大的复制槽数量,取值范围为 0 ~ 0x3FFFF,默认值为 10, 设置为 0 表示禁用复制槽。
  • wal_level >= replica

3. 复制槽共享内存初始化

复制槽相关的数据结构存储在共享内存中,大小由 max_replication_slots 参数决定,每个复制槽由结构体 ReplicationSlot 表示,其成员变量 in_use 表示该复制槽元素是否正在被使用。

  • ReplicationSlotsShmemSize(),计算复制槽共享内存大小
  • ReplicationSlotsShmemInit(),初始化共享内存

4. PG 复制槽的实现逻辑

4.1 创建复制槽

创建复制槽的函数调用关系:

pg_create_physical_replication_slot()
    create_physical_replication_slot()
        ReplicationSlotCreate()

ReplicationSlotCreate() 函数实现了创建复制槽的功能,主要入参是复制槽名称,函数内部调用 ReplicationSlotValidateName() 函数对复制槽名称进行合法性检查。遍历共享内存中的 ReplicationSlot 数组,找到一个 in_use 为 0 的元素,对该元素的各成员进行初始化。遍历 ReplicationSlot 数组时,也会对复制槽名称是否出现同名进行检查,如果已有相同名称的复制槽则报错。最后调用 CreateSlotOnDisk() 函数,将复制槽的数据写入数据目录下的文件中,路径为 pg_replslot/$slot_name/state,文件内容为 ReplicationSlotOnDisk 结构体。

4.2 启动复制槽

在主库创建的复制槽需要在主备流复制启动后才能使用,备库发送 start replication 时可以指定复制槽名称来启动复制槽,函数调用

PostgresMain()
    exec_replication_command()
        StartReplication()
            WalSndLoop()

5. 导出函数和视图

slotfuncs.c 源文件中定义了一些导出函数和视图给用户调用,用于操作复制槽。

函数: pg_create_physical_replication_slot() pg_create_logical_replication_slot() pg_drop_replication_slot() pg_get_replication_slots() pg_replication_slot_advance()

视图: pg_replication_slots

6. 复制槽相关问题

6.1 复制槽 dirty 表示什么?

MyReplicationSlot->just_dirtied = true; MyReplicationSlot->dirty = true;

复制槽数据在共享内存与磁盘上都会进行存储,当共享内存中的数据发生变化,则会将 MyReplicationSlot->dirty 标记为 true,刷盘后标记为 false。相关代码可参见 ReplicationSlotPersist() 函数。

6.2 为什么复制槽能够阻止 wal 日志被清除?

ReplicationSlotsComputeRequiredLSN() 函数遍历所有复制槽,获取最小的 restart_lsn,将该值赋值给 XLogCtl->replicationSlotMinLSN。wal 日志的清除主要在 checkpoint 操作时进行,在创建 checkpoint 时,会调用 KeepLogSeg() 函数,依赖 XLogCtl->replicationSlotMinLSN 保留 wal 日志文件。

此外,ReplicationSlotsComputeRequiredXmin() 函数遍历所有复制槽,获取最小的 effective_xmin 和 effective_catalog_xmin,然后分别将其赋值给如下两个变量,这两个变量在事务快照等场景下会使用。

procArray->replication_slot_xmin
procArray->replication_slot_catalog_xmin

6.3 逻辑复制槽与物理复制槽的区别

逻辑复制槽与物理复制槽在源码上的区别是 slot->data.database 是不是为 0 。

#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)

6.4 复制槽的 xid 和 lsn 如何更新?

在 wal sender 进程中接收备库发出的回馈信息,包含 feedbackXmin 和 feedbackCatalogXmin,这些信息被存储在 slot 的成员变量中,如下:

slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
slot->data.catalog_xmin = feedbackCatalogXmin;
slot->effective_catalog_xmin = feedbackCatalogXmin;

函数调用关系如下:

WalSndLoop()
    ProcessRepliesIfAny()
        ProcessStandbyMessage()
            ProcessStandbyHSFeedbackMessage()
                PhysicalReplicationSlotNewXmin()

lsn 的更新与逻辑复制有关,wal sender 进程在逻辑解码时,更新相关的 lsn,如下:

slot->candidate_catalog_xmin = xmin;
slot->candidate_xmin_lsn = current_lsn;
slot->candidate_restart_valid = current_lsn;
slot->candidate_restart_lsn = restart_lsn;

函数调用关系如下:

WalSndLoop()
    XLogSendLogical()
        LogicalDecodingProcessRecord()
            DecodeStandbyOp()
                SnapBuildProcessRunningXacts()
                    LogicalIncreaseXminForSlot()
                    LogicalIncreaseRestartDecodingForSlot()

文章评论

0条评论