龙空技术网

性能超 Python 30倍 | 676个股票衍生特征计算的流批一体实现方案

DolphinDB 6200

前言:

现时大家对“流python”都比较珍视,看官们都想要分析一些“流python”的相关文章。那么小编也在网络上搜集了一些有关“流python””的相关内容,希望我们能喜欢,小伙伴们快快来了解一下吧!

随着算力的提升和机器学习与深度学习的普及,在进行数据建模时往往会采用批量生成衍生特征的方法来丰富数据集的特征,如:对原有的 10 维特征都采用 max, min, avg, std, sum 这五种方式进行聚合,数据的特征将变为 50 维。

类似的衍生特征工程极大地提升了模型的准确性。在 Python 中,调用 Pandas 的 agg 函数,传入一个字典(key:列名,value:衍生特征的函数列表)即可实现这样的衍生特征计算。本教程将通过DolphinDB的元编程 ,以极少的代码量实现与 pandas 中类似的衍生特征计算方法。

我们使用 2021 年 16 支股票的 Level 2 快照数据,构建频率为 10 分钟的特征,并利用元编程,低代码量实现了 676 列衍生特征的计算。与多个 Python Pandas 进程并行计算相比,DolphinDB 能带来约 30 倍的性能提升。教程还展示了如何在生产环境中用流式计算进行输入特征的实时计算。

本教程示例代码必须在 2.00.6 及以上版本的 DolphinDB server上运行。

1. Snapshot 数据文件结构

本教程应用的数据源为上交所 level2 快照数据(Snapshot),每幅快照间隔时间为 3 秒或 5 秒,数据文件结构如下:

字段

含义

字段

含义

字段

含义

SecurityID

证券代码

LowPx

最低价

BidPrice[10]

申买十价

DateTime

日期时间

LastPx

最新价

BidOrderQty[10]

申买十量

PreClosePx

昨收价

TotalVolumeTrade

成交总量

OfferPrice[10]

申卖十价

OpenPx

开始价

TotalValueTrade

成交总金额

OfferOrderQty[10]

申卖十量

HighPx

最高价

InstrumentStatus

交易状态

……

……

2021 年上交所所有股票的快照数据已经提前导入至 DolphinDB 数据库中,一共约17.07亿条数据,导入方法见国内股票行情数据导入实例。

2. 教程开发背景

本教程受 Kaggle 的 Optiver Realized Volatility Prediction 竞赛项目启发,该项目排名第一的代码中使用了两档的买卖量价数据及 pandas 的元编程实现了批量衍生特征的计算,本教程在此基础上,使用真实十档买卖量价的快照数据实现了 676 列衍生特征的计算。

2.1 一级指标

一级指标全部由快照数据中的 10 档买卖单量价数据计算而来

Weighted Averaged Price(wap):加权平均价格Price Spread(priceSpread):用于衡量买单价和卖单价的价差Bid Price Spread(bidSpread):用于衡量买 1 价和买 2 价的价差Offer Price Spread(offerSpread):用于衡量卖1价和卖2价的价差Total Volume(totalVolume):10 档买卖单的总量Volume Imbalance(volumeImbalance):买卖单总量不平衡Log Return of Offer Price(logReturnOffer):卖单价的对数收益Log Return of Bid Price(logReturnBid):买单价的对数收益2.2 二级指标

二级指标全部由一级指标计算生成

WAP Balance(wapBalance):加权平均价格平衡Log Return of WAP(logReturnWap):加权平均价格对数收益2.3 衍生特征

衍生特征是对一级指标和二级指标做 10 分钟降采样的衍生,降采样的聚合方法通过元编程来批量实现,衍生方法如下:

指标名称

衍生方法

DateTime

count

Wap[10]

sum, mean, std

LogReturn[10]

sum, realizedVolatility, mean, std

LogReturnOffer[10]

sum, realizedVolatility, mean, std

LogReturnBid[10]

sum, realizedVolatility, mean, std

WapBalance

sum, mean, std

PriceSpread

sum, mean, std

BidSpread

sum, mean, std

OfferSpread

sum, mean, std

TotalVolume

sum, mean, std

VolumeImbalance

sum, mean, std

最终预测的计算指标为实际波动率:

Realized Volatility(realizedVolatility):实际波动率

同时考虑到,过去 10 分钟的特征中,特征的时效性是随时间增加的,越靠前的特征时效性越弱,越靠后的特征时效性越强。所以,本教程对 10 分钟的特征切分成了 0-600s(全部),150-600s,300-600s,450-600s 四段,分别进行上述衍生指标的计算。

10 分钟的快照数据最终形成 676 维的聚合特征,如下图所示。

3. DolphinDB 元编程计算代码开发

本教程中的重点和难点是批量生成大量特征列计算表达式,如果按照传统 SQL 来编程,实现 676 个计算指标需要编写大量代码,因此本教程使用元编程方法实现衍生特征的计算,关于元编程的详情请参考 元编程 — DolphinDB 2.0 文档 。本教程通过元编程函数 sql 生成元代码。为了对快照数据做 10min 的聚合计算,sql 函数的分组参数 groupby=[<SecurityID>, <bar(DateTime, 10m) as DateTime>]。在自定义聚合函数中,首先进行一二级指标计算,再进行衍生特征计算。通过 DolphinDB 元编程对 level2 快照数据完成 676 列衍生特征的完整计算代码如下:

DolphinDB 批计算代码:十档量价数据用多列存储。DolphinDB 批计算代码(数组向量版):十档量价数据用数组向量存储。3.1 一二级指标计算

自定义聚合函数的入参是某支股票的 BidPrice, BidOrderQty, OfferPrice, OfferOrderQty 这四个十档量价数据的矩阵,在 DolphinDB 中对一二级指标的计算代码如下:

 wap = (BidPrice * OfferOrderQty + BidOrderQty * OfferPrice) \ (BidOrderQty + OfferOrderQty) wapBalance = abs(wap[0] - wap[1]) priceSpread = (OfferPrice[0] - BidPrice[0]) \ ((OfferPrice[0] + BidPrice[0]) \ 2) BidSpread = BidPrice[0] - BidPrice[1] OfferSpread = OfferPrice[0] - OfferPrice[1] totalVolume = OfferOrderQty.rowSum() + BidOrderQty.rowSum() volumeImbalance = abs(OfferOrderQty.rowSum() - BidOrderQty.rowSum()) LogReturnWap = logReturn(wap) LogReturnOffer = logReturn(OfferPrice) LogReturnBid = logReturn(BidPrice)
3.2 衍生特征计算

利用 Python 的 pandas 库,通过向 groupby.agg 传入一个字典(字典的 key 为列名,value为聚合函数列表),即可实现对指定的列进行批量的聚合指标计算。

