PostgreSQL同步复制实现逻辑分析

源码版本:pg 14.3
源文件:src/backend/replication/syncrep.c
原文地址:https://www.mytecdb.com/blogDetail.php?id=239

1、PG同步复制简介

同步复制是 pg 9.1 版本引入的新特性,事务提交必须等待事务对应的 lsn 在同步的备库接收到之后,才能提交成功。同步复制的实现逻辑主要集中在主节点,主节点记录了哪些备节点是同步节点,需要等待事务 lsn 在这些同步的备节点上接收、刷盘甚至应用完成。

pg 同步复制的几种模式:

#define SYNC_REP_NO_WAIT (-1) // 异步复制,不需等待备节点
#define SYNC_REP_WAIT_WRITE 0 // 同步复制,等待备节点写wal日志
#define SYNC_REP_WAIT_FLUSH 1 // 同步复制,等待备节点写wal日志并刷盘
#define SYNC_REP_WAIT_APPLY 2 // 同步复制,等待备节点写wal日志并应用

commit 类型的 wal record 提供 apply 反馈,其他类型只提供 write/flush 反馈。pg 的同步复制只在 commit 阶段才会等待,因为只有 commit 类型才会影响主备事务的一致性。

2、函数调用关系

主节点 backend 进程提交事务,其函数调用关系如下:
CommitTransaction()
	RecordTransactionCommit()
		SyncRepWaitForLSN(),等待某个 lsn 在备库操作完成。
			WaitLatch()


从 SyncRepWaitForLSN() 函数的调用看,pg 的同步复制,只在 commit 阶段才会等待备库确认,其他非 commit 类型的 wal 同步,不需要等待备库确认。在 SyncRepWaitForLSN() 函数内调用 SyncRepQueueInsert() 把当前 backend 进程放入 WalSndCtl->SyncRepQueue 等待队列,休眠等待备节点接收到 wal 日志、刷盘或者应用。

wal sender 进程负责将 wal 日志发送给备节点,根据备节点返回的消息,来唤醒相关的 backend 等待进程,其函数调用关系如下:
ProcessRepliesIfAny()	// walsender.c,处理备节点发送的 reply 信息。
	ProcessStandbyMessage()
		ProcessStandbyReplyMessage()
			SyncRepReleaseWaiters()
				SyncRepWakeQueue(),唤醒等待的 backend 进程
					SetLatch()


SyncRepWakeQueue() 函数中会判断  WalSndCtl->lsn 值是否小于等待队列 WalSndCtl->SyncRepQueue 里面的 backend 进程 proc->waitLSN 值,如果小于,则该 backend 进程需要继续等待,否则把该 backend 进程从等待队列里面删除。

proc->waitLSN 在事务提交时获取。WalSndCtl->lsn 在 walsender 进程接收到备节点的回复消息时获取,实际上就是 WalSndCtl->walsnds 变量保存的 lsn 信息,该变量是通过读取备库发送的网络回复消息获取到的 lsn,其实现逻辑位于 ProcessStandbyReplyMessage() 函数,相关代码如下:

static void
ProcessStandbyReplyMessage(void)
{
    /* the caller already consumed the msgtype byte */
    writePtr = pq_getmsgint64(&reply_message);
    flushPtr = pq_getmsgint64(&reply_message);
    applyPtr = pq_getmsgint64(&reply_message);
    replyTime = pq_getmsgint64(&reply_message);
    replyRequested = pq_getmsgbyte(&reply_message);
    

        WalSnd     *walsnd = MyWalSnd;
        SpinLockAcquire(&walsnd->mutex);
        walsnd->write = writePtr;
        walsnd->flush = flushPtr;
        walsnd->apply = applyPtr;
        SpinLockRelease(&walsnd->mutex);
}

3、相关的全局变量

WalSndCtlData *WalSndCtl = NULL;
WalSndCtl 位于共享内存,用于记录主备流复制进程相关的状态信息,包括等待某个 lsn 的 backend 进程队列信息以及 walsender 进程的信息等,它是一个全局共享的变量。

SyncRepConfigData *SyncRepConfig = NULL;
SyncRepConfig 记录 GUC 参数 synchronous_standby_names 配置的相关信息,其结构如下:
typedef struct SyncRepConfigData
{
    int         config_size;    /* total size of this struct, in bytes */
    int         num_sync;       /* number of sync standbys that we need to
                                 * wait for */
    uint8       syncrep_method; /* method to choose sync standbys */
    int         nmembers;       /* number of members in the following list */
    /* member_names contains nmembers consecutive nul-terminated C strings */
    char        member_names[FLEXIBLE_ARRAY_MEMBER];
} SyncRepConfigData;
GUC 参数 synchronous_standby_names 配置的值会被转成 SyncRepConfig 变量,在 pg 内核中有一个专门的语法文件 syncrep_gram.y 用于解析该参数,该参数的配置示例如下:
synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
synchronous_standby_names = 'ANY 2 (s1, s2, s3)'

文章评论

0条评论