PostgreSQL 并行表扫描分析 Gather Parallel Seq Scan
PostgreSQL 在很多场景下会启用并行执行计划,创建多个并行工作子进程,提升查询效率。
一个常用的并行表扫描的例子:
从下面的执行计划可以看出,Parallel Seq Scan 并行表扫描,并发工作进程数为 2,最上层的执行计划节点名称为 Gather。
postgres=# explain select * from tt where product_item_id=12836242; QUERY PLAN ---------------------------------------------------------------------- Gather (cost=1000.00..46068.10 rows=1 width=41) Workers Planned: 2 -> Parallel Seq Scan on tt (cost=0.00..45068.00 rows=1 width=41) Filter: (product_item_id = 12836242)
并行表扫描相关的参数:
执行计划是否采用并行执行,受很多因素影响,其中一些 guc 参数影响较大,如下:
- max_parallel_workers_per_gather,一个 gather 节点能够使用的最大并行数量
- min_parallel_table_scan_size,表扫描数据量的最小阈值,超过该值才会考虑使用并行
- min_parallel_index_scan_size,索引扫描数据量的最小阈值,超过该值才会考虑使用并行
- force_parallel_mode,强制使用并行模式
- parallel_leader_participation,并行的 leader 是否参与并行,默认为 on
生成并行执行计划的相关函数:
以 Parallel Seq Scan 为例,介绍生成并行表扫描计划所涉及的一些重要函数。
(1)compute_parallel_worker()
根据表信息、表的 heap pages,index pages 数量,以及一些 guc 参数,计算并行的 worker 数量。
(2)set_rel_consider_parallel()
用来判断一个表是否可以并行扫描,即 rel->consider_parallel 是 true 或者 false ,由该函数判定。
(3)create_plain_partial_paths()
创建并行的 T_SeqScan 类型的 Path
(4)create_gather_path()
创建 T_Gather 类型的 Path(GatherPath)
(5)create_gather_plan()
创建 T_Gather 类型的 Plan(Gather),通过函数调用 ExecInitNode() -->ExecInitGather() 初始化 T_Gather 类型的执行计划,生成 T_GatherState。Gather 最终的执行函数为:ExecGather()
支持并行执行计划的扫描方法:
见函数 ExecParallelInitializeDSM()
- T_SeqScanState
- T_IndexScanState
- T_IndexOnlyScanState
- T_ForeignScanState
- T_AppendState
- T_CustomScanState
- T_BitmapHeapScanState
- T_HashJoinState
- T_HashState
- T_SortState
多个子进程如何并发的扫描 tuple?
pg 表数据存储在共享内存中,并行执行所需要的信息也是通过共享内存在所有 parallel 进程中共享,具体函数调用栈如下:
ExecGather()
ExecInitParallelPlan()
ExecParallelInitializeDSM()
ExecSeqScanInitializeDSM()
heap_parallelscan_initialize()
多个 parallel 进程顺序扫描表,ParallelHeapScanDesc target 是在共享内存中,并且与多个并发子进程共享,控制并发扫描的 page。多个进程并发调用 heap_parallelscan_nextpage() 函数,通过 pg_atomic_fetch_add_u64() 函数,多个进程原子化获取扫描的页,多进程并发扫描的粒度是页(page)级别。
nallocated = pg_atomic_fetch_add_u64(¶llel_scan->phs_nallocated, 1); if (nallocated >= scan->rs_nblocks) page = InvalidBlockNumber; /* all blocks have been allocated */ else page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
多个 parallel 进程如何向 backend 进程发送 tuple 扫描结果?
parallel 进程调用 shm_mq_send(),将 tuple 数据发送给主进程:
ExecParallelGetReceiver()
CreateTupleQueueDestReceiver()
tqueueReceiveSlot()
shm_mq_send()
backend 进程通过 shm_mq_receive() 接收 tuple 数据:
ExecGather()
gather_getnext()
gather_readnext()
TupleQueueReaderNext()
shm_mq_receive()
消息队列如何与多个 parallel 子进程关联?
parallel 进程与 backend 进程通过消息队列传送 tuple 数据,那么多个 parallel 进程就会有多个消息队列存在,那么每个消息队列如何与相应的 parallel 进程关联呢?在创建 parallel 进程时,会将该进程的编号放入 worker.bgw_extra 中,该变量最终被拷贝到共享内存 BackgroundWorkerData 中,代码如下:
for (i = 0; i < pcxt->nworkers; ++i) { memcpy(worker.bgw_extra, &i, sizeof(int)); if (!any_registrations_failed && RegisterDynamicBackgroundWorker(&worker, &pcxt->worker[i].bgwhandle)) ... }
在 parallel 进程中,拿到进程的编号,存放在 ParallelWorkerNumber 变量中,如下:
void ParallelWorkerMain(Datum main_arg) { memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int)); }
有了进程编号 ParallelWorkerNumber,就可以拿到进程对应的消息队列,如下:
static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) { char *mqspace; shm_mq *mq; mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false); mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; mq = (shm_mq *) mqspace; shm_mq_set_sender(mq, MyProc); return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL)); }
backend 进程是否参与 tuple 扫描?
在并发扫描执行计划中,会创建多个 parallel 进程进行表数据扫描,那么 backend 进程是否也参与到 tuple 扫描呢?实际上会通过 gatherstate->need_to_scan_locally 来标记是否在 backend 进程本地进行表扫描。
函数调用栈如下:
ExecGather()
gather_getnext()
相关代码如下:
static TupleTableSlot * gather_getnext(GatherState *gatherstate) { ... if (gatherstate->need_to_scan_locally) { EState *estate = gatherstate->ps.state; /* Install our DSA area while executing the plan. */ estate->es_query_dsa = gatherstate->pei ? gatherstate->pei->area : NULL; outerTupleSlot = ExecProcNode(outerPlan); estate->es_query_dsa = NULL; if (!TupIsNull(outerTupleSlot)) return outerTupleSlot; gatherstate->need_to_scan_locally = false; } ... }
need_to_scan_locally 的取值与几个因素有关:
(1)如果所有 parallel 进程启动失败,那么 need_to_scan_locally 必须为 true,只能本地 backend 进程扫描 tuple 了。
(2)guc 参数 parallel_leader_participation 为 true 并且 single_copy 为 false。
以上两个条件满足一个,即可启动 backend 本地扫描 tuple,具体代码片断如下:
node->need_to_scan_locally = (node->nreaders == 0) || (!gather->single_copy && parallel_leader_participation);
关于 single_copy 的含义如下:
typedef struct GatherPath { Path path; Path *subpath; /* path for each worker */ bool single_copy; /* don't execute path more than once */ int num_workers; /* number of workers sought to help */ } GatherPath;
single_copy 主要由并发数决定,即 path 路径是否支持并发,如果 parallel_workers = 0,则 single_copy = true。
pathnode->subpath = subpath; pathnode->num_workers = subpath->parallel_workers; pathnode->single_copy = false; if (pathnode->num_workers == 0) { pathnode->path.pathkeys = subpath->pathkeys; pathnode->num_workers = 1; pathnode->single_copy = true; }
文章评论