龙空技术网

最大离散重叠小波变换MODWT和支持向量回归 SVR的金融时间序列预测

哥本哈根诠释2023 407

前言:

现在看官们对“离散小波变换系数”可能比较关注,朋友们都需要学习一些“离散小波变换系数”的相关内容。那么小编在网上搜集了一些关于“离散小波变换系数”的相关文章,希望兄弟们能喜欢,姐妹们一起来学习一下吧!

本例使用的数据链接如下:将数据从 1 分钟间隔转换为 1 天间隔

第一部分,原始时间序列SVR + 滑动窗方法

首先读取数据

# Load the daily AUD/JPY quotes. The file is ';'-separated, its first row is
# the header, and the 'Date' column is parsed into datetime values.
prices = pd.read_csv(
    '../Data/AUD-JPY-2003-2014-day.csv',
    delimiter=";",
    header=0,
    encoding='utf-8',
    parse_dates=['Date'],
)

# Notebook-style echo of the loaded frame.
prices

删除不使用的列

# Drop the quote columns that are not used further on; the frame is modified
# in place.
prices.drop(columns=["Open", "High", "Low"], inplace=True)

定义变量

# Keep independent copies of the two columns the rest of the walkthrough uses.
dates = prices['Date'].copy()
closing_prices = prices['Close'].copy()

# Plot the original time series with matplotlib.
plt.subplots(figsize=(16, 4))
plt.plot(
    dates,
    closing_prices,
    label='Original series AUD-JPY 2003-2014',
)
plt.legend(loc='best')
plt.show()

SVR + 滑动窗,定义滑动窗函数

def slideWindow(series, window_lenght = 2):
    """Turn a 1-D series into supervised (inputs, target) samples.

    Every window of ``window_lenght + 1`` consecutive values yields one
    sample: the first ``window_lenght`` values are the input and the last
    value is the target.

    Returns:
        A pair ``(_X, _Y)`` of lists: ``_X`` holds the input windows and
        ``_Y`` the matching target values.
    """
    windows = sliding_window_view(series, window_lenght + 1)
    _X = [row[:-1] for row in windows]
    _Y = [row[-1] for row in windows]
    return _X, _Y


window_lenght = 2

# Build the samples with the sliding-window helper.
X, Y = slideWindow(closing_prices, window_lenght)

# Position in the price table of the first target that lands in the last 25%
# of the samples; the first target sits `window_lenght` rows into the series.
idx_test_date = int(0.75 * len(Y)) + window_lenght
df = pd.DataFrame({'test_date': prices['Date'].iloc[idx_test_date:]})

拆分并绘制测试数据,将数据拆分为训练集(75%)和测试集(25%),shuffle = False 表示并非随机打乱数据

# Chronological split: first 75% for training, last 25% for testing
# (shuffle=False keeps the samples in their original order).
x_train, x_test, y_train, y_test = train_test_split(
    X,
    Y,
    test_size=0.25,
    random_state=None,
    shuffle=False,
)

# Top panel: whole series with the test segment overlaid.
# Bottom panel: the test segment on its own.
fig, ax = plt.subplots(2, 1, figsize=(16, 8))
ax[0].plot(dates, closing_prices, label='Original')
for panel in (ax[0], ax[1]):
    panel.plot(
        df['test_date'],
        y_test,
        label='Values to test the model out',
        color='orange',
    )
    panel.legend(loc='best')
plt.show()

定义训练函数并拟合

def evaluateSVR(_x_train,_y_train,_x_test,_y_test, kernel = 'rbf'):
    """Fit an SVR on the training samples and predict the test inputs.

    Args:
        _x_train, _y_train: training inputs and targets.
        _x_test: inputs to predict.
        _y_test: accepted for call-site symmetry; not used here.
        kernel: 'rbf' or 'poly'; any other value selects the linear kernel.

    Returns:
        The predictions for ``_x_test``.
    """
    if kernel == 'rbf':
        settings = {'kernel': 'rbf', 'C': 1e3, 'gamma': 0.1}
    elif kernel == 'poly':
        settings = {'kernel': 'poly', 'C': 1e3, 'degree': 2}
    else:
        settings = {'kernel': 'linear', 'C': 1e3}

    model = svm.SVR(**settings)
    model.fit(_x_train, _y_train)
    return model.predict(_x_test)


y_predict = evaluateSVR(x_train, y_train, x_test, y_test)

# Keep the real test targets; they are compared with the wavelet-based
# forecast later on.
plotValuesWt = y_test.copy()

