PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事

PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事

逻辑复制代码框架

专栏内容postgresql使用入门基础手写数据库toadb并发编程
个人主页我的主页
管理社区开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

✅ 🔥🔥🔥重大消息🔥🔥🔥 ❤️❤️❤️❤️ 关注公众号【开源无限】可免费领取《手写数据库内核toadb》源代码一份 ❤️❤️❤️❤️

一、概述


在我们使用数据库时,往往需要感知数据库中某些数据库对象的变化,比如表中insert/update操作使数据发生了变化。

为了及时得到数据的变化,我们常常会开启一个循环和定时器,不断的查询比较,这个工作非常耗时和容易出错,还不是很准确,令人非常头疼。

在Postgresql中有两种实时的复制模式:

  • 一种对文件内容的复制,也就是文件中二进制数据直接复制,也称为物理复制;
  • 另一种是对数据库对象的复制,数据库对象比如table, database都是逻辑概念,所以也称为逻辑复制;

逻辑复制本身就是基于数据库对象的变化,当有变化时就需要产生复制事件,这一特性刚好就可以用来作为数据库对象的变化事件,这样只需要订阅这一事件即可,由数据库来检查数据库对象的变化,并通知我们,即省力就及时。

PostgreSQL中如何实现逻辑复制功能呢?我们从几个方面来逐层展开介绍,首先介绍一下逻辑复制的代码结构,再来看一下产生通知的流程,以及如何应用到备份。

本文就来分享一下逻辑复制的代码框架结构,在整体上对逻辑复制有初步的认识。

二、 创建发布与订阅


create publicationcreate subscription之后,整个逻辑复制就建立起来了,那么我们首先来看一下这两个命令处理中做了什么事情。

2.1 发布

在主库创建发布,它是一个命令处理,一般在src/backend/commands/路径下就有对应的命令处理,每个命令会对应一个或多个源代码文件,可以看到src/backend/commands/publicationcmds.c,就是发布命令的处理代码了。

在这个源文件中,可以看到创建create,删除 remove, 修改 alter等几个接口。

我们重点来看创建发布者中主要逻辑,下面摘选了代码中的部分内容来分析。

/* * Create new publication. */ ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
  • 检查是否已经存在
 puboid =GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,CStringGetDatum(stmt->pubname));if(OidIsValid(puboid))ereport(ERROR,(errcode(ERRCODE_DUPLICATE_OBJECT),errmsg("publication \"%s\" already exists", stmt->pubname)));
  • 生成新发布者

一个新的发布者,新产生一行数据,插入到系统表中,以及相关系统表的处理。

/* */ tup =heap_form_tuple(RelationGetDescr(rel), values, nulls);recordDependencyOnOwner(PublicationRelationId, puboid,GetUserId());ObjectAddressSet(myself, PublicationRelationId, puboid);
  • 数据库对象的处理

下面是对于发布者关联数据库对象的处理,数据库对象为:所有表,某个表或某个字段,或者某个schema等等

if(stmt->for_all_tables){/* Invalidate relcache so that publication info is rebuilt. */CacheInvalidateRelcacheAll();}else{/* FOR TABLES IN SCHEMA requires superuser */if(schemaidlist != NIL &&!superuser())ereport(ERROR,errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));if(relations != NIL){PublicationAddTables(puboid, rels, true,NULL);}if(schemaidlist != NIL){PublicationAddSchemas(puboid, schemaidlist, true,NULL);}}
  • 配置参数

最后对于配置参数的检查,逻辑复制时WAL_LEVEL必须为logical

/* */if(wal_level != WAL_LEVEL_LOGICAL)ereport(WARNING,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),errmsg("wal_level is insufficient to publish logical changes"),errhint("Set wal_level to \"logical\" before creating subscriptions.")));

如果在搭建逻辑复制时,没有修改配置参数,这里就会报错,创建发布失败。

好了,至此发布源就创建好了,就等待订阅任务来触发了。

2.2 订阅

通过在备库创建订阅,来启动对数据库对象进行逻辑复制。

创建订阅主要会进行如下几步:

  • 系统表中创建订阅记录

同样create subscription命令的执行对应的源代码在src/backend/commands/subscriptioncmds.c中,下面我们摘取重点代码进行分享。

创建订阅的函数如下,同样还有删除和修改的函数。

/* * Create new subscription. */ ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel);

首先是命令检查,权限检查,同时还会在系统表中检查是否已经存在。

这里使用了动态库libpqwalreceiver来处理连接和WAL接收,它是一个公共组件,在这里会用,同样在流复制的工具中也会用到。

