龙空技术网

基于 Python 探针完成调用库的数据提取

即将苏醒的Python 395

前言:

此刻你们对“mysql探针”可能比较关怀,你们都想要学习一些“mysql探针”的相关知识。那么小编也在网上网罗了一些对于“mysql探针””的相关文章,希望咱们能喜欢,看官们一起来学习一下吧!

前记

最近在完善公司的监控系统, 发现在项目运行时经常会出现一些运行时的问题, 这些问题往往不是一个子服务引发的问题, 而可能是某个环节出现了问题, 这时候就需要引入APM系统。

在收集APM数据时发现在 Python 生态中针对web框架都有完善的APM中间件用于接口统计与监控, 但是第三方调用库相关的APM实现都比较少(几乎没有), 同时这些库大多数也都没提供一些钩子实现。这就需要自己去封装一些库, 为这些库实现一套调用过程的数据提供逻辑。

本文是以 Pythonaiomysql 库为例,阐述如何基于 Python 的探针完成调用库的调用过程统计与监控的封装。

注: 监控的形式的agent有很多种,如 PrometheusZabbix , GraphiteOpentracing 他们的数据源有很大的不同,但是他们都是基于元数据封装成自己的源数据,然后发送到对应的服务,所以本文只介绍如何提取元数据,剩下的如何发送需要自己按照特定的监控系统去实现。注:这里以aiomysql库来做示例,提取数据的方法应该用统一的dbapi2, 本文只阐述如何简单的实现。

1.简单粗暴的方法--对mysql库进行封装

要统计一个执行过程, 就需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是基于要调用的方法进行封装,在框架调用 MySQL 库和 MySQL 库中间实现一个中间层, 在中间层完成耗时统计,如:

# 伪代码def my_execute(conn, sql, param):    # 针对MySql库的统计封装组件    with MyTracer(conn, sql, param):        # 以下为正常使用MySql库的代码        with conn.cursor as cursor:            cursor.execute(sql, param)            ...

看样子实现起来非常不错, 而且更改非常方便, 但由于是在最顶层的API上进行修改, 其实是非常不灵活的, 同时在 cursor.execute 里会进行一些预操作, 如把sql和param进行拼接, 调用 nextset 清除当前游标的数据等等。我们最后拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的元数据, 如错误码等等.

如果要拿到最直接有用的数据,就只能去改源代码, 然后再调用源代码了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在 Python 也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.

2.Python的探针

Python 中可以通过 sys.meta_path 来实现 import hook 的功能, 当执行 import 相关操作时, 会根据 sys.meta_path 定义的对象对import相关库进行更改.

sys.meta_path 中的对象需要实现一个 find_module 方法, 这个 find_module 方法返回 None 或一个实现了 load_module 方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hook time.sleep 让他在sleep的时候能打印消耗的时间.

github源码存储:

import importlibimport sysfrom functools import wrapsdef func_wrapper(func):    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""    @wraps(func)    def wrapper(*args, **kwargs):        # 记录开始时间        start = time.time()        result = func(*args, **kwargs)        # 统计消耗时间        end = time.time()        print(f"speed time:{end - start}")        return result    return wrapperclass MetaPathFinder:    def find_module(self, fullname, path=None):        # 执行时可以看出来在import哪些模块        print(f'find module:{path}:{fullname}')        return MetaPathLoader()class MetaPathLoader:    def load_module(self, fullname):        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder = sys.meta_path.pop(0)        # 导入 module        module = importlib.import_module(fullname)        if fullname == 'time':            # 替换函数            module.sleep = func_wrapper(module.sleep)        sys.meta_path.insert(0, finder)        return modulesys.meta_path.insert(0, MetaPathFinder())if __name__ == '__main__':    import time    time.sleep(1)# 输出示例:# find module:datetime# find module:time# load module:time# find module:math# find module:_datetime# speed time:1.00073385238647468
3.制作探针模块

了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在 MetaPathFinder.find_module 中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要 cursor.execute , cursor.fetchone , cursor.fetchall , cursor.executemany 这几个主要的操作,所以需要深入cursor的源码: , 看看如何去更改代码, 后者重载哪个函数.

cursor.execute 的源码( cursor.executemanay 也类似), 发现会先调用 self.nextset 的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过 self._query 进行查询:

async def execute(self, query, args=None):    """Executes the given operation    Executes the given operation substituting any markers with    the given parameters.    For example, getting all rows where id is 5:        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))    :param query: ``str`` sql statement    :param args: ``tuple`` or ``list`` of arguments for sql query    :returns: ``int``, number of rows that has been produced of affected    """    conn = self._get_db()    while (await self.nextset()):        pass    if args is not None:        query = query % self._escape_args(args, conn)    await self._query(query)    self._executed = query    if self._echo:        logger.info(query)        logger.info("%r", args)    return self._rowcount

再看 cursor.fetchone 的源码( cursor.fetchall 也类似), 发现其实是从缓存中获取数据, 这些数据在执行 cursor.execute 中就已经获取了:

def fetchone(self):    """Fetch the next row """    self._check_executed()    fut = self._loop.create_future()    if self._rows is None or self._rownumber >= len(self._rows):        fut.set_result(None)        return fut    result = self._rows[self._rownumber]    self._rownumber += 1    fut = self._loop.create_future()    fut.set_result(result)    return fut

