龙空技术网

谁动了我的数据?去哪儿网MySQL嗅探程序

编程菌zfn 162

前言:

现在咱们对“mysql端口查询”大致比较着重,咱们都想要剖析一些“mysql端口查询”的相关知识。那么小编在网上汇集了一些关于“mysql端口查询””的相关资讯,希望姐妹们能喜欢,看官们快快来学习一下吧!

作者介绍

雷孝龙, 去哪儿网资深DBA。2019年8月加入去哪儿网,负责公司的MySQL/Redis运维,以及自动化方案的设计与实施。曾就职于达梦数据库、映客直播;擅长于数据库管理、维护及优化等工作。

一、背景

作为 DBA ,在日常运维工作中,常常会被开发同学问到类似下面的这些问题:我的这条数据不知道被谁改了,能帮我查一下吗?能帮我查查这张表都有谁在访问吗?昨天某个时间监控有个尖刺,能帮忙看看执行了哪些 SQL 吗?某一个 SQL 语句中有手机号、身份证号,或者密码明文等敏感信息,这个语句是谁在执行?某一张表中存储了手机号、密码等明文信息,又是谁在读写这张表呢?

面对这些问题,我们会想方设法去找找慢日志,或者查询日志,甚至分析 binlog ,运气好的话可以找到一些线索,但大多数时候都不尽人意。详细的访问信息,带用户名、客户端 IP 的只有慢日志会记录,生产环境又不可能把所有 SQL 都记录下来,那会极大的拖慢系统性能,甚至有几率会带来故障,因此这成为了一个让 DBA 头疼的问题。

那么,怎么获取到数据全量的访问日志呢,我们来分析一下常见的一些方法:从 Server 端记录

就是前面提到的,通过慢日志、查询日志来记录。优缺点也很明显,优点就是记录的信息准确,开启简单;缺点就是会占用系统性能,同时带来风险,这种实现方式通常有几种方案:

General log:这种方案可以记录所有访问日志,但日志内容匮乏,只包括了 Query 语句,而缺少了访问 IP ,用户名等信息,并且如果线上都打开这个日志文件,对 MySQL 性能的冲击很大,容易产生故障,所以很难实现我们的需求。Slow log:将 long_query_time 参数设置为 0 ,即可记录所有 MySQL Server 端执行的 SQL 语句,用户名、客户端 IP 、执行时间等信息均有记录,但是带来的问题与开启 General log 相同,生产环境不可能一直开启。Audit plugin:使用 MySQL 自带的审计功能,其实是可以实现部分需求的,但问题点在于 audit 本身是 MySQL 的一个插件,与版本关系比较大,功能不尽相同,而最大的一个问题是,如果开启这个插件功能,需要重启数据库,这是比较要命的,所以也不能被采用。从客户端记录

客户端发起请求和收到响应时,能够记录自己发出的 SQL 请求,结果集以及响应时间,但我们知道,客户端众多,汇总起来是一个问题,也不能保证每台机器执行 SQL 的时序性。更不能体现 Server 端的真实执行时间,网络开销、客户端程序处理耗时等因素没法排除。

MySQL中间层记录

通过中间层日志追加的方式,记录所有对于底层数据库的请求;但这种方式受限于中间层的特性,并且 MySQL 和中间层的关系往往是多对多,也存在一个汇总的问题,同时也会增加额外开销。更加重要的是,在我们使用的 MySQL 架构中,没有中间层这个模块,所以这种方案就已经被排除在外。

网络旁路抓取

这是一种常见的 SQL 审计手段,对服务是非入侵式的,通过网络抓包来获取对于数据库的全量访问日志。这种方式既不会对服务造成任何影响,同时也可以控制对于服务器资源的占用,准确性也有一定保障

根据上面所分析的方案,我们决定使用最为安全,运维最容易的方案,即网络旁路抓取的方案。

二、功能需求

抓包成功率:需要能够确保准确的抓取到所有对于数据库的请求,这也是决定这个程序是否可用的最重要因素。多通道、自主探测发现:程序能够对于服务器数据库实例自主探测发现,当创建新集群新实例时,程序能够发现新的数据库端口,并对其数据流进行收集抓取,也就是说要实现自动化、智能化,尽可能少的人为干预。数据便于分析处理:需要有一个合理的存储将数据汇总,从数据库维度、业务维度对访问日志进行分析汇总,既要保证数据分析的时效性,又能保证数据容量足够大,能存储线上所有抓取回来的数据。需要查明细的时候,还能很快地查出来。便于开发维护:开发语言需要简单易懂,便于快速迭代,且兼容性好,适用于线上不同版本的操作系统;后期维护发布等也需要考虑到,一旦变更需要所有服务器重新部署。稳定、轻量级、高性能:程序需要有能够在线上稳定运行,虽然挂了不影响线上服务,但收集到访问数据准确性会大打折扣;其次,抓包程序部署在数据库服务器,不能占用过多系统资源,否则得不偿失,所以需要在有限的资源范围内,还得保证程序的高性能,能够快速处理掉抓取到的大量数据。