# Plot the predicted values against the real ones.
plt.subplots(figsize=(18, 6))
plt.plot(df['test_date'], y_test, label="Real")
plt.plot(df['test_date'], y_predict, label="Predicted")
plt.legend(loc='best')
plt.show()

第二部分,使用 MODWT 将时间序列分解

使用“sym4”小波,modwt分解为4层(4 个细节系数 (dC) 和 1 个近似系数 (aC))

def applyModwt(_data, type='sym4', _level=3):
    """Thin wrapper: return ``modwt(_data, type, _level)`` unchanged."""
    return modwt(_data, type, _level)


level = 4
coeff = applyModwt(closing_prices, type='sym4', _level=level)

# Inspect the coefficients; per the original note this should be an array
# with 5 rows and len(closing_prices) columns.
print(np.shape(coeff))

# One panel per coefficient row; the last row is labelled as the
# approximation (cA), every other row as a detail (cD).
fig, ax = plt.subplots(len(coeff), 1, figsize=(16, 8))
approx_row = len(coeff) - 1
for i in range(len(coeff)):
    template = 'cA[%.0f]' if i == approx_row else 'cD[%.0f]'
    ax[i].plot(coeff[i], label=template % (i))
    ax[i].legend(loc='best')

重建原始时间序列

# Zero-filled array with the same shape as the coefficient array.
recwt = np.zeros(np.shape(coeff))

# Keep an untouched copy of all detail and approximation coefficients.
aCdC = coeff.copy()

# Only the approximation row is kept; the other rows stay zero.
recwt[level:] = coeff[level]

# Rebuild the series from the approximation coefficients only.
dFs = imodwt(recwt, 'sym4')

# Rebuild the series from every coefficient.
rFs = imodwt(coeff, 'sym4')

# Compare: original / full reconstruction / approximation-only
# reconstruction / original overlaid with the approximation-only signal.
fig, ax = plt.subplots(4, 1, figsize=(16, 8))
ax[0].plot(dates, closing_prices, label='Original')
ax[1].plot(
    dates, rFs,
    label='Re-constructed (using all coeff)', color='green',
)
ax[2].plot(
    dates, dFs,
    label='Re-constructed (using just aC)', color='orange',
)
ax[3].plot(dates, closing_prices, label='Original')
ax[3].plot(
    dates, dFs,
    label='Re-constructed (using just aC)', color='orange',
)
for panel in ax:
    panel.legend(loc='best')
plt.show()

第三部分,使用 SVR 估计小波系数

new_coeff = []

# Forecast every coefficient row independently with the sliding-window SVR.
for i in range(len(aCdC)):
    # NOTE(review): this split point ignores the window length (5) used
    # below, so the predictions may sit a sample or two off and the rebuilt
    # row can be shorter than the original one - confirm the intended
    # alignment.
    index = int(len(aCdC[i]) * 0.75)

    X, Y = slideWindow(aCdC[i], window_lenght=5)

    # Chronological 75% / 25% split.
    x_train, x_test, y_train, y_test = train_test_split(
        X,
        Y,
        test_size=0.25,
        random_state=None,
        shuffle=False,
    )

    # Fit on the first part of this row and predict the rest.
    y_predict = evaluateSVR(x_train, y_train, x_test, y_test)

    # New row = original head followed by the predicted tail.
    head = aCdC[i][:index]
    new_coeff.append(np.concatenate((head, y_predict)))

    # Plot real vs. predicted values for this row.
    plt.subplots(figsize=(18, 6))
    plt.plot(y_test, label="Real")
    plt.plot(y_predict, label="Predicted")
    plt.legend(loc='best')
    plt.show()

用预测值绘制新的时间序列

# Rebuild a price series from the rows that carry the predicted tails.
rpFs = imodwt(new_coeff, 'sym4')

# Compare the last len(plotValuesWt) reconstructed values with the real test
# targets. Deriving the start from the target length keeps both arrays the
# same size for every series length; int(len(rpFs) * 0.75) only matched for
# some lengths.
index = len(rpFs) - len(plotValuesWt)
predicted_tail = rpFs[index:]

fig, ax = plt.subplots(3, 1, figsize=(16, 8))
ax[0].plot(df['test_date'], plotValuesWt, label='Original')
# Reconstruction from all detail and approximation coefficients.
ax[1].plot(
    predicted_tail,
    label='Re-constructed (using all coeff)', color='green',
)
ax[2].plot(df['test_date'], plotValuesWt, label='Original')
ax[2].plot(
    df['test_date'], predicted_tail,
    label='Re-constructed (using all coeff)', color='green',
)
for panel in ax:
    panel.legend(loc='best')

