龙空技术网

mysql数据同步到elasticsearch数据解决方案

程序猿不相信眼泪 2806

前言:

而今咱们对“es搜索mysql数据”可能比较关注,各位老铁们都需要了解一些“es搜索mysql数据”的相关内容。那么小编在网络上收集了一些有关“es搜索mysql数据””的相关知识,希望小伙伴们能喜欢,看官们一起来了解一下吧!

mysql数据同步到elasticsearch数据解决方案问题场景

1.分库分表后多关联或者多条件查找效率低下,例如2b场景的查询,导出等需要多条件查询,继续用分库分表话效率低下。

2.数据量太多需要转移非关系型数据库elasticsearch存储

3.其他数据转移场景等

这两种场景都涉及到mysql数据同步到es数据解决方案,解决起来分总体两步走,一是存量数据的同步,二是增量数据的同步。这里利用的是canal的方案去同步数据,方案如下图所示

这个是不停机的方案,首先同时开启存量的数据的导入和增量数据的监听,待存量数据导入完成,开启java服务消费mq消息,对数据进行更新或者插入,若数据存在则进行更新,若数据不存在,是新插入则插入,是更新则保存到定时任务重试。这里只是理想方案,实际过程中和存量数据的大小,数据的增长率等有关系,具体实施肯定较为复杂。

若要执行停机方案,则比较简单,数据不再更新后,将存量数据插入到es后,再开启增量数据监听服务以及消费服务,这样es就能实时同步数据了,下面实践下canal adapter的mysql存量数据导入elasticsearch中。

增量数据导入elasticsearch

实践版本:

elasticsearch &kibana:7.12.1

canal.client-adapter:1.1.7-SNAPSHOT

mysql :8.0 主从

以这个分表的数据为例

CREATE TABLE `pay_parent_1` (  `id` bigint NOT NULL AUTO_INCREMENT,  `user_id` int NOT NULL,  `status` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL,  `creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  `updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',  `tenant_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '租户id',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1571152425171070978 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;复制代码

在kibana中建立索引

PUT /pay_parent_0  {    "mappings":{        "properties":{          "id": {            "type": "long"          },          "user_id": {            "type": "long"          },          "status": {            "type": "text"          },          "creator": {            "type": "text"          },          "create_time": {            "type": "date",            "formats" : ["yyyy-MM-dd HH:mm:ss"],        "timezone" : "Asia/Shanghai"          },          "updater": {            "type": "text"          },          "update_time": {            "type": "date",            "formats" : ["yyyy-MM-dd HH:mm:ss"],            "timezone" : "Asia/Shanghai"          },          "deleted": {            "type": "long"          },          "tenant_id": {            "type": "text"          }      }    }  }​复制代码

client-adapter 配置 表和es索引的映射

dataSourceKey: defaultDS #数据源destination: pay_parent_0 #也可以从监听的数据源取数据 mq或者canalouterAdapterKey: es #对应的适配器的keygroupId: g1 #对应适配器的分组esMapping:  index: pay_parent_0 #es索引名称  id: id #数据库主键对应es文档id 插入数据时一定要填写  #  upsert: true  #是否更新 以主键id作为更新条件  sql: "select id, user_id, status, creator, create_time, updater, update_time, deleted, tenant_id        from pay_parent_0 as a"  etlCondition: "where a.id={}"   # etl 的条件参数 接口请求  commitBatch: 3000  # 提交批大小复制代码

下载canal源码,启动lanucher 模块,client-adapter 启动配置

  srcDataSources:    defaultDS:      url: jdbc:mysql://xxx:3306/demo0?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT      username: root      password: xxx  canalAdapters:  - instance: canal.test.queue # canal instance Name or mq topic name    groups:    - groupId: g1      outerAdapters:      - name: logger      - name: es7        key: es        hosts:   # 127.0.0.1:9200 for rest mode        properties:          mode: rest # or rest          cluster.name: elasticsearch复制代码

启动后执行,先测试第一条数据插入

插入后查看kibana,以创建时间搜索,如图所示,值得注意的是elasti存储时默认将时间转化为UTC时间存储,即0时区,内部默认是个长整型。所以这里的date格式的时间都时零时区的,但是搜索的时候kibana会进行时区转换。搜出来的结果是准确的。

另外,重复请求时,指定了id插入的数据会覆盖原来的数据,这个是es内部api的功能。

简单测试完后,测试下10w的数据插入,这里只需要把请求中的参数去掉就行,执行

可以看到性能还是不错,10w的数据准确无误的插入,花了41ms,外网和硬件条件一般的情况,内网的花更快。

总结

利用canal adpter的es插件可以实现mysql 同步的数据的功能,存量数据批量更新或者批量插入,非常方便。里面的源码插件的实现,配置文件分离,插入实例的实现以及mysql数据的批量插入都可以借鉴。若后续业务中有设计数据迁移到es中,参考实现是非常有帮助的。

标签: #es搜索mysql数据