开源程序调研

我们调研了几个目前已有的 MySQL 抓包程序,结果如下:

通过对比,目前没有比较适合于我们诉求的抓包工具,需要重新开发一个适用于 Qunar 自身的数据库抓包程序。

程序结构

通过上面的讲述,已经明确了我们现在需要做的事情,就是将某个 MySQL 端口的流量抓取到,然后按照 MySQL 公布的协议格式,分析出来客户端与 MySQL Server 之间往来流转的信息,包括最主要的 SQL 语句,执行时间,影响行数,用户名, IP 等信息。

程序结构图

主函数代码示例

type Sniffer struct {    sessionMgr     *SessionManager    ports          []int    device         string    handle         *pcap.Handle    decoders       []*MySQLDecoder    packetChannels []chan gopacket.Packet    resultChannels []chan MySQLResult    writeChannels  []chan Result    dbConnManager  *DbConnManager}func (snf *Sniffer) Start() {  // 初始化会话    snf.sessionMgr.InitSession(snf.dbConnManager, snf.ports)  // 使用 gopacket 对数据包进行实时捕获    if handle, err := pcap.OpenLive(snf.device, 65535, true, pcap.BlockForever); err != nil {        ...    ...        return    } else {        snf.handle = handle    }  // 过滤MySQL实例端口    snf.setBPF()    go snf.Dispatch()  // 解析MySQL协议    go snf.Decoder()  // 对解析对包结果进行处理    go snf.handleResult()  // 结果投入到 kafka    go snf.Write()  // 定时任务,用于检测新建的数据库实例、抓包率统计等    go snf.ServerCron()  // 关闭会话    go snf.sessionMgr.deleteSession(snf.resultChannels[0])}
2.1 如何实现抓包

Linux 操作系统对数据包的处理流程:

网卡->网卡驱动->数据链路层->IP层->传输层->应用程序

Libpcap

Libpcap 是 Packet Capture Libray 的英文缩写,即数据包捕获函数库。该库提供的C函数接口,用于捕捉经过指定网络接口的数据包。

我们熟知的 tcpdump 和 wireshark 就是在 Libpcap 的基础上开发而成的。Libpcap 提供的接口函数实现和封装了与数据包截获有关的过程。Libpcap 提供了用户级别的网络数据包捕获接口,并充分考虑到应用程序的可移植性。Libpcap 可以在绝大多数 Linux 平台上运行。

它的工作在上层应用程序与网络接口之间。

主要功能数据包捕获:捕获流经网卡的原始数据包;自定义数据包发送:构造任何格式的原始数据包;流量采集与统计:采集网络中的流量信息;规则过滤:提供自带规则过滤功能,按需要选择过滤规则。2.2 MySQL协议解析

MySQL 客户端与服务器的完整交互过程如下

MySQL 数据包报文结构

主要分为登陆报文、客户端命令请求报文、服务器响应报文。

1、登陆报文手初始化报文(服务器 -> 客户端)登陆认证报文(客户端 -> 服务器)

2、客户端命令请求报文(客户端 -> 服务器)

命令:用于标识当前请求消息的类型,例如切换数据库(0x02)、查询命令(0x03)等,常见命令说明如下:

3、服务器响应报文

当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。

2.3 抓包结果结果处理

解析报文后,我们需要保留的有用信息如下:

{    "client_host": "192.168.225.219",    "client_addr": "192.168.225.219:59154",    "user": "h_qta_rw",    "db": "qta_product_baseinfo",    "rows": 1,    "bytes_send": 625,    "query": "select\n         \n        id,\n        wrapper_id,\n        real_phone,\n        phone_type,\n        did_phone,\n        did_prefix\n     \n        from\n        wrapper_did_phone where wrapper_id='hta10850s5i'",    "thread_id": 3243074,    "request_time": "2021-09-26T02:40:24.459756+08:00",    "response_time": "2021-09-26T02:40:24.459874+08:00",    "error": "",    "duration": "0.118 ms",    "db_addr": "10.88.133.221:3306",    "port": 3306,    "host_name": "l-xxx.h.cn5",    "tables": "qta_product_baseinfo.wrapper_did_phone",    "checksum": "33353139414136414438343442394533",    "fingerPrint": "select id, wrapper_id, real_phone, phone_type, did_phone, did_prefix from wrapper_did_phone where wrapper_id=?"}

2.4 结果存储

我们抓取到的数据,最终都需要存储起来,这样才能用来分析,才能被运维的时候使用到,但我们线上有上千个 MySQL 实例,总共的 QPS 有几十万,这么大的数据量如何存储呢?

我们最初考虑使用的是 Kafka+ES+Flink 的方案,使用了一段时间后,发现有些问题不能满足我们的需求,包括:

ES 查询速度慢,导致分析起来很费时间。整条数据流,从 Kafka 出来之后,延迟就非常大,通常要达到好几个小时,不能接受。聚合配置,修改起来非常困难,并且是由另一个团队来负责的,查问题的时候不太方便。

由于上面所述的问题,我们决定换一种实现方式,经过调研发现,现在的数据分析方案中,Clickhouse 是比较火的,并且速度非常快,也支持集群,所以我们尝试使用这个方案来解决这个问题,即:Kafka+Clickhouse。

经过一段时间的使用,发现这种方案非常好,基本没有延迟,并且数据保证的精度也很高,不会因为聚合丢失太多有用的数据,并且占用空间并不会太大,下面是我们使用 Clickhouse 方案中所涉及到的一些细节

如前面程序结构图所示,我们将抓包所产生的json串投递到对应机房kafka,再通过Clickhouse自带的 Kafka Engine 表接收 kafka 数据,Clickhouse 存储了所有线上 MySQL 实例的抓包结果。

Clickhouse接受收kafka数据流程如下:在 ClickHouse 中建立 Kafka Engine 外表,作为 Kafka 数据源的一个接口。

CREATE TABLE sniffer.sniffer_kafka_queue_cn1(    `client_host` String,    `client_addr` String,    `user` String,    `db` String,    `rows` Int32,    `bytes_send` UInt32,    `query` String,    `thread_id` UInt32,    `request_time` String,    `response_time` String,    `error` String,    `duration` String,    `db_addr` String,    `port` UInt32,    `host_name` String,    `tables` String,    `checksum` String,    `fingerPrint` String)ENGINE = Kafka()SETTINGS kafka_broker_list = 'l-kafka1.data.cn1:9092,l-kafka2.data.cn1:9092,l-kafka3.data.cn1:9092,l-kafka4.data.cn1:9092', kafka_topic_list = 'logs.logger.d_dba_prod', kafka_group_name = 'clickhouse-reader', kafka_format = 'JSONEachRow', kafka_max_block_size = 524288
在 ClickHouse 中创建普通表(通常是 MergeTree 系列)存储 Kafka中 的数据。
CREATE TABLE sniffer.sniffer_view(    `client_host` String,    `client_addr` String,    `user` String,    `db` String,    `rows` UInt32,    `bytes_send` UInt32,    `query` String,    `request_time` DateTime,    `response_time` DateTime,    `duration` String,    `db_addr` String,    `port` UInt32,    `host_name` String,    `tables` String,    `checksum` String,    `fingerPrint` String,    `times` UInt32,    `ts` DateTime,    `_topic` Array(String),    `_offset` Array(UInt64),    `_partition` Array(UInt64))ENGINE = ReplicatedMergeTree('/clickhouse/tables/newshard1/sniffer_view', 'l-xxx.dba.cn1')PARTITION BY (toYYYYMMDD(response_time), substr(host_name, length(host_name) - 2, 3))ORDER BY (response_time, host_name, port, tables, checksum)TTL response_time + toIntervalDay(7)SETTINGS index_granularity = 8192
在 ClickHouse 中创建 Materialized View , 监听 Kafka 中的数据,并将数据写入 ClickHouse 存储表中。
CREATE MATERIALIZED VIEW sniffer.sniffer_kafka_queue_cn1_mv TO sniffer.sniffer_view(    `client_host` String,    `client_addr` String,    `user` String,    `db` String,    `rows` UInt32,    `bytes_send` UInt32,    `query` String,    `request_time` DateTime,    `response_time` DateTime,    `duration` String,    `db_addr` String,    `port` UInt32,    `host_name` String,    `tables` String,    `checksum` String,    `fingerPrint` String,    `times` UInt64,    `ts` DateTime,    `_topic` Array(String),    `_partition` Array(UInt64),    `_offset` Array(UInt64)) ASSELECT    client_host,    client_addr,    user,    db,    max(rows) AS rows,    max(bytes_send) AS bytes_send,    any(query) AS query,    toDateTime(substr(max(request_time), 1, 19)) AS request_time,    toDateTime(substr(max(response_time), 1, 19)) AS response_time,    max(duration) AS duration,    any(db_addr) AS db_addr,    port,    host_name,    tables,    checksum,    any(fingerPrint) AS fingerPrint,    count(*) AS times,    now() AS ts,    groupArray(_topic) AS _topic,    groupArray(_partition) AS _partition,    groupArray(_offset) AS _offsetFROM sniffer.sniffer_kafka_queue_cn1GROUP BY    client_host,    client_addr,    user,    db,    port,    host_name,    tables,    checksum

三、开发与运维

上图所展示的是,我们的 sniffer 程序在开发和运维过程中所涉及到的模块和单元,因为这个程序需要在每一台数据库服务器上需要部署,我们的目标是尽可能实现免维护的特性,所以需要在核心功能之外增加很多额外的功能,主要包括:

端口自我发现: 主要用来实现新部署一个实例,sniffer 能自动地发现这个实例,并且抓取到对应的流量,而不需要 DBA 再去处理新实例的这方面的工作了。心跳: 主要是用来方便管理哪些还活着,哪些是不是由于某种原因已经死掉了。运维状态: 包括抓取队列,发送队列,实时抓包率等信息,还是方便我们在管理过程中,更精确地掌握 sniffer 的实时信息。垃圾回收: 这个主要是为了让 sniffer 更简洁,占用资源更少,更轻量而去做的。

实现了上面的功能之后,在运维中,就可以很容易地实现批量部署,灰度发布,异常报警,自动发现,平滑升级,异常恢复等功能了,目前我们的状态基本上就是免维护的,运行稳定。

四、问题分析

在开发使用过程中,遇到了很多问题,经过好几个同学的手,逐步解决掉的,主要包括:系统丢包问题,程序的内存泄漏问题,解析数据性能问题,SQL 语法解析问题,信息丢失或者不全的问题等。解决这些问题是一个复杂而且漫长的过程,也是程序的难点所在,由于本章篇幅有限,先带大家简单了解我们抓包程序的功能和大体实现方法及运维思路,这些问题留到之后来分享,尽请期待。

五、总结与展望

目前,我们所有 MySQL 服务器均已部署了抓包程序,对于线上约 95% 左右的实例,我们的抓包准确率能够达到 98% 以上,个别请求量大,SQL 复杂的实例,丢包仍然存在,我们仍在不断努力改进中,目前也基本满足了我们的需求。

有了数据库全量的访问数据,我们能够做很多事情,包括:

资源归属:我们可以很清楚地掌握哪些库、表、用户等是不是还在使用,或者被谁使用了,我们从此在 MySQL 运维中,做到了:“冤有头,债有主”。SQL 统计报表:我们通过对 SQL 语句的访问记录,可以很清楚地了解到哪些SQL语句是高频访问的,哪些更新语句的影响行数大,哪些语句中有对大字段的操作,并且长度大于某个预定义的值的,都是可以很好的被抓出来的。资源优化、业务变化分析:可以对不访问的库表,用户进行回收,从而节省资源。安全审计:很明显,有了这样的利器之后,安全无忧。调用栈打通:通过抓包分析,我们很容易确定某一个 IP 访问了什么数据库,什么表,使用的是什么语句,这样不管是从上到下查问题,还是从下到上查问题,都能很容易地定位,而不是像之前,一走到数据库这里,就断线了。通知:以后如果要迁移一个数据库,或者做一个主动运维的工作,我们都可以很轻松地找到被影响的业务直接到负责人,简单高效。责任归属:当发现一些由于语句写的不好影响数据库的故障,我们很容易可以定位是什么业务导致的,甚至可以提前预警一些不好的语句,直接找到语句的 Owner ,然后提前解决。其它:还有很多可以做的功能,这里就不一一列举了。

很明显,有了我们这些工作,运维不再是难事儿。很多问题,原来是被动接受的,现在就有可能变为了主动“反击”。这也是我们团队现在推崇的被叫做是“基于统计的数据库运维”的基础,基于统计,也就是基于数据,基于数据,也可以被理解为智能,等到我们把所有的运维的这些事儿都掌握了之后,了然于胸之后,我们就可以很自信的说,我们已经是 AIDBA 了。到那时,我们不会再为开发的提问而不知所措,也不再为谁动了数据发愁,我们能够精准定位到每一刻数据的变更,防患于未然,保护好我们的数据,更主要的是,我们还可以让数据库资源,变得更加健康,DBA 的工作,变得更加高效。

标签: #mysql端口查询 #mysql数据库审计功能怎么开