龙空技术网

如何实现MySQL到ES的数据实时同步?

从程序员到架构师 555

前言:

此时各位老铁们对“mysql数据库间同步”大约比较讲究,看官们都需要了解一些“mysql数据库间同步”的相关文章。那么小编也在网上搜集了一些有关“mysql数据库间同步””的相关文章,希望大家能喜欢,同学们快快来学习一下吧!

MySQL到ElasticSearch的数据实时同步功能主要用于将MySQL数据库中的数据通过实时的方式或者是接近于实时的方式同步到ElasticSearch中,这样我们可以利用ElasticSearch强大的数据搜索和分析能力,来进行数据的查询搜索等支持。

常用的工具和技术

在实现MySQL到ElasticSearch数据同步过程中,我们常用的技术和工具有如下几种。

Debezium

这是一个开源的CDC(Change Data Capture)平台,它支持了MySQL、PostgreSQL、MongoDB等数据库的数据同步操作,通过读取到数据库的Binlog操作日志,将捕获的数据变化操作发送到Kafka中,然后通过Kafka Connect 将数据写入到ElasticSearch中。

Logstash

Logstash是Elastic公司提供的数据处理管道工具,支持从多种数据源中读取数据,然后写入到ElasticSearch,可以通过JDBC操作读取到MySQL中的数据,然后通过Logstash将数据写入到ElasticSearch中。

Canal

Canal是阿里开源的MySQL Binlog解析工具,可以实现将MySQL的数据变化Binlog数据解析成JSON数据并且发送到消息中间件MQ中,然后通过消息中间件将数据变化写入到ElasticSearch中与上面提到的Debezium的模式一样。

使用Debezium来进行同步处理

根据上面的介绍使用Debezium来进行数据同步操作需要通过Kafka来进行存储和传输,所以需要安装Kafka与Zookeeper。

安装好Kafka和Zookeeper之后,接下来就是安装Debezium连接器,并且将其放到Kafka的连接目录中,然后配置Debezium连接器。

创建Debezium连接器配置文件register-mysql.json,内容如下

{  "name": "mysql-connector",  "config": {    "connector.class": "io.debezium.connector.mysql.MySqlConnector",    "database.hostname": "localhost",    "database.port": "3306",    "database.user": "mysqluser",    "database.password": "mysqlpw",    "database.server.id": "184054",    "database.server.name": "fullfillment",    "database.include.list": "inventory",    "database.history.kafka.bootstrap.servers": "localhost:9092",    "database.history.kafka.topic": "schema-changes.inventory"  }}

然后通过如下的命令来注册连接器,如下所示。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-mysql.json
配置Kafka Connect和Elasticsearch连接

配置好Debezium与Kafka的连接之后,接下来就是配置Kafka与ElasticSearch的连接。创建Kafka Connect与Elasticsearch的连接配置文件register-elasticsearch-sink.json,内容如下所示。

{  "name": "elasticsearch-sink",  "config": {    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",    "tasks.max": "1",    "topics": "fullfillment",    "connection.url": ";,    "type.name": "_doc",    "key.ignore": "true",    "schema.ignore": "true"  }}

通过如下的命令来注册ElasticSearch的连接器信息。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-elasticsearch-sink.json

配置完成之后,我们就可以通过修改MySQL的数据来测试ElasticSearch中的数据是否发生了变化。

参考网站Debezium: Kafka Connect Elasticsearch Connector: Logstash : Canal :

如果与到什么问题可以参考上面的网站链接进行操作。

标签: #mysql数据库间同步