load_file("libpqwalreceiver", false);walrcv_check_conninfo(conninfo, opts.passwordrequired &&!superuser());

在系统表pg_subscription中插入一条新的数据,同时处理与之依赖的其它系统表的数据。

tup =heap_form_tuple(RelationGetDescr(rel), values, nulls);/* Insert tuple into catalog. */CatalogTupleInsert(rel, tup);
  • 与发布者校验信息

通过动态库,创建与发布者的连接。

 wrconn =walrcv_connect(conninfo, true, true, must_use_password, stmt->subname,&err);

从发布者获取订阅的表的信息,还有数据库的版本信息等,进行检查。

  • 启动 worker

当上面订阅相关工作处理完成后,此时会注册一个在事务提交时执行的任务,这个任务就是通知logical replication launcher进程。

if(opts.enabled)ApplyLauncherWakeupAtCommit();

此后create subscription命令主体就结束了,最后事务提交后,在logical replication launcher进程中会收到信号通知。

当收到信号之后,会检查当前集簇中的订阅列表,如果订阅是启用状态,给每个订阅对应的启动一个logical replication apply worker进程,

用于从主库接收逻辑复制数据,并应用于备库。

这里查询备库对应的后台进程,可以看到多了一个postgres: logical replication apply worker for subscription 16397进程。

[senllang@hatch bin]$ ps -ef|grep postgres |grep2477570 senllang 247757010 Nov19 ? 00:00:00 /opt/postgres/bin/postgres -p 5433 senllang 247757124775700 Nov19 ? 00:00:00 postgres: checkpointer senllang 247757224775700 Nov19 ? 00:00:00 postgres: background writer senllang 247757424775700 Nov19 ? 00:00:00 postgres: walwriter senllang 247757524775700 Nov19 ? 00:00:00 postgres: autovacuum launcher senllang 247757624775700 Nov19 ? 00:00:00 postgres: logical replication launcher senllang 251568124775700 08:26 ? 00:00:00 postgres: logical replication apply worker for subscription 16397
  • 建立主备间的逻辑复制通信

当备库的apply worker进程进行工作之后,会主动连接主库,主库也会创建一个walsender进程,来专门处理此订阅的WAL数据发送和逻辑复制的流程控制。

[senllang@hatch bin]$ ps -ef|grep postgres |grep2477547 senllang 247754710 Nov19 ? 00:00:00 /opt/postgres/bin/postgres -D pgA senllang 247754824775470 Nov19 ? 00:00:00 postgres: checkpointer senllang 247754924775470 Nov19 ? 00:00:00 postgres: background writer senllang 247755124775470 Nov19 ? 00:00:00 postgres: walwriter senllang 247755224775470 Nov19 ? 00:00:00 postgres: autovacuum launcher senllang 247755324775470 Nov19 ? 00:00:00 postgres: logical replication launcher senllang 251501924775470 08:21 ? 00:00:00 postgres: senllang postgres ::1(48026) idle senllang 251568224775470 08:26 ? 00:00:00 postgres: walsender senllang postgres ::1(57106) START_REPLICATION 

查询主数据库的后台服务进程,可以看到也会多出来一个postgres: walsender senllang postgres ::1(57106) START_REPLICATION进程,

START_REPLICATION是逻辑复制的状态信息。

至此,整个逻辑复制从主库到备库的通信链路就建立起来了,当主库有事务提交时,就会有WAL日志的落盘,此时会触发walsender来检查,当有符合当前订阅的WAL数据时,就会发送到订阅端,订阅端会进行应用到当前备库中。

五、总结


本文主要分享了逻辑复制搭建时的内核框架代码逻辑,在主库创建发布者,在备库创建订阅时,会主动与发布者建立连接,此时发布者才会准备WAL进行发布,整个流程概览就是这样,内部细节步骤还有很多,后面会分多篇进行分类介绍。

结尾


非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

作者邮箱:[email protected]
如有错误或者疏漏欢迎指出,互相学习。

注:未经同意,不得转载!

Read more

Flutter 组件 spry 适配鸿蒙 HarmonyOS 实战:轻量化 Web 框架,构建高性能端侧微服务与 Middleware 治理架构