在 DolphinDB 中,亦可通过自定义函数实现类似的需求,即把字典转换成元编程代码,具体代码如下:

 def createAggMetaCode(aggDict){     metaCode = []     metaCodeColName = []     for(colName in aggDict.keys()){         for(funcName in aggDict[colName])         {             metaCode.append!(sqlCol(colName, funcByName(funcName), colName + `_ + funcName$STRING))             metaCodeColName.append!(colName + `_ + funcName$STRING)         }     }     return metaCode, metaCodeColName$STRING }  features = {     "DateTime":[`count] } for( i in 0..9) {     features["Wap"+i] = [`sum, `mean, `std]     features["LogReturn"+i] = [`sum, `realizedVolatility, `mean, `std]     features["LogReturnOffer"+i] = [`sum, `realizedVolatility, `mean, `std]     features["LogReturnBid"+i] = [`sum, `realizedVolatility, `mean, `std] } features["WapBalance"] = [`sum, `mean, `std] features["PriceSpread"] = [`sum, `mean, `std] features["BidSpread"] = [`sum, `mean, `std] features["OfferSpread"] = [`sum, `mean, `std] features["TotalVolume"] = [`sum, `mean, `std] features["VolumeImbalance"] = [`sum, `mean, `std] aggMetaCode, metaCodeColName = createAggMetaCode(features)

返回结果为元代码向量和对应的元代码列名,如下图所示:

在自定义函数中,为了方便后续使用元编程进行衍生特征计算,需要将计算的一二级指标拼接成一个 table,同时修改列名,具体代码如下:

subTable = table(DateTime as `DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, wap, wapBalance, priceSpread, BidSpread, OfferSpread, totalVolume, volumeImbalance, LogReturnWap, LogReturnOffer, LogReturnBid) colNum = 0..9$STRING colName = `DateTime <- (`BidPrice + colNum) <- (`BidOrderQty + colNum) <- (`OfferPrice + colNum) <- (`OfferOrderQty + colNum) <- (`Wap + colNum) <- `WapBalance`PriceSpread`BidSpread`OfferSpread`TotalVolume`VolumeImbalance <- (`LogReturn + colNum) <- (`LogReturnOffer + colNum) <- (`LogReturnBid + colNum) subTable.rename!(colName)

其中 “<-” 是 DolphinDB 函数 join 的简写符号,此处用于将各字段拼接成列向量。

最后将元代码作为参数传入自定义聚合函数,配合一二级指标拼接而成的 table 进行 676 列衍生指标的计算,并以 676 列的形式作为聚合结果返回,具体代码如下:

 subTable['BarDateTime'] = bar(subTable['DateTime'], 10m) result = sql(select = aggMetaCode, from = subTable).eval().matrix() result150 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 150*1000) >).eval().matrix() result300 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 300*1000) >).eval().matrix() result450 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 450*1000) >).eval().matrix() return concatMatrix([result, result150, result300, result450])

部分计算结果展示

所有衍生特征列名展示

4. DolphinDB vs Python测试数据为实盘16只证券标的的十档快照数据,总记录数为 19,220,237。计算逻辑为按股票代码分组,计算 676 个 10 分钟聚合指标。DolphinDB 和 Python 的 CPU 计算调用资源都是 8 核。DolphinDB 批计算代码:十档量价数据用多列存储。DolphinDB 批计算代码(数组向量版):十档量价数据用数组向量存储。Python 批计算代码。

由于 DolphinDB 的计算是分布式并行计算,本教程中配置的并行度为 8,因此在使用 Python 实现衍生特征计算时,也采用了 8 的并行度进行并行计算,即同时调用 8 个 Python 进程进行计算。

计算性能对比结果

股票数量

交易日

数据量

Python(s)

DolphinDB(s)

16

243

19,220,237

3,039

100

5. 模型构建

选取 16 支股票 (601318,600519,600036,600276,601166,600030,600887,600016,601328,601288,600000,600585,601398,600031,601668,600048) 2022年 09:30:00-11:30:00 和 13:00:00-15:00:00 的 level2 快照进行模型训练。

DolphinDB 支持一系列常用的机器学习算法,例如最小二乘回归、随机森林、K-平均等,使用户能够方便地完成回归、分类、聚类等任务。除了内置的经典的机器学习算法函数,DolphinDB 还支持许多第三方库,因此我们也可以调用 DolphinDB 提供的第三方库插件来进行模型训练。

XGBOOST(Extreme Gradient Boosting)是一种 Tree Boosting 的可扩展机器学习系统,它在 Gradient Boosting 框架下实现机器学习算法,提供了并行树提升(也称为 GBDT,GBM),可以快速准确地解决许多数据科学的问题。

本教程参考机器学习教程-5.使用 DolphinDB 插件进行机器学习,选取 XGBOOST 进行训练。

参考:XGBOOST 插件安装教程

模型评价指标:根均方百分比误差(Root Mean Square Percentage Error, RMSPE)

参考:模型构建和训练代码

5.1 数据处理

删除掉含 NULL 的记录,标注 label 并将构造出来的 676 维特征适当调整为 XGBOOST 输入的数据格式,具体代码如下:

//将计算出来的特征中包含 NULL 的记录删除 result = result[each(isValid, result.values()).rowAnd()] ​ result_input = copy(result) ​ //选取 LogReturn0_realizedVolatility 作为label label = result[`LogReturn0_realizedVolatility] ​ //将 SYMBOL 型的 SecurityID 列转换为 XGBOOST 模型支持输入的 INT 型 result_input.update!(`SecurityID_int, int(result[`SecurityID])) ​ //调整输入的字段,去除不需要的列 result_input.dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)

