Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持PostgreSQL协议的讨论帖 #1546

Open
oisin9 opened this issue May 6, 2024 · 18 comments
Open

支持PostgreSQL协议的讨论帖 #1546

oisin9 opened this issue May 6, 2024 · 18 comments
Labels

Comments

@oisin9
Copy link

oisin9 commented May 6, 2024

PostgreSQL数据库是一个流行的开源关系型数据库,计划为workflow框架增加对PostgreSQL协议的支持。后续的进展和讨论将会放到这个Issue中进行。

开发workflow的PG协议的Fork仓库地址:
https://github.com/oisin9/workflow/tree/feature-pg

@Barenboim
Copy link
Contributor

感谢Postgres社区出手相助!任何进展我们随时沟通。

从Postgres协议的描述上看(#766 ),Postgres的实现难度远低于MySQL,高于Redis。建议实现时同时考虑client和server的需求,即可以用来开发基于Postgres协议的server/proxy。

另外,可以考虑做成独立的协议插件。例如workflow里的Kafka协议,就是可以完全独立的安装。另一个协议插件例子是srpc,这个项目里实现了一组rpc协议。

@oisin9
Copy link
Author

oisin9 commented May 8, 2024

在理解ComplexTask上遇到了一些问题,于是请教了workflow的开发同学,在workflow小伙伴的支持下,基本理顺了复合任务的工作机制和流程。
把我总结的内容贴在下面,希望可以给其他使用和想给workflow贡献代码的小伙伴们一些参考,如果有问题还请及时指出。

ComplexTask的执行路径

  1. 首先调用ComplexTask::message_out()来构造一个req包,然后底层框架去调用这个req的encode方法来发送数据。
  2. 然后调用ComplexTask::message_in()来构造一个返回的resp,从而底层框架知道怎么解析返回的数据(调用哪个resp的append来接收数据)。
  3. 接收完数据后,调用ComplexTask::finish_once()来去处理
  4. 当finish_once()的返回值是false时,会重复1-3步骤
  5. 当finish_once()的返回值是true时,返回这个resp,执行用户传给ComplexTask的回调。

图片

ComplexTask中的seqid

每个ComplexTask里都有一个seqid成员变量,这个是由框架底层去维护的,每进行一次任务(调用一次message_out()函数),seqid都会+1,可以使用seqid来辅助判断当前进行到哪一步了。

@Barenboim
Copy link
Contributor

Barenboim commented May 8, 2024

差不多是这个流程。seqid就是这次请求是这个长连接上的第几次交互的意思,对于redis来讲,如果是第0次,则需要发送AUTH请求或SELECT请求(无需AUTH时),也可能是实际用户请求(既没有AUTH也没有SELECT);如果是第1次,有可能是SELECT请求或用户请求。

finish_once返回false表示此次交互不是用户请求,所以整个task会再dispatch一次,以执行实际用户请求。

但注意,第二次dispatch依然可能拿到一个新连接(seqid==0),原来AUTH过的连接是有可能被另外一个相同用户名密码的任务抢走的,这时候又会重新AUTH。

当然,第二次dispatch也可能拿到另外一个任务释放的连接,总之不一定就是刚才个连接。这也是为什么MySQL里,认证过程中server下发的seed是保存在连接上,而不是保存在任务里。如果你能理解到这一步,基本上就算很熟悉我们的模式了。

@oisin9
Copy link
Author

oisin9 commented May 8, 2024

我看到seqid是保存在CommSession,而WFComplexClientTask是最终继承自CommSession,那会不会出现seqid>0,但实际上这个连接还没有认证过。还是说框架会去自动更新seqid

@Barenboim
Copy link
Contributor

Barenboim commented May 8, 2024

我看到seqid是保存在CommSession,而WFComplexClientTask是最终继承自CommSession,那会不会出现seqid>0,但实际上这个连接还没有认证过。还是说框架会去自动更新seqid

seqid没有保存在session上啊,每次从连接上拿的。代码在这一行:

session->seq = entry->seq++;

entry是一个CommConnEntry结构,代表连接入口。session每次发起都会从那取一个seq下来。

总之一个WFComplexClientTask的多次dispatch(finish_once返回false),拿到的seqid并不是递增的。以前我们的一个版本,如果访问一个域名,域名下多个IP,一次MySQL请求就会把所有的IP都AUTH一遍。不过这个问题已经修复了。

@oisin9
Copy link
Author

oisin9 commented May 9, 2024

明白了 感谢

@oisin9
Copy link
Author

oisin9 commented May 13, 2024

在ComplexClientTask中,相同的info信息会共用连接,Mysql启用事务的方式是通过给info指定一个唯一的ID来保证这个连接不会被其他的任务共享,这个连接的生命周期是交给用户来控制(用户来控制是否disconnect)。

对于PostgreSQL协议的实现来说,想要简化这个过程

PG有下面两种协议:

  1. 简单查询协议
  2. 扩展查询协议

对于扩展查询协议,会和数据库有多次交互,直到最后发送sync,才表示一个扩展查询结束,所有的交互都必须在同一个连接里执行。所以对于扩展查询协议,在复合任务执行期间默认就应该独占连接(无论用户是否指定开启事务)。

同时可以让用户在创建任务时,显式的指定开启事务(默认不开启),如果开启了事务,就在复合任务执行期间独占一个连接。

如果说每次创建复合任务时,都指定一个唯一的info来获取连接,任务结束后释放连接,会浪费系统资源、增加延迟(每次都要重新建立连接,重新认证),能否通过动态修改info的方式来做到复用连接?

  1. 复合任务在初始化时,通过用户名密码等设置info,从而获得一个共享的连接
  2. 获取到共享连接后,通过修改info信息,通过指定一个唯一的字符串(比如uuid等),让这个连接变成一个独享连接。
  3. 这个复合任务中后续的任务都使用唯一的info字符串来获取该独享连接执行任务。
  4. 在复合任务结束时,将该连接的info字符串修改为通用的info字符串,从而将该链接重新变回一个共享连接,使其他的任务可以复用他。

@Barenboim
Copy link
Contributor

先说一下,目前你可以先不用关注复合任务交互这块,可以先搞定协议解析,再考虑怎么交互。

我们ComplexClientTask的info部分,是为了用来区别通讯目标的,同一个通讯目标下,所有连接都认为是等价的,想发起请求,可以选取通讯目标下任何一个IP的任何一个连接使用。我们在获得通讯目标时,可以要求固定地址+固定连接,这块在最新的MySQL代码里,是这个地方(最近有一些小修改):
https://github.com/sogou/workflow/blob/master/src/factory/MySQLTaskImpl.cc#L718
任务要求固定地址和固定连接的信息,不会再放到info里。当然,你要区分两个固定连接,还是需要通过info。

对于PG里的扩展查询协议,我的理解就是现在WFMySQLConnection解决的问题,这个类通过指定一个id来选取一个固定连接,这个连接上的任务都通过这个二级工厂来产生。哪怕你把工厂(就是这个WFMySQLConnection)delete了,只要连接没有关闭,之后还是可以通过相同的id找回这个连接。你的扩展查询到时候搞一个一样的就可以了。

看起来并没有你想像的那么复杂,建议先关注一下协议的实现。

@oisin9
Copy link
Author

oisin9 commented May 13, 2024

理解您的意思,如果用户如果想要在Mysql并发的执行事务,就需要开多个Connection,然后由二级工厂来创建任务,id分配和维护由用户来做。
我的想法是把这个放到复合任务来做,用户不需要是维护Connection和分配id,只需要在创建任务时指定是否开启事务,这样感觉使用逻辑上会更加方便和自然。

我先把精力放在协议的实现,这个可以后面再讨论。

@Barenboim
Copy link
Contributor

Barenboim commented May 13, 2024

理解您的意思,如果用户如果想要在Mysql并发的执行事务,就需要开多个Connection,然后由二级工厂来创建任务,id分配和维护由用户来做。 我的想法是把这个放到复合任务来做,用户不需要是维护Connection和分配id,只需要在创建任务时指定是否开启事务,这样感觉使用逻辑上会更加方便和自然。

我先把精力放在协议的实现,这个可以后面再讨论。

用户自己来指定也可以啊,就是把URL写成: mysql://user:pass@localhost/dbname?transation=xxx
我们是通过命名事务来实现的,那个WFMySQLConnection只是为了简化用户使用才加上的。

我们的任务都是并发的,不太可能完全不需要用户指定,就知道发到哪个连接吧。

@oisin9
Copy link
Author

oisin9 commented May 14, 2024

明白了,谢谢。
看到了下面的代码,

if (!transaction.empty())
	{
		this->WFComplexClientTask::set_info(std::string("?maxconn=1&") +
											info + "|txn:" + transaction);
		this->set_fixed_addr(true);
	}

@oisin9
Copy link
Author

oisin9 commented May 14, 2024

想请教下复合任务中的keep_alive_timeout这个成员方法应该怎么用 我看到分别在Communicator.cc中的下面三个方法里调用了keep_alive_timeout:

  1. Communicator::send_message_sync:先调用encode把消息发送后,调用keep_alive_timeout?
  2. Communicator::handle_reply_result: 看方法名猜测是处理返回结果的时候调用keep_alive_timeout?这里的PR_ST_FINISHED是代表什么完成了?
  3. Communicator::append_reply:看到是在接收完所有的返回值后,调用keep_alive_time

我的理解是他可以作为一个钩子,通过在复合任务中重写这个方法,在一些环节触发用来处理相关的逻辑,如果返回0还可以把连接给关上,请问我在重写这个方法的时候,应该把哪些逻辑放在这里去实现呢?

@Barenboim
Copy link
Contributor

前两个都是server任务的,你只需关注第三个调用位置。

client收完回复,调用handle之前,需要通过这个函数返回连接可以保持的时间。这个参考各个协议的实现就可以了。

@oisin9
Copy link
Author

oisin9 commented May 14, 2024

好的,感谢

@Barenboim
Copy link
Contributor

@oisin9
看你现在的做法你可能想先做出一版可以编译运行的程序先可以调试,所以会直接考虑task impl部分,但这样很难收敛。

我建议你先从Request和Response出发,把实际的用户请求消息先做出来。也不用考虑startup消息的实现。有了Request和Response,你就可以按我们简单自定义协议的server/client的方法,做出一组可以通讯的server/client来调试消息了。之后再做startup消息以及任务实现。

@oisin9
Copy link
Author

oisin9 commented May 15, 2024

感谢您的建议,我目前的思路是想把ComplexTask实现,然后用真实的PG数据库一步步调试,从startup到后面的协议,按顺序去实现。
绕过startup,先去实现用户请求消息和解析返回或许是个不错的新思路,我去试试看。

@Barenboim
Copy link
Contributor

感谢您的建议,我目前的思路是想把ComplexTask实现,然后用真实的PG数据库一步步调试,从startup到后面的协议,按顺序去实现。 绕过startup,先去实现用户请求消息和解析返回或许是个不错的新思路,我去试试看。

这么做的好处是更好的理解server/client一体,方便将来实现WFPostgresServer,可以用来做Postgres的代理服务器或与Postgres兼容的数据库服务。

@Barenboim
Copy link
Contributor

@oisin9 麻烦合并一下master最新代码。你之前fork的那个版本,正好在创建自定义协议任务时有点问题。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants