PostgreSQL 并行表扫描分析 Gather Parallel Seq Scan

PostgreSQL 在很多场景下会启用并行执行计划,创建多个并行工作子进程,提升查询效率。


一个常用的并行表扫描的例子:

从下面的执行计划可以看出,Parallel Seq Scan 并行表扫描,并发工作进程数为 2,最上层的执行计划节点名称为 Gather。
postgres=# explain select * from tt where product_item_id=12836242;
                              QUERY PLAN
----------------------------------------------------------------------
 Gather  (cost=1000.00..46068.10 rows=1 width=41)
   Workers Planned: 2
   ->  Parallel Seq Scan on tt  (cost=0.00..45068.00 rows=1 width=41)
         Filter: (product_item_id = 12836242)

并行表扫描相关的参数:

执行计划是否采用并行执行,受很多因素影响,其中一些 guc 参数影响较大,如下:
  • max_parallel_workers_per_gather,一个 gather 节点能够使用的最大并行数量
  • min_parallel_table_scan_size,表扫描数据量的最小阈值,超过该值才会考虑使用并行
  • min_parallel_index_scan_size,索引扫描数据量的最小阈值,超过该值才会考虑使用并行
  • force_parallel_mode,强制使用并行模式
  • parallel_leader_participation,并行的 leader 是否参与并行,默认为 on

生成并行执行计划的相关函数:

以 Parallel Seq Scan 为例,介绍生成并行表扫描计划所涉及的一些重要函数。
(1)compute_parallel_worker()
根据表信息、表的 heap pages,index pages 数量,以及一些 guc 参数,计算并行的 worker 数量。
(2)set_rel_consider_parallel()
用来判断一个表是否可以并行扫描,即 rel->consider_parallel 是 true 或者 false ,由该函数判定。
(3)create_plain_partial_paths()
创建并行的 T_SeqScan 类型的 Path
(4)create_gather_path()
创建 T_Gather 类型的 Path(GatherPath)
(5)create_gather_plan()
创建 T_Gather 类型的 Plan(Gather),通过函数调用 ExecInitNode() -->ExecInitGather() 初始化 T_Gather 类型的执行计划,生成 T_GatherState。Gather 最终的执行函数为:ExecGather()

支持并行执行计划的扫描方法:

见函数 ExecParallelInitializeDSM()
  • T_SeqScanState
  • T_IndexScanState
  • T_IndexOnlyScanState
  • T_ForeignScanState
  • T_AppendState
  • T_CustomScanState
  • T_BitmapHeapScanState
  • T_HashJoinState
  • T_HashState
  • T_SortState

多个子进程如何并发的扫描 tuple?

pg 表数据存储在共享内存中,并行执行所需要的信息也是通过共享内存在所有 parallel 进程中共享,具体函数调用栈如下:
ExecGather()
  ExecInitParallelPlan()
    ExecParallelInitializeDSM()
      ExecSeqScanInitializeDSM()
        heap_parallelscan_initialize()
多个 parallel 进程顺序扫描表,ParallelHeapScanDesc target 是在共享内存中,并且与多个并发子进程共享,控制并发扫描的 page。多个进程并发调用 heap_parallelscan_nextpage() 函数,通过 pg_atomic_fetch_add_u64() 函数,多个进程原子化获取扫描的页,多进程并发扫描的粒度是页(page)级别。
nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
    if (nallocated >= scan->rs_nblocks)
        page = InvalidBlockNumber;  /* all blocks have been allocated */
    else
        page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;

多个 parallel 进程如何向 backend 进程发送 tuple 扫描结果?

parallel 进程调用 shm_mq_send(),将 tuple 数据发送给主进程:
ExecParallelGetReceiver()
  CreateTupleQueueDestReceiver()
    tqueueReceiveSlot()
      shm_mq_send()


backend 进程通过 shm_mq_receive() 接收 tuple 数据:

ExecGather()
  gather_getnext()
    gather_readnext()
      TupleQueueReaderNext()
        shm_mq_receive()


消息队列如何与多个 parallel 子进程关联?

parallel 进程与 backend 进程通过消息队列传送 tuple 数据,那么多个 parallel 进程就会有多个消息队列存在,那么每个消息队列如何与相应的 parallel 进程关联呢?在创建 parallel 进程时,会将该进程的编号放入 worker.bgw_extra 中,该变量最终被拷贝到共享内存 BackgroundWorkerData 中,代码如下:
  for (i = 0; i < pcxt->nworkers; ++i)
    {
        memcpy(worker.bgw_extra, &i, sizeof(int));
        if (!any_registrations_failed &&
            RegisterDynamicBackgroundWorker(&worker,
                                            &pcxt->worker[i].bgwhandle))
        ...
    }
在 parallel 进程中,拿到进程的编号,存放在 ParallelWorkerNumber 变量中,如下:
void
ParallelWorkerMain(Datum main_arg)
{
    memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
}
有了进程编号 ParallelWorkerNumber,就可以拿到进程对应的消息队列,如下:
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
    char       *mqspace;
    shm_mq     *mq;

    mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    mq = (shm_mq *) mqspace;
    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}

backend 进程是否参与 tuple 扫描?

在并发扫描执行计划中,会创建多个 parallel 进程进行表数据扫描,那么 backend 进程是否也参与到 tuple 扫描呢?实际上会通过 gatherstate->need_to_scan_locally 来标记是否在 backend 进程本地进行表扫描。
函数调用栈如下:
ExecGather()
  gather_getnext()


相关代码如下:

static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    ...
        if (gatherstate->need_to_scan_locally)
        {
            EState     *estate = gatherstate->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa =
                gatherstate->pei ? gatherstate->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    ...
}
need_to_scan_locally 的取值与几个因素有关:
(1)如果所有 parallel 进程启动失败,那么 need_to_scan_locally 必须为 true,只能本地 backend 进程扫描 tuple 了。
(2)guc 参数 parallel_leader_participation 为 true 并且 single_copy 为 false。
以上两个条件满足一个,即可启动 backend 本地扫描 tuple,具体代码片断如下:
node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
关于 single_copy 的含义如下:
typedef struct GatherPath
{
    Path        path;
    Path       *subpath;        /* path for each worker */
    bool        single_copy;    /* don't execute path more than once */
    int         num_workers;    /* number of workers sought to help */
} GatherPath;
single_copy 主要由并发数决定,即 path 路径是否支持并发,如果 parallel_workers = 0,则 single_copy = true。
pathnode->subpath = subpath;
    pathnode->num_workers = subpath->parallel_workers;
    pathnode->single_copy = false;

    if (pathnode->num_workers == 0)
    {
        pathnode->path.pathkeys = subpath->pathkeys;
        pathnode->num_workers = 1;
        pathnode->single_copy = true;
    }


文章评论

0条评论