注意:本次预测值为未来 10 分钟的波动率,WAP_0 最接近股价,所以选取LogReturn0_realizedVolatility 作为 label

5.2 划分训练集和测试集

本项目中没有设置验证集,训练集和测试集按 7:3 比例划分,即 train:test = 62514:26804,具体代码如下:

 def trainTestSplit(x, testRatio) {     xSize = x.size()     testSize =( xSize * (1-testRatio))$INT     return x[0: testSize], x[testSize:xSize] } ​ Train_x, Test_x = trainTestSplit(result_input, 0.3) Train_y, Test_y = trainTestSplit(label, 0.3)
5.3 训练及评价

DolphinDB 的 XGBOOST 插件中包含 4 个用户接口:

用于训练模型的 xgboost::train() ;用于预测的 xgboost::predict;用于保存模型到磁盘的 xgboost::saveModel用于从磁盘上加载模型的 xgboost::loadModel

具体使用方法的说明参见 DolphinDB XGBoost 插件用户接口教程。

具体代码如下:

 //定义评估指标 RMSPE(Root Mean Square Percentage Error) def RMSPE(a,b) {     return sqrt( sum( ((a-b)\a)*((a-b)\a) ) \a.size()  ) } ​ //定义模型训练的参数 params = {     objective: 'reg:squarederror',     colsample_bytree: 0.8,     subsample: 0.8,     min_child_weight: 1,     max_leaves:128,     eta: 0.1,     max_depth:10,     eval_metric : 'rmse'     } ​ //XGBOOST 模型训练 model_1 = xgboost::train(Train_y ,Train_x, params, 500) ​ //用测试集预测波动率,并计算 RMPSE y_pred = xgboost::predict(model_1, Test_x) print('RMSPE='+RMSPE(Test_y, y_pred))

运行结果:

 RMSPE:0.559 模型训练时间:1m 3s 327ms

本次预测采取手动粗调参,不代表模型以及应用的最优结果。

模型保存及加载:

//保存模型,modelSavePath 为保存模型的路径,需要根据实际环境配置xgboost::saveModel(model_1, modelSavePath)//模型加载,modelSavePath 为模型的路径,需要根据实际环境配置model = xgboost::loadModel(modelSavePath)

回归模型预测性能

数据量(条)

模型预测时间(ms)

1

0.936

10

1.832

100

9.314

1000

49.274

10000

317.656

6. 流计算实现

以上部分的计算都是基于批量历史数据的计算,而在实际生产环境中,数据的来源往往是以“流”的方式,而如何套用上述复杂的衍生特征计算逻辑实现流式计算是业务层面面临的重大难题。

对于这类问题,DolphinDB 内置了多种类型的流计算引擎,以提供简易快捷的低延时解决方案。

DolphinDB 流计算代码,请参考知乎原文附件

6.1 流计算实现架构

流计算处理流程

上图对应本章节整体的流程框架:实时的流数据首先通过 DolphinDB API 注入至snapshotStream表中。然后通过订阅/推送,将快照数据注入至时间序列聚合引擎,进行窗口为10分钟,步长为10分钟的滑动窗口计算,核心代码如下:

定义存储 snapshot 的流数据表 snapshotStream ,特征工程结果表 aggrFeatures10min,以及后续模型预测的结果表 result10min

name = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INTshare streamTable(100000:0, name, type) as snapshotStreamshare streamTable(100000:0 , `DateTime`SecurityID <- metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450"),`TIMESTAMP`SYMBOL <- take(`DOUBLE, 676)) as aggrFeatures10minshare streamTable(100000:0 , `Predicted`SecurityID`DateTime, `FLOAT`SYMBOL`TIMESTAMP) as result10min
注册时间序列聚合计算引擎
metrics=sqlColAlias(<featureEngineering(DateTime,		matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9),		matrix(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9),		matrix(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9),		matrix(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9), aggMetaCode)>, metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450"))createTimeSeriesEngine(name="aggrFeatures10min", windowSize=600000, step=600000, metrics=metrics, dummyTable=snapshotStream, outputTable=aggrFeatures10min, timeColumn=`DateTime, useWindowStartTime=true, keyColumn=`SecurityID)
第一个订阅:订阅snapshotStream流数据表中的实时增量数据
subscribeTable(tableName="snapshotStream", actionName="aggrFeatures10min", offset=-1, handler=getStreamEngine("aggrFeatures10min"), msgAsTable=true, batchSize=2000, throttle=1, hash=0, reconnect=true)
第二个订阅:实时获取 aggrFeatures10min 表中的特征数据,使用已训练好的模型来进行波动率预测,并将最终结果导入到 result10min 表中。
def predictRV(mutable result10min, model, mutable msg){	startTime = now()	temp_table = select SecurityID, DateTime from msg	msg.update!(`SecurityID_int, int(msg[`SecurityID])).dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)	Predicted = xgboost::predict(model , msg)	temp_table_2 = table(Predicted, temp_table)	result10min.append!(temp_table_2)}subscribeTable(tableName="aggrFeatures10min", actionName="predictRV", offset=-1, handler=predictRV{result10min, model}, msgAsTable=true, hash=1, reconnect=true)

上述脚本定义的 metrics 中调用的 featureEngineering 函数与批计算的代码脚本中的定义完全相同,体现了 DolphinDB 流批一体的优势特点。

附件的流计算示例代码,通过历史数据回放的方式,回测实盘波动率预测的结果展示:

6.2 流计算延时统计

本章节统计了股票在时序聚合引擎中的计算延时情况,流计算延时主要由两部分构成:计算聚合特征的耗时模型预测实时波动率的耗时。主要延时为计算聚合特征的耗时。

计算聚合特征耗时:

timer getStreamEngine('aggrFeatures10min').append!(data)
模型预测实时波动率的耗时:
test_x  = select * from aggrFeatures10mintimer{temp_table = select SecurityID, DateTime from test_xtest_x.update!(`SecurityID_int, int(test_x[`SecurityID])).dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)Predicted = xgboost::predict(model , test_x)}
统计结果:

股票数量

10分钟数据量

计算聚合特征耗时

模型预测实时波动率的耗时

总耗时

1

201

22ms

3ms

25ms

10

2011

92ms

3ms

95ms

20

4020

162ms

4ms

168ms

30

6030

257ms

5ms

262ms

40

8040

321ms

6ms

327ms

50

10054

386ms

7ms

393ms

7. 总结

本教程通过使用 DolphinDB 强大的数据处理能力,并结合元编程实现了低代码批量生成多维股票衍生特征的应用场景。与 Python 等传统数据处理方法相比,DolphinDB 依靠数据存储引擎和计算引擎的高度融合,在数据预处理阶段,方便地实现了分布式并行计算,不仅节约了内存资源,而且在使用相同物理计算资源的情况下,提高了约 30 倍的计算效率。

在批计算的基础上,本教程利用 DolphinDB 内置的流计算处理框架,为实际生产环境的类似需求(实时计算衍生特征、实时调用模型预测计算)提供了一套完整高效的解决方案。本教程以上证 16 支股票 level2 快照数据作为订阅的数据源,通过模式真实数据的实时注入,可以在毫秒级完成对每只股票 10 分钟快照数据衍生676 维衍生特征的低延时计算,从而为后续的数据建模提供强大的数据支撑。

附录

注意事项:本教程示例代码必须在2.00.6及以上版本的DolphinDB server上运行。

DolphinDB批计算代码

DolphinDB批计算代码(数组向量版)

模型构建和训练代码

Python批计算代码

DolphinDB流计算代码

level2快照测试数据

附录文件请见知乎原文

开发环境

CPU 类型:Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz逻辑 CPU 总数:8内存:64GBOS:64位 CentOS Linux 7 (Core)磁盘:SSD 盘,最大读写速率为 520MB/sserver 版本:2.00.6server 部署模式:单节点2.00 配置文件:dolphindb.cfg (volumes, persistenceDir, TSDBRedoLogDir 需要根据实际环境磁盘路径修改)单节点部署教程:单节点部署

标签: #流python