# Root-mean-squared error. The square root is taken explicitly because the
# `squared=False` keyword is no longer accepted by recent scikit-learn
# releases, and the printed label now names the metric actually computed.
rmse = np.sqrt(mean_squared_error(plotValuesWt, predicted_tail))
print('RMSE', rmse)

第四部分,构建预测模型(使用所有系数进行预测)

def evaluateModel(svr, X, Y, prediction_days, past_days):
    """Roll a fitted regressor forward ``prediction_days`` steps.

    The forecast starts from the last training window ``X[-1]`` and its
    target ``Y[-1]``. At every step the oldest value is dropped from the
    window, the most recent value (real or predicted) is appended, and the
    model predicts the next one.

    Args:
        svr: fitted estimator exposing ``predict``.
        X: input windows used for training.
        Y: targets used for training.
        prediction_days: number of steps to forecast.
        past_days: kept for interface compatibility. The window width is
            taken from ``X[-1]`` itself, so a width of 1 (where the old
            slice ``[-past_days+1:]`` became ``[0:]``) works too.

    Returns:
        Array of length ``prediction_days + 1``: the seed value ``Y[-1]``
        followed by the forecasts.
    """
    window = np.asarray(X[-1])
    forecast = [np.asarray(Y)[-1]]
    for _ in range(prediction_days):
        # Shift the window one step: drop the oldest value, add the newest.
        window = np.concatenate((window[1:], [forecast[-1]]))
        forecast.append(svr.predict(window.reshape(1, -1))[0])
    return np.array(forecast)


def predictValue(past_days = 7, prediction_days = 5, file_Path = 'Data/AUD-JPY-2003-2014-day.csv', dateColName = 'Date',
                 closingPColName = 'Close', delimiter = ';'):
    """Load the prices, decompose them and forecast every coefficient row.

    Returns:
        ``(predictedCoeff, dates, closing_prices)``.
    """
    # Read the data from the file.
    dates, closing_prices = getDatesAndPrices(file_Path, dateColName, closingPColName, delimiter)
    # Wavelet coefficients of the closing prices.
    coeff = getCoeffFromSeries(closing_prices)
    # Extend every coefficient row with SVR forecasts.
    predictedCoeff = trainModel(coeff, prediction_days, past_days)
    return predictedCoeff, dates, closing_prices


def getDatesAndPrices(filePath, dateColName, closingPColName, _delimiter):
    """Read the CSV file and return copies of the date and close columns."""
    # 'parse_dates' converts the date strings into datetime values.
    prices = pd.read_csv(filePath, delimiter=_delimiter, header=0, encoding='utf-8', parse_dates=[dateColName])
    dates = prices[dateColName].copy()
    closing_prices = prices[closingPColName].copy()
    return dates, closing_prices


def getCoeffFromSeries(closing_prices):
    """Decompose the series with applyModwt ('sym4', level 4)."""
    level = 4
    coeff = applyModwt(closing_prices, type='sym4', _level=level)
    return coeff


def trainModel(coeff, prediction_days, past_days):
    """Fit one RBF SVR per coefficient row and extend the row with forecasts.

    Each returned row is the original row without its last value, followed
    by the output of ``evaluateModel`` (which starts with that same value).

    Returns:
        List with one extended row per input row.
    """
    new_coeff = []
    print('coeff shape: ',np.shape(coeff))
    for band in coeff:
        X, Y = slideWindow(band, past_days)

        svr = svm.SVR(kernel='rbf', C=1e3, gamma=0.1)
        svr.fit(X, Y)

        predictCoeff = evaluateModel(svr, X, Y, prediction_days, past_days)
        new_coeff.append(np.concatenate((band[:-1], predictCoeff)))

    print('NEW coeff shape: ',np.shape(new_coeff))
    return new_coeff


daysToPredict = 7
predictedCoeff, dates, closing_prices = predictValue(prediction_days = daysToPredict)

接下来准备绘图进行对比

def plotValues(dates, original, predicted, prediction_days):
    """Draw the original series, the extended series, and both overlaid.

    Args:
        dates: dates of the original observations.
        original: original values, one per date.
        predicted: reconstructed series including the forecast tail.
        prediction_days: number of forecast days appended to ``dates``.
    """
    fig, ax = plt.subplots(3, 1, figsize=(16, 8))

    ax[0].plot(dates, original, label='Original')

    # Reconstruction from the detail and approximation coefficients.
    ax[1].plot(predicted, label='Re-constructed (using all coeff)', color='green')

    newDates = addDayToDates(dates, prediction_days)

    ax[2].plot(dates, original, label='Original')
    ax[2].plot(newDates, predicted, label='Re-constructed (using all coeff)', color='green')

    for panel in ax:
        panel.legend(loc='best')


