一、前言
随着飞书深诺业务的发展,复杂度越来越高,随之而来的业务线与系统也越来越多,这些系统或业务之间的数据同步需求也越来越频繁。另一方面,在当前的互联网场景下,业务系统大多选用MySQL作为数据存储与处理方案,其在大部分场景下能较好的满足业务需求,但在其他一些场景下也慢慢显现出一些短板,如:需要对大量数据进行全文检索,对大量数据进行组合查询,分库分表后的数据聚合查询;此时我们自然想到如何使用其他更适合处理该类问题的数据组件(如ES等)来支撑这些场景。基于这两方面的原因,公司亟需一套灵活易用的系统间数据同步与处理方案,让特定业务数据可以很方便的在其他业务或组件间流转,助推业务快速迭代。
二、方案选型
当前业界针对系统数据同步较常见的方案有同步双写、异步双写、侦听binlog等方式,这些方案各有优缺点。这里我们主要以mysql同步到ES的场景来阐述。
2.1、同步双写
这是一种最为简单的方式,在将数据写到mysql时,同时将数据写到ES,实现数据的双写。
优点:
- 设计简单易懂
- 实时性高
缺点:
- 硬编码,有需要写入mysql的地方都需要添加写入ES的代码,会导致业务强耦合。
- 存在双写可能失败导致数据丢失的风险;如:ES系统不可用,应用系统和ES之间的网络故障,应用系统重启,导致系统来不及写入ES等。
- 对性能有较大影响,因为对于每次业务操作都需要加上一个ES操作,特别是如果对数据有强一致性要求,还需要通过事务处理。
2.2、异步双写
该方案主要是在同步双写的基础上增加一个MQ,实现异步写入。
优点:
- 解决了性能问题,MQ的性能基本比mysql高出一个数量级。
- 不易出现数据丢失问题,主要基于 MQ 消息的消费保障机制,比如 ES 宕机或者写入失败,还能重新消费 MQ 消息。
- 通过异步的方式做到了系统解耦,多源写入之间相互隔离,便于扩展更多的数据源写入。
缺点:
- 数据同步的实时性,由于MQ的消费可能由于网络或其它原因导致用户写入的数据不一定可以马上看到,造成延时。
- 虽然在系统逻辑上做到了解耦,但是存在业务逻辑里依然需要增加MQ代码,这块的耦合依然存在。
- 复杂度增加:多一个MQ中间件要维护。
- 硬编码问题,接入新的数据源需要实现新的消费者代码。
2.3、监听binlog
在第二种方案的基础上,我们主要需要解决业务耦合的问题,所以考虑引入数据变动自动监测与处理机制。
优点:
- 无代码侵入,原有系统不需要任何变化,没有感知。
- 性能高,业务代码完全不需要新增任何多余逻辑。
- 耦合度极低,完全不需要关注原来系统的业务逻辑。
缺点:
- 存在一定的技术复杂度。
- 数据的同步实时性可能会存在问题。
对于我们来说,这种基础组件的设计主要需要考虑的就是尽量做到对业务无侵入,接入无感知,同时系统耦合度要低,所以我们选择了方案三,同时我们考虑到该方案在可复用和可扩展方面还存在一定的短板,所以我们该方案的基础上做了一些优化。
三、整体方案设计
3.1、整体架构介绍
在我们的需求场景里,数据源基本都是MySQL,所以首先要考虑的就是选择何种组件对MySQL的数据变动做实时监听,业界针对该场景已经有较成熟的方案了,其中国内开发者最熟悉的就是canal,从它的功能完善度,社区活跃度,稳定性等多个层面看都完全符合我们的要求;所以,基于canal,我们对上述的方案三进行了优化,以满足多系统数据同步的需求,达到业务上可解耦、可复用、可扩展的目的。这里我们主要以MySQL同步到ES这种典型场景为例来阐述我们系统的核心流程。(系统可以支持通过在配置表只指定输出数据源的类型,同步到其他的数据产品,如MongoDB等)。
该设计的核心理念在于:通过统一的“消息分发服务”实现与Canal Client 的对接,并将消息按照统一规范的格式分发到不同的MQ集群中,通过统一的“消息消费服务”去消费消息回调业务接口,业务系统完全不需要关注数据的流转,只需关注特定业务的数据处理和数据组装。“消息分发服务”和“消息消费服务”对各业务线来讲,实现了数据流转过程中的功能复用。“消息消费服务”中的可分发到不同的MQ集群,和“消息消费服务”中的配置指定数据源输出实现了功能扩展。
下面对该系统的各核心模块做简要阐述:
- canal:负责监听数据源的数据变动。
- 消息分发服务:负责对接canal客户端,拉取变化的数据,将消息解析为json格式,按照固定的规则分发到MQ中,这里MQ可以根据业务配置指定到不同的集群,实现横向扩展。由于变更的数据可能是批量的,这里会将消息拆分为单条发送到MQ中,并且通过配置可以过滤掉一些业务上不需要的大字段,减少mq消息体。
- 消息消费服务:负责从配置表中加载MQ 队列,消费MQ中的消息,通过队列、回调接口、ES索引三者之间的映射关系,将消息POST给业务回调接口,接收到业务回调接口返回的操作指令和ES文档后,写入对应的ES索引。写入失败时插入补偿表,等待补偿。这里ES索引可以根据业务配置指定到不同的集群,实现横向扩展。
- 任务调度系统:负责定时调用消息消费服务中的消息补偿等定时任务接口。
- 业务回调服务:负责接收消息消费服务POST过来的消息,根据消息中的指令和数据,结合数据库中的数据或下游服务接口返回的数据组装ES文档中所需要的数据,设置相应的操作指令返回给消息消费服务去写入ES。
- 业务ES查询服务:负责通过ES SDK查询ES索引中的数据,通过接口返回给业务调用方。
3.2、数据订阅消息分发服务
我们将数据的订阅与数据的消费通过MQ进行解耦,“数据订阅消息分发服务”的职责是对接Canal Client,解析数据变更消息,转换为常用的JSON格式的消息报文,按照业务配置规则分发到不同的MQ集群、路由中。
3.2.1、基于Canal的数据变更监听机制
Canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
- Canal server 和 client 端的高可用方案依赖 zookeeper, 启动 canal server 和 client 的时候都会从 zookeeper 读取信息
- Canal server 和 client 启动的时候都会去抢占 zk 对应的 running 节点, 保证只有一个 server 和 client 在运行, 而 server 和 client 的高可用切换也是基于监听 running 节点进行的
- Canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)
- 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态
- 一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance
- Canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect
3.2.2、基于Canal Client 分摊设计提升系统处理效率
从Canal服务高可用设计中可以看出Canal Client当有多个实例启动时,会保证只有一个实例在运行,消费binlog消息。而我们承载Canal Client的”数据订阅消息分发服务”会部署在多台服务器上,由于服务发布时每台服务器启动的时间不同,所有的Canal Client活跃的实例都会集中在先启动的那台服务器上运行,消费binlog消息。另外的服务器上运行的Canal Client都处于备用状态,不能充分的利用每台服务器的资源。因此我们希望不同的destination分摊在不同的服务器上执行,但所在的服务器宕机时会自动转移到其他服务器上执行,这样可以充分利用每一台服务器,提供binlog消息消费的性能。
为了达到将destination分摊到每一台服务器的效果,我们引用了elasticjob-lite这个组件,利用其分片的特性,进行二次封装,实现侦听destination在某台服务器中上下线的变更事件。elasticjob-lite分片原理解释:
ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更。举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,如下图所示。
当新增加作业服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片,分片如下图所示。
当作业服务器在运行中宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用的效果。本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。作业高可用如下图所示。
3.2.3、资源隔离
由于该系统的使用方包含公司各业务线,所以如何保障线上出了问题后,各个业务之间不要相互影响是一个很重要的需求,我们的做法是在MQ的集群和队列级别都支持基于业务的资源隔离;我们将从canal中拉取出来的变更消息,按照一定的规则分发到不同的MQ集群中,设置统一的路由键规则, 以便各业务在对接时申请自己业务的MQ队列,按需绑定对应的MQ集群和消息路由。
- MQ集群路由
我们通过配置将不同的destination映射到不同的MQ集群和ZK集群,可以做到性能的横向扩展。
- MQ消息路由规则
canal从binlog中获取的消息后,将批量消息拆分成单条消息,进行分片规则运算后发送到指定的rabbitmq 交换机和路由键,以便根据不同的业务场景,按不同的业务规则绑定到不同的队列中,通过消费服务进行消息消费处理,同时会建立一个名为“exchange.canal”的exchange,类型为 topic,路由键规则:key.canal.{destination}.{database}.{table}.{sharding},sharding按pkName-value排序后的hashcode取模分片,队列命名规则约定:queue.canal.{appId}.{bizName} 如:
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_item.0
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_extend.0
...
3.3、数据订阅消息消费服务
为了达到消息的消费与业务系统进行解耦的目的,我们独立出一个”数据订阅消费服务”。这个服务的职责是消费从”数据订阅消息分发服务“中投递的数据变更MQ消息,根据业务配置回调指定的业务回调接口。业务回调接口负责接收数据变更消息,组装需要执行的ES文档信息,返回给消费服务进行ES数据操作。
3.3.1 执行指令
从binlog中订阅出的消息有3类操作:INSERT,UPDATE,DELETE,这里们新增一个SELECT指令,这个指令的作用是业务回调接口在收到这个指令后从数据库中重新获取最新的数据组装成需要执行的ES文档信息,返回给消费服务进行ES数据操作。主要应用在全量同步,部分同步,文档刷新,消息补偿等场景,下文会详细介绍。
3.3.2 增量同步
- MQ队列动态加载
新的业务功能上线时,会配置对应的队列绑定相关的路由键,订阅到业务场景需要的数据变更的消息。为了避免每次有新业务接入需要重新更新消费服务代码,重新发布服务,需要实现能够定时加载配置表数据,实现动态添加MQ队列侦听的功能。这里我们使用SimpleMessageListenerContainer容器设置消费队列的动态监听。为每个MQ集群创建一个SimpleMessageListenerContainer实例,并动态的注册到Spring容器中。
- 业务队列绑定规则
一个业务通常会对应一个ES索引,一个或多个MQ队列(队列绑定路由键的规则见: MQ消息分片规则)。
- MQ消息顺序消费
一个queue,有多个consumer去消费, 因为我们无法保证先读到消息的 consumer 一定先完成操作,所以可能导致顺序错乱。出现这个问题的主要原因是,不同消息都发送到了一个queue 中,然后多个消费者消费同一个queue的消息。所以我们可以创建多个queue,每个消费者只消费一个queue, 生产者根据一定的规则把消息放入同一个queue(见:3.4.4.2 MQ消息分片规则),这样同一个消息就只会被同一个消费者顺序消费。
然而我们的服务通常都是集群模式部署的,天然每个queue就会有多个consumer。为了解决这个问题我们引入elasticjob-lite对消息队列进行分片,比如我们有2个服务实例,5个队列,我们可以让实例1消费队列1、2、3,让实例2消费队列4、5。当其中有一个实例1挂掉时会自动将队列1、2、3的消费转移到实例2上,当实例1重启启动后队列1、2、3的消费会重新转移到实例1。
对于RabbitMQ来说,导致消费顺序错乱的原因通常是队列的消费是单机多线程消费或消费者是集群部署的,由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。如消费者A执行了增加,消费者B执行了修改,消费者C执行了删除,但是消费者C执行比消费者B快,消费者B又比消费者A快,就会导致消费 binlog 执行到ES的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。如下图:
解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者单线程固定消费一个 queue 的消息,生产者发送消息的时候,同一个单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。如下图:
这里有个重要的问题是如何保证在集群模式下如何保证一个队列只在一台机器上进行单线程消费,如果这台机器宕机了如何进行故障转移。 为了解决这个问题我们引入elasticjob-lite对消息队列进行分片,比如我们有2个服务实例,5个队列,我们可以让实例1消费队列1、2、3,让实例2消费队列4、5。当其中有一个实例1挂掉时会自动将队列1、2、3的消费转移到实例2上,当实例1重启启动后队列1、2、3的消费会重新转移到实例1。
对于对消息顺序消费敏感的业务场景,我们通过队列分片的方式来提升整体的并发度。对于对消息顺序消费不敏感的业务场景我们也可以配置成某个队列集群消费或单机并发消费。针对不同的业务场景合理选择不同的配置方案,提升整体性能。
3.3.3 全量同步
通过Canal获取的变更消息只能满足增量订阅数据的业务场景,然而我们通常我们还需要进行一次全量的历史数据同步后增量数据的订阅才会有意义。对于业务数据表的id是自增模式时,可以通过给定一个最小id值,最大id值,然后进行切片,如100个一片,生成MQ报文,发送到MQ中。消费MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.4 部分同步
有的时候我们需要修复指定的数据,或业务表的id是非自增模式的,需要进行全量同步。可以通过部分同步的接口,指定一组需要同步的id列表,生成分片MQ报文,发送到MQ中。消费服务接收到同步MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.5 刷新文档
当我们ES索引中有大批量的数据异常,需要重新刷新ES索引数据时,可以通过生成一个全量同步的任务,分页获取指定ES索引的文档ID列表,模拟生成部分同步消息报文,发送到MQ中。消费MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.6 消息补偿
对于处理失败的消息,我们设计了消息补偿机制,它会将同步失败的消息存储到消息重试表中,通过Job执行补偿,便于监控。补偿时将消息重置为 SELECT 类型的MQ报文。业务回调接口接收到消息后会从数据库中获取最新的数据更新ES文档。
3.4、ES SDK功能扩展
目前ES官方推荐使用的客户端是RestHighLevelClient,我们在此基础上进行了二次封装开发,主要从扩展性和易用性方面考虑。
3.4.1、常用功能封装
- 使用工厂模式,方便注册和获取不同ES集群对应的RestHighLevelClient实例,为业务端使用时对ES集群的扩展提供便利。
- 对RestHighLevelClient的主要功能进行二次封装如:索引的存在判断、创建、更新、删除;文档的存在判断、获取、新增、更新、保存、删除、统计、查询。 降低开发人员使用RestHighLevelClient的复杂度,提高开发效率。
3.4.2、ES查询数据权限隔离
对于一些有数据隔离需求的业务场景,我们提供了一个ES数据隔离插件。在ES SDK中设计了一个搜索过滤器的接口,采用拦截器的方式对统计文档,搜索文档等方法的搜索条件参数进行拦截过滤。
/**
* 搜索过滤器
*/
public interface SearchSourceBuilderFilter {
String getFilterName();
void filter(SearchSourceBuilder searchSourceBuilder);
}
四、我们踩过的坑
整个系统的设计与开发过程中,我们也遇到了不少问题,这里我们列举一些比较有借鉴意义的问题点,让大家在遇到类似问题的时候可以少走一些弯路
4.1 Canal相关问题
4.1.1 Canal Admin 部署时需注意的配置项
- 如何支持HA:需要将’canal.instance.global.spring.xml’ 设置为 ‘classpath:spring/default-instance.xml’
- 设置合适的并行线程数:canal.instance.parser.parallelThreadSize,我们当前设置的是16,如果该配置项被注释掉,那么可能会导致解析阻塞
- 开启tsdb导致的各种问题:canal默认开启tsdb功能,也就是会通过h2数据库缓存解析的表结构,但是实际情况下,如果上游变更了表结构,h2数据库对应的缓存是不会更新的,这个时候一般会出现神奇的解析异常,异常的信息一般如下:‘Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table’,该异常还会导致一个可怕的后果:解析线程被阻塞,也就是binlog事件不会再接收和解析。目前认为比较可行的解决方案是:禁用tsdb功能,也就是canal.instance.tsdb.enable设置为false。如果不禁用tsdb功能,一旦出现了该问题,必须要「先停止」Canal服务,接着「删除」$CANAL_HOME/conf/目标数据库实例标识/h2.mv.db文件,然后「启动」Canal服务。目前我们是设置为禁用的。
- 设置合理的订阅级别:其配置项是‘canal.instance.filter.regex’;库表订阅的配置建议配置到表级别,如果定义到库级别一方面会消费一些无效的消息,给下游的MQ等带来不必要的压力。还有可能订阅到一些日志表等这类有着大字段数据的消息,消息过大在JSON化的时候可能导致内存溢出异常。针对这个问题我们进行大字段过滤和告警的改造。
4.1.2 binlog文件不存在,导致同步异常
如果发现Canal Client 长时间获取不到binlog消息,可以去Canal Admin 后台去看一下Instance管理中的日志。大概率会出现“could not find first log file name in binary log index file”,这个是因为zk集群中缓存了binlog信息导致拉取的数据不对,包括定义了binlog position但是启动服务后不对也是同样的原因。解决办法:如果是单机部署的话删除canal/conf/$instance目录中的meta.dat文件即可,如果是集群模式需要进入zk删除/otter/canal/destinations/xxxxx/1001/cursor,然后重启canal。
4.2 ES updateByQuery问题
ES的Update By Query对应的就是关系型数据库的update set … where…语句;该命令在ES上支持得不是很成熟,可能会导致的问题有:批量更新时非事务模式执行(允许部分成功部分失败)、大批量操作会超时、频繁更新会报错(版本冲突)、脚本执行太频繁时又会触发断路器等。我们的解决办法也比较简单,直接在生产环境放弃使用updateByQuery方法,配置成使用先查询出符合条件的数据,然后分发到MQ中单条分别更新的模式。
五、后续优化方向
该系统已经在公司稳定运行了一段时间,其基础功能基本满足当前的需求,但还是存在不少需要优化的点,包括:
- 由于该系统在公司的系统体系里越来越重要,同时其本身涉及到不少子系统,如何监测该系统本身的工作是否正常,在出问题的时候及时的报警告知相关人员,是该系统需具备的一个重要特性,特别在业务连续性监控上,如系统内特定组件工作异常导致数据同步流中断,是后续需重点优化的方向。
- 公司有些对实时性要求较高的业务依赖该系统进行数据同步,随着业务量越来越大,该方案当前当前采用的MQ组件在性能和高可用性都有所欠缺,后续打算采用性能更好,可用性机制更完善的MQ组件进行替代。
- 由于我们采用的是小步快跑的迭代思路,设计的时候更多考虑线上运行的顺畅性,而忽略了新业务的接入便利性,目前一个新的业务服务对接数据同步系统,需要维护人员做不少配置文件,数据库等相关的修改,并做人工确认,随着接入需求越来越频繁,亟需一个管理后台,提升接入的效率和自动化程度。