Flutter 组件 spry 适配鸿蒙 HarmonyOS 实战:轻量化 Web 框架,构建高性能端侧微服务与 Middleware 治理架构

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 spry 适配鸿蒙 HarmonyOS 实战:轻量化 Web 框架,构建高性能端侧微服务与 Middleware 治理架构 前言 在鸿蒙(OpenHarmony)生态迈向全场景分布式协同、涉及设备端侧 API 暴露、轻量化资源服务镜像及严苛的跨端 RPC 通信背景下,如何实现一套既能保持极低内存足迹(Footprint)、又能提供类似后端(Node.js/Koa)般丝滑开发体验且具备全异步处理能力的“端侧 Web 基座”,已成为决定应用分布式自治能力与全栈同构效率的关键。在鸿蒙设备这类强调 AOT 极致效能与背景任务严格限制的环境下,如果应用依然采用重量级的 HTTP 服务端,由于由于进程级的上下文切换开销,极易由于由于“算力溢出”导致鸿蒙应用在作为服务端响应时发生明显的电量损耗。 我们需要一种能够解耦路由逻辑、支持

By Ne0inhk

10、Vue3中Vuex从入门到实战:手写迷你Vuex,掌握前端状态管理核心

Vue3中Vuex从入门到实战:手写迷你Vuex,掌握前端状态管理核心 在Vue3项目开发中,组件化让代码复用和维护更高效,但跨组件、跨页面的数据共享却成了高频痛点——用户登录信息、全局权限、公共计数器等数据,如果靠组件传参层层传递,代码会变得混乱不堪。这时候,Vuex就成了前端状态管理的“大管家”,帮我们集中式管理共享数据。本文将从前端数据管理的痛点出发,带你吃透Vuex的核心用法,甚至手写一个迷你Vuex理解其底层原理。 一、前端数据管理:为什么需要Vuex? 现代Web应用由组件、数据、路由三大核心构成,组件内部的私有数据用ref/reactive管理即可,但共享数据的管理却需要更规范的方式。 我们先试想一个简单场景:用全局变量存储共享数据。 window._store ={}// 全局存储数据 这种方式看似简单,但存在致命问题:window._store不是响应式的,修改数据后Vue组件无法自动更新视图。如果我们用Vue的响应式API包裹全局数据,并提供统一的修改方法,这就是Vuex的雏形——本质是“响应式的全局数据 + 规范化的修改规则”。 二、Vuex是什

By Ne0inhk
【递归,搜索与回溯算法 & 记忆化搜索】深入理解记忆化搜索算法:记忆化搜索算法小专题

【递归,搜索与回溯算法 & 记忆化搜索】深入理解记忆化搜索算法:记忆化搜索算法小专题

前言:实现记忆化搜索的一般步骤      (1) 实现记忆化搜索代码步骤         (2) 如何将暴搜代码转换成记忆化搜索代码?         (3)如何添加一个备忘录?         斐波那契数     题目解析         算法原理         解法一:递归        时间复杂度高是因为递归展开树有很多次重复计算,我们可以优化这些重复的计算;我们可以创建一个备忘录,当计算其中一个分支时,把计算出的 d(i) 放入一个"备忘录"中 ( i = 1 ....... n ),当递归其他分支时,我们通过备忘录存储好的计算结果,减少递归树额外重复的展开;     解法二:记忆化搜索    当我们在递归的时候,发现递归过程会重复进行完全相同的问题,我们就把这些完全相同的问题存储到额外创建的"备忘录"中,再后续递归出现相同问题,直接从备忘录中拿计算好的结果即可,避免不必要的重复递归;  所以记忆化搜索,就是一个带备忘录的递归;记忆化搜索,其实也是剪枝的一种方式,在本题使用记忆化搜索,就能把指数级别的时间复杂度降到常数

By Ne0inhk
【踩坑记录】使用 Layui 框架时解决 Unity WebGL 渲染在 Tab 切换时黑屏问题

【踩坑记录】使用 Layui 框架时解决 Unity WebGL 渲染在 Tab 切换时黑屏问题

【踩坑记录】使用 Layui 框架时解决 Unity WebGL 渲染在 Tab 切换时黑屏问题 在开发 Web 应用时,尤其是集成了 Unity WebGL 内容的页面,遇到一个问题:当 Unity WebGL 渲染内容嵌入到一个 Tab 中时,切换 Tab 后画面会变黑,直到用户点击黑屏区域,才会恢复显示。 这个问题通常是因为 Unity 渲染在 Tab 切换时被暂停或未能获得焦点所致。 在本文中,我们将介绍如何在使用 Layui 框架时,通过监听 Tab 切换事件并强制 Unity WebGL 渲染恢复,来解决这一问题。 1. 问题描述 当 Unity WebGL 内容嵌入到页面中的多个

By Ne0inhk