def addDayToDates(dates, prediction_days):
    """Return ``dates`` followed by ``prediction_days`` consecutive days.

    The input is left untouched. The new dates are calendar days counted
    from the last input date, and the result carries a fresh 0..n-1 index.
    The previous implementation wrote to the label ``len(_dates) - 1 + i``
    while the Series was growing, which left gaps in the labels and could
    overwrite existing entries when the index did not start at 0.
    """
    last_date = pd.to_datetime(np.array(dates)[-1])
    future = pd.date_range(
        last_date + pd.DateOffset(days=1),
        periods=prediction_days,
        freq='D',
    )
    return pd.concat([pd.Series(dates), pd.Series(future)], ignore_index=True)


rpFs = imodwt(predictedCoeff, 'sym4')
plotValues(dates, closing_prices, rpFs, daysToPredict)

然后,仅使用近似系数(以及最后一层细节系数)进行预测

def readData(past_days = 7, prediction_days = 5, file_Path = '../Data/AUD-JPY-2003-2014-day.csv',
             dateColName = 'Date', closingPColName = 'Close', delimiter = ';'):
    """Load the dates and closing prices from the CSV file.

    ``past_days`` and ``prediction_days`` are accepted but not used here.

    Returns:
        ``(dates, closing_prices)`` as produced by ``getDatesAndPrices``.
    """
    loaded_dates, loaded_prices = getDatesAndPrices(
        file_Path, dateColName, closingPColName, delimiter
    )
    return loaded_dates, loaded_prices


def getApproxCoeffFromSeries(closing_prices):
    """Decompose the series with applyModwt ('sym4', level 4)."""
    return applyModwt(closing_prices, type='sym4', _level=4)


def trainModelApprox(X, Y, past_days):
    """Fit and return an RBF SVR on ``(X, Y)``; ``past_days`` is not used."""
    model = svm.SVR(kernel='rbf', C=1e3, gamma=0.1)
    model.fit(X, Y)
    return model


daysToPredict = 7
past_days = 7
level = 4
dates, closing_prices = readData(
    past_days=past_days,
    prediction_days=daysToPredict,
)

然后

# Full decomposition; only its last two rows (the last detail row and the
# approximation row) are kept below.
approxCoeff = getApproxCoeffFromSeries(closing_prices)

# Zero-filled array with the same shape as the coefficient array.
recwt = np.zeros(np.shape(approxCoeff))

# Rows from `level - 1` on receive the second-to-last coefficient row, then
# rows from `level` on are overwritten with the last one. The remaining rows
# stay zero.
recwt[(level - 1):] = approxCoeff[-2]
recwt[level:] = approxCoeff[-1]

# Rebuilding from the kept rows only gives a smoothed (de-noised) version of
# the price series.
dFs = imodwt(recwt, 'sym4')

使用近似系数训练模型

# Build the training samples from the de-noised series.
X, Y = slideWindow(dFs, past_days)

# The third argument of trainModelApprox is its `past_days` parameter, so
# pass the window width rather than the forecast horizon.
svr = trainModelApprox(X, Y, past_days)

执行预测

# Forecast `daysToPredict` steps past the end of the de-noised series.
predictedValues = evaluateModel(
    svr,
    X,
    Y,
    prediction_days=daysToPredict,
    past_days=past_days,
)

# Skip the first returned value before appending the rest to the de-noised
# series.
forecast_tail = predictedValues[1:]
rpFs = np.concatenate((dFs, forecast_tail))

# Plot the result next to the original prices.
plotValues(dates, closing_prices, rpFs, daysToPredict)

基于最大离散重叠小波变换 MODWT 和支持向量回归 SVR 的金融时间序列预测的步骤大致如此。前面基于滑动窗 + SVR 的金融序列预测还比较好理解,到小波这边可能就难以理解了;实际上还是各种倒腾小波系数:在每一层的小波系数上分别进行预测,最后再综合(重构)。小波分析还是有很大的灵活性的,不管是使用近似系数进行预测,还是挑选近似系数 + 几个细节系数进行预测,并没有一个明确的指导方案,还是要靠自己多试几次。

关于最大离散重叠小波,找了几个金融相关的文章,看一下吧

[1]王健.中美股市联动性——基于极大重叠离散小波变换的研究[J].世界经济文汇,2014(02):72-89.

[2]隋新,何建敏,李亮.时变视角下基于MODWT的沪深300指数现货与期货市场间波动溢出效应[J].系统工程,2015,33(01):31-38.

[3]徐梅. 金融波动分析的小波和频域方法研究[D].天津大学,2004.

代码如下

标签: #离散小波变换系数