综合上面的分析, 我们只要对核心的方法 self._query 进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入 self._queryselfsql 参数, 根据 self 又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了, 按照思路修改后的代码如下

import importlibimport timeimport sysfrom functools import wrapsfrom typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKINGfrom types import ModuleTypeif TYPE_CHECKING:    import aiomysqldef func_wrapper(func: Callable):    @wraps(func)    async def wrapper(*args, **kwargs) -> Any:        start: float = time.time()        func_result: Any = await func(*args, **kwargs)        end: float = time.time()        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql        self: aiomysql.Cursor = args[0]        sql: str = args[1]        # 通过self,我们可以拿到其他的数据        db: str = self._connection.db        user: str = self._connection.user        host: str = self._connection.host        port: str = self._connection.port        execute_result: Tuple[Tuple] = self._rows        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,         # 这里只是打印一部分数据出来        print({            "sql": sql,            "db": db,            "user": user,            "host": host,            "port": port,            "result": execute_result,            "speed time": end - start        })        return func_result    return cast(Callable, wrapper)class MetaPathFinder:    @staticmethod    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:        if fullname == 'aiomysql':            # 只有aiomysql才进行hook            return MetaPathLoader()        else:            return Noneclass MetaPathLoader:    @staticmethod    def load_module(fullname: str):        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder: "MetaPathFinder" = sys.meta_path.pop(0)        # 导入 module        module: ModuleType = importlib.import_module(fullname)        # 针对_query进行hook        module.Cursor._query = func_wrapper(module.Cursor._query)        sys.meta_path.insert(0, finder)        return moduleasync def test_mysql() -> None:    import aiomysql    pool: aiomysql.Pool = await aiomysql.create_pool(        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'    )    async with pool.acquire() as conn:        async with conn.cursor() as cur:            await cur.execute("SELECT 42;")            (r,) = await cur.fetchone()            assert r == 42    pool.close()    await pool.wait_closed()if __name__ == '__main__':    sys.meta_path.insert(0, MetaPathFinder())    import asyncio    asyncio.run(test_mysql())# 输出示例:# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了.

查阅了一翻资料后发现, python 解释器初始化的时候会自动 import PYTHONPATH 下存在的 sitecustomizeusercustomize 模块, 我们只要创建该模块, 并在模块里面写入我们的 替换函数即可。

具体结构如下,也可以参考github存储:

.├── __init__.py├── hook_aiomysql.py├── sitecustomize.py└── test_auto_hook.py

hook_aiomysql.py 是我们制作探针的代码为例子, 而 sitecustomize.py 存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到 sys.meta_path

import sysfrom hook_aiomysql import MetaPathFindersys.meta_path.insert(0, MetaPathFinder())

test_auto_hook.py 则是测试代码:

import asynciofrom hook_aiomysql import test_mysqlasyncio.run(test_mysql())

接下来只要设置 PYTHONPATH 并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好 PYTHONPATH ):

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      (.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
4.直接替换方法

可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是依赖与 sitecustomize.py 文件很难让他抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。在上面介绍 MetaPathLoader 时说到了 sys.module , 在里面通过 sys.modules 来减少重复引入:

class MetaPathLoader:    def load_module(self, fullname):        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder = sys.meta_path.pop(0)        # 导入 module        module = importlib.import_module(fullname)        if fullname == 'time':            # 替换函数            module.sleep = func_wrapper(module.sleep)        sys.meta_path.insert(0, finder)        return module

这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成 引入模块->替换当前模块方法为我们修改的hook方法 。第一次接触这个方法是从opentracing-python-instrumentation:学的, 不过他夹带着其他的封装, 所以我这里进行了简化处理 github代码仓::

import timefrom functools import wrapsfrom typing import Any, Callable, Tuple, castimport aiomysqldef func_wrapper(func: Callable):    """和上面一样的封装函数, 这里简单略过"""    # 判断是否hook过_IS_HOOK: bool = False# 存放原来的_query_query: Callable = aiomysql.Cursor._query# hook函数def install_hook() -> None:    _IS_HOOK = False    if _IS_HOOK:        return    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)    _IS_HOOK = True# 还原到原来的函数方法def reset_hook() -> None:    aiomysql.Cursor._query = _query    _IS_HOOK = False

代码简单明了,接下来跑一跑刚才的测试:

import asyncioimport aiomysqlfrom demo import install_hook, reset_hookasync def test_mysql() -> None:    pool: aiomysql.Pool = await aiomysql.create_pool(        host='127.0.0.1', port=3306, user='root', password='', db='mysql'    )    async with pool.acquire() as conn:        async with conn.cursor() as cur:            await cur.execute("SELECT 42;")            (r,) = await cur.fetchone()            assert r == 42    pool.close()    await pool.wait_closed()print("install hook")install_hook()asyncio.run(test_mysql())print("reset hook")reset_hook()asyncio.run(test_mysql())print("end")

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}reset hookend
5.总结

得益于Python动态语言的特性, 我们可以很容易的为第三方库实现钩子方法,上面说的两种方法中, 第二种方法非常简单, 但在自己项目中最好还是采用第一种方法, 因为 Python 是通过一行一行代码进行扫描执行的, 第二种方法只能放在入口代码中, 并且要在被hook的对象实例化之前执行, 不然就会实现hook失败的现象, 而第一种方法除了麻烦外, 基本上能躲避所有坑。

原文地址

标签: #mysql探针