前言:
而今咱们对“es搜索mysql数据”可能比较关注,各位老铁们都需要了解一些“es搜索mysql数据”的相关内容。那么小编在网络上收集了一些有关“es搜索mysql数据””的相关知识,希望小伙伴们能喜欢,看官们一起来了解一下吧!mysql数据同步到elasticsearch数据解决方案问题场景
1.分库分表后多关联或者多条件查找效率低下,例如2b场景的查询,导出等需要多条件查询,继续用分库分表话效率低下。
2.数据量太多需要转移非关系型数据库elasticsearch存储
3.其他数据转移场景等
这两种场景都涉及到mysql数据同步到es数据解决方案,解决起来分总体两步走,一是存量数据的同步,二是增量数据的同步。这里利用的是canal的方案去同步数据,方案如下图所示
这个是不停机的方案,首先同时开启存量的数据的导入和增量数据的监听,待存量数据导入完成,开启java服务消费mq消息,对数据进行更新或者插入,若数据存在则进行更新,若数据不存在,是新插入则插入,是更新则保存到定时任务重试。这里只是理想方案,实际过程中和存量数据的大小,数据的增长率等有关系,具体实施肯定较为复杂。
若要执行停机方案,则比较简单,数据不再更新后,将存量数据插入到es后,再开启增量数据监听服务以及消费服务,这样es就能实时同步数据了,下面实践下canal adapter的mysql存量数据导入elasticsearch中。
增量数据导入elasticsearch
实践版本:
elasticsearch &kibana:7.12.1
canal.client-adapter:1.1.7-SNAPSHOT
mysql :8.0 主从
以这个分表的数据为例
CREATE TABLE `pay_parent_1` ( `id` bigint NOT NULL AUTO_INCREMENT, `user_id` int NOT NULL, `status` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL, `creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除', `tenant_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '租户id', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1571152425171070978 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;复制代码
在kibana中建立索引
PUT /pay_parent_0 { "mappings":{ "properties":{ "id": { "type": "long" }, "user_id": { "type": "long" }, "status": { "type": "text" }, "creator": { "type": "text" }, "create_time": { "type": "date", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone" : "Asia/Shanghai" }, "updater": { "type": "text" }, "update_time": { "type": "date", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone" : "Asia/Shanghai" }, "deleted": { "type": "long" }, "tenant_id": { "type": "text" } } } }复制代码
client-adapter 配置 表和es索引的映射
dataSourceKey: defaultDS #数据源destination: pay_parent_0 #也可以从监听的数据源取数据 mq或者canalouterAdapterKey: es #对应的适配器的keygroupId: g1 #对应适配器的分组esMapping: index: pay_parent_0 #es索引名称 id: id #数据库主键对应es文档id 插入数据时一定要填写 # upsert: true #是否更新 以主键id作为更新条件 sql: "select id, user_id, status, creator, create_time, updater, update_time, deleted, tenant_id from pay_parent_0 as a" etlCondition: "where a.id={}" # etl 的条件参数 接口请求 commitBatch: 3000 # 提交批大小复制代码
下载canal源码,启动lanucher 模块,client-adapter 启动配置
srcDataSources: defaultDS: url: jdbc:mysql://xxx:3306/demo0?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT username: root password: xxx canalAdapters: - instance: canal.test.queue # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 key: es hosts: # 127.0.0.1:9200 for rest mode properties: mode: rest # or rest cluster.name: elasticsearch复制代码
启动后执行,先测试第一条数据插入
插入后查看kibana,以创建时间搜索,如图所示,值得注意的是elasti存储时默认将时间转化为UTC时间存储,即0时区,内部默认是个长整型。所以这里的date格式的时间都时零时区的,但是搜索的时候kibana会进行时区转换。搜出来的结果是准确的。
另外,重复请求时,指定了id插入的数据会覆盖原来的数据,这个是es内部api的功能。
简单测试完后,测试下10w的数据插入,这里只需要把请求中的参数去掉就行,执行
可以看到性能还是不错,10w的数据准确无误的插入,花了41ms,外网和硬件条件一般的情况,内网的花更快。
总结
利用canal adpter的es插件可以实现mysql 同步的数据的功能,存量数据批量更新或者批量插入,非常方便。里面的源码插件的实现,配置文件分离,插入实例的实现以及mysql数据的批量插入都可以借鉴。若后续业务中有设计数据迁移到es中,参考实现是非常有帮助的。
标签: #es搜索mysql数据