龙空技术网

Python K线图,均线,MACD,高低点图

Pgabc 271

前言:

现时咱们对“pythonzbar”大体比较重视,咱们都想要了解一些“pythonzbar”的相关文章。那么小编同时在网摘上收集了一些对于“pythonzbar””的相关内容,希望小伙伴们能喜欢,大家一起来了解一下吧!

"""Python  K线图,均线,MACD,高低点图"""# -*- coding:utf-8 -*-import timeimport pandas as pdfrom pyecharts.charts import Kline, Line, Bar, Gridfrom pyecharts import options as optsfrom pyecharts.commons.utils import JsCodefrom typing import List, Sequence, Union"""建立mysql,postgresql等数据库链接"""engine_mysql = create_engine('mysql+pymysql://用户名:密码@端口:/数据库名?charset=utf8')engine_pg = create_engine("postgresql+psycopg2://用户名:,密码@端口:/数据库名", client_encoding='utf8')def make_highest_lowest_date(df, engine_wm, ma_1=5, ma_2=26):    start_dt = df.trade_date.iloc[0]    end_dt = df.trade_date.iloc[-1]    db_nm = 'trade_cal'  # 生成交易日历    query = "select cal_date from {0} where cal_date >= '{1}' and  cal_date <= '{2}' " \            "and is_open = '1';".format(db_nm, start_dt, end_dt)    # print(query)    df_cal = pd.read_sql(query, con=engine_wm)  # 取单数据    # print('trade_cal is ok , num is  :',len(df_cal))    df_cal.drop_duplicates(subset='cal_date', keep='first', inplace=True)  # 去重    df_cal = df_cal.sort_values(by='cal_date', axis=0, ascending=True)  # 排序 升序    df_cal = df_cal.reset_index(drop=True)  # 重新索引并排序    cal_list = df_cal.cal_date.tolist()    df_cal_list = df.trade_date.tolist()    df_wm = df    df_cal_list = df.trade_date.tolist()    # 设置索引    df_wm = df_wm.set_index("trade_date", drop="False")    # 将df的索引设置为日期索引    df_wm = df_wm.set_index(pd.to_datetime(df_wm.index))    # 生成完整的日期序列    pd_cal = pd.to_datetime(cal_list)  # 生成 trade_cal 日期的datetimeindex格式    # 填充缺失索引,并填充默认值    df_wm = df_wm.reindex(pd_cal)    zvalues = df_wm.loc[~(df_wm.vol > 0)].loc[:, ['vol', 'amount']]    df_wm.update(zvalues.fillna(0))    df_wm.fillna(method='ffill', inplace=True)    df_wm.head()    df_dd = df_wm.reset_index()  # 将索引变回    df_dd = df_dd.rename(columns={'index': 'trade_date'})  # 改回列名    df_nv = df_dd    item_variable = 'trade_date'  # 将日期格式变成字符串    df_nv[item_variable] = df_nv[item_variable].astype(str)  # 转换数据类型    df_nv['ma_1'] = df_nv.close.rolling(ma_1).mean()  # ma_1日收盘价均值    df_nv['ma_2'] = df_nv.close.rolling(ma_2).mean()  # ma_2日收盘价均值    # 生成境均线上下串日期列表    point_wm = []  # 时间点    date_pool = set(df_nv.trade_date.to_list())    for i in range(len(df_nv)):        if i == 0:            point_wm.append(df_nv.iloc[i].trade_date)        elif i != len(df_nv) - 1:            for_day = df_nv.iloc[i - 1].close            to_day = df_nv.iloc[i].close            next_day = df_nv.iloc[i + 1].close            for_day5 = df_nv.iloc[i - 1].ma_1            to_day5 = df_nv.iloc[i].ma_1            next_day5 = df_nv.iloc[i + 1].ma_1            for_day26 = df_nv.iloc[i - 1].ma_2            to_day26 = df_nv.iloc[i].ma_2            next_day26 = df_nv.iloc[i + 1].ma_2            if for_day5 > for_day26 and next_day5 < next_day26:                if point_wm[-1] != df_nv.iloc[i - 1].trade_date:  # 避免上下交易日同时取值                    point_wm.append(df_nv.iloc[i].trade_date)                    pass            elif for_day5 < for_day26 and next_day5 > next_day26:                if point_wm[-1] != df_nv.iloc[i - 1].trade_date:  # 避免上下交易日同时取值                    point_wm.append(df_nv.iloc[i].trade_date)                    pass        else:            point_wm.append(df_nv.iloc[i].trade_date)    # 计算高低点的日期    h_l_dt = []    for i in range(len(point_wm)):        try:            if i == len(point_wm) - 1:                pass            else:                tday = point_wm[i]                nday = point_wm[i + 1]                # print(tday,nday)                df_wm = df_nv[(df_nv['trade_date'] >= tday) & (df_nv['trade_date'] <= nday)]                df_wm = df_wm.reset_index()  # 重建索引号                # print(df_wm)                if df_wm.iloc[1].ma_1 >= df_wm.iloc[1].ma_2:                    # m = df_wm.high.idxmax(1)                    m = df_wm.high.idxmax()                    # print('highest is :',m,df_wm.iloc[m].trade_date)                    h_l_dt.append(df_wm.iloc[m].trade_date)                    pass                else:                    n = df_wm.low.idxmin()                    # print('lowest is :',n,df_wm.iloc[n].trade_date)                    h_l_dt.append(df_wm.iloc[n].trade_date)                    pass        except Exception as err:            print('pgdtime.make_highest_lowest_date is err.', err)    return h_l_dtdef data_astype(df, item_list, item5='float'):    #格式化数据    try:        for item_nv in item_list:            # print(item_nv)            item_variable = item_nv  # 设定转换类型的变更            df[item_variable] = df[item_variable].astype(item5)  # 转换数据类型        return df    except Exception as err:        print(err)def draw_k(db_nm, ts_code, start_dt, end_dt, engine_wm, html_wm):    time_s = time.time()    def macd(df_wm,period1 = 12,period2 = 26,period3=9):       data = df_wm       data.drop_duplicates(subset=['ts_code','trade_date'],keep='first',inplace=True)  #去重       data = data.reset_index()   #重新排       item_variable = 'close'   #设定转换类型的变更       data[item_variable] = data[item_variable].astype('float')  #转换数据类型       data['dif']=data['close'].ewm(adjust=False,alpha=2/(period1+1),ignore_na=True).mean()-data['close'].ewm(adjust=False,alpha=2/(period2+1),ignore_na=True).mean()       data['dea']=data['dif'].ewm(adjust=False,alpha=2/(period3+1),ignore_na=True).mean()       data['macd']=2*(data['dif']-data['dea'])       return data    def calculate_ma(day_count: int):        result: List[Union[float, str]] = []        for i in range(len(data["times"])):            if i < day_count:                result.append("-")                continue            sum_total = 0.0            for j in range(day_count):                sum_total += float(data["datas"][i - j][1])            result.append(abs(float("%.2f" % (sum_total / day_count))))        return result    # 数据    def split_data(origin_data) -> dict:        datas = []        times = []        vols = []        macds = []        difs = []        deas = []        for i in range(len(origin_data)):            datas.append(origin_data[i][1:])            times.append(origin_data[i][0:1][0])            vols.append(origin_data[i][5])            macds.append(origin_data[i][7])            difs.append(origin_data[i][8])            deas.append(origin_data[i][9])        vols = [int(v) for v in vols]        return {            "datas": datas,            "times": times,            "vols": vols,            "macds": macds,            "difs": difs,            "deas": deas,        }    def split_data_part() -> Sequence:        mark_line_data = []        idx = 0        tag = 0        vols = 0        for i in range(len(data["times"])):            if data["datas"][i][5] != 0 and tag == 0:                idx = i                vols = data["datas"][i][4]                tag = 1            if tag == 1:                vols += data["datas"][i][4]            if data["datas"][i][5] != 0 or tag == 1:                mark_line_data.append(                    [                        {                            "xAxis": idx,                            "yAxis": float("%.2f" % data["datas"][idx][3])                            if data["datas"][idx][1] > data["datas"][idx][0]                            else float("%.2f" % data["datas"][idx][2]),                            "value": vols,                        },                        {                            "xAxis": i,                            "yAxis": float("%.2f" % data["datas"][i][3])                            if data["datas"][i][1] > data["datas"][i][0]                            else float("%.2f" % data["datas"][i][2]),                        },                    ]                )                idx = i                vols = data["datas"][i][4]                tag = 2            if tag == 2:                vols += data["datas"][i][4]            #"value": str(float("%.2f" % (vols / (i - idx + 1)))) + " M"            if data["datas"][i][5] != 0 and tag == 2:                mark_line_data.append(                    [                        {                            "xAxis": idx,                            "yAxis": float("%.2f" % data["datas"][idx][3])                            if data["datas"][i][1] > data["datas"][i][0]                            else float("%.2f" % data["datas"][i][2]),                            "value": "",                        },                        {                            "xAxis": i,                            "yAxis": float("%.2f" % data["datas"][i][3])                            if data["datas"][i][1] > data["datas"][i][0]                            else float("%.2f" % data["datas"][i][2]),                        },                    ]                )                idx = i                vols = data["datas"][i][4]        #return mark_line_data        return []    def draw_chart(html_wm):        kline = (            Kline()            .add_xaxis(xaxis_data=data["times"])            .add_yaxis(                series_name="",                y_axis=data["datas"],                itemstyle_opts=opts.ItemStyleOpts(                    color="#ef232a",                    color0="#14b143",                    border_color="#ef232a",                    border_color0="#14b143",                ),                markpoint_opts=opts.MarkPointOpts(                    data=[                        opts.MarkPointItem(type_="max", name="最大值"),                        opts.MarkPointItem(type_="min", name="最小值"),                    ]                ),                markline_opts=opts.MarkLineOpts(                    label_opts=opts.LabelOpts(                        position="middle", color="blue", font_size=15                    ),                    data=split_data_part(),                    symbol=["circle", "none"],                ),            )            .set_series_opts(                markarea_opts=opts.MarkAreaOpts(is_silent=True, data=split_data_part())            )            .set_global_opts(                title_opts=opts.TitleOpts(title="{}_K线高低点图表".format(ts_code), pos_left="0"),                xaxis_opts=opts.AxisOpts(                    type_="category",                    is_scale=False,                    boundary_gap=False,                    axisline_opts=opts.AxisLineOpts(is_on_zero=False),                    splitline_opts=opts.SplitLineOpts(is_show=False),                    split_number=20,                    min_="dataMin",                    max_="dataMax",                ),                yaxis_opts=opts.AxisOpts(                    is_scale=True, splitline_opts=opts.SplitLineOpts(is_show=False)                ),                tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="line"),                datazoom_opts=[                    opts.DataZoomOpts(                        is_show=False, type_="inside", xaxis_index=[0, 0], range_end=100                    ),                    opts.DataZoomOpts(                        is_show=False, xaxis_index=[0, 1], pos_top="97%", range_end=100                    ),                    opts.DataZoomOpts(is_show=False, xaxis_index=[0, 2], range_end=100),                ],            )        )        kline_line = (            Line()            .add_xaxis(xaxis_data=data["times"])            .add_yaxis(                series_name="MA5",                y_axis=calculate_ma(day_count=5),                is_smooth=True,                linestyle_opts=opts.LineStyleOpts(opacity=1),                label_opts=opts.LabelOpts(is_show=False),            )            .add_yaxis(                series_name="MA10",                y_axis=calculate_ma(day_count=10),                is_smooth=True,                linestyle_opts=opts.LineStyleOpts(opacity=0.5),                label_opts=opts.LabelOpts(is_show=False),            )            .add_yaxis(                series_name="MA20",                y_axis=calculate_ma(day_count=20),                is_smooth=True,                linestyle_opts=opts.LineStyleOpts(opacity=0.5),                label_opts=opts.LabelOpts(is_show=False),            )            .add_yaxis(                series_name="MA60",                y_axis=calculate_ma(day_count=60),                is_smooth=True,                linestyle_opts=opts.LineStyleOpts(opacity=0.5),                label_opts=opts.LabelOpts(is_show=False),            )            .set_global_opts(                xaxis_opts=opts.AxisOpts(                    type_="category",                    grid_index=1,                    axislabel_opts=opts.LabelOpts(is_show=False),                ),                yaxis_opts=opts.AxisOpts(                    grid_index=1,                    split_number=3,                    axisline_opts=opts.AxisLineOpts(is_on_zero=False),                    axistick_opts=opts.AxisTickOpts(is_show=False),                    splitline_opts=opts.SplitLineOpts(is_show=False),                    axislabel_opts=opts.LabelOpts(is_show=False),                ),            )        )        # Overlap Kline + Line        overlap_kline_line = kline.overlap(kline_line)        # Bar-1        bar_1 = (            Bar()            .add_xaxis(xaxis_data=data["times"])            .add_yaxis(                series_name="Volumn",                y_axis=data["vols"],                xaxis_index=1,                yaxis_index=1,                label_opts=opts.LabelOpts(is_show=False),                itemstyle_opts=opts.ItemStyleOpts(                    color=JsCode(                        """                    function(params) {                        var colorList;                        if (barData[params.dataIndex][1] > barData[params.dataIndex][0]) {                            colorList = '#ef232a';                        } else {                            colorList = '#14b143';                        }                        return colorList;                    }                    """                    )                ),            )            .set_global_opts(                xaxis_opts=opts.AxisOpts(                    type_="category",                    grid_index=1,                    axislabel_opts=opts.LabelOpts(is_show=False),                ),                legend_opts=opts.LegendOpts(is_show=False),            )        )        # Bar-2 (Overlap Bar + Line)        bar_2 = (            Bar()            .add_xaxis(xaxis_data=data["times"])            .add_yaxis(                series_name="MACD",                y_axis=data["macds"],                xaxis_index=2,                yaxis_index=2,                label_opts=opts.LabelOpts(is_show=False),                itemstyle_opts=opts.ItemStyleOpts(                    color=JsCode(                        """                            function(params) {                                var colorList;                                if (params.data >= 0) {                                  colorList = '#ef232a';                                } else {                                  colorList = '#14b143';                                }                                return colorList;                            }                            """                    )                ),            )            .set_global_opts(                xaxis_opts=opts.AxisOpts(                    type_="category",                    grid_index=2,                    axislabel_opts=opts.LabelOpts(is_show=False),                ),                yaxis_opts=opts.AxisOpts(                    grid_index=2,                    split_number=4,                    axisline_opts=opts.AxisLineOpts(is_on_zero=False),                    axistick_opts=opts.AxisTickOpts(is_show=False),                    splitline_opts=opts.SplitLineOpts(is_show=False),                    axislabel_opts=opts.LabelOpts(is_show=False),                ),                legend_opts=opts.LegendOpts(is_show=False),            )        )        line_2 = (            Line()            .add_xaxis(xaxis_data=data["times"])            .add_yaxis(                series_name="DIF",                y_axis=data["difs"],                xaxis_index=2,                yaxis_index=2,                label_opts=opts.LabelOpts(is_show=False),            )            .add_yaxis(                series_name="DIF",                y_axis=data["deas"],                xaxis_index=2,                yaxis_index=2,                label_opts=opts.LabelOpts(is_show=False),            )            .set_global_opts(legend_opts=opts.LegendOpts(is_show=False))        )        overlap_bar_line = bar_2.overlap(line_2)        grid_chart = Grid(init_opts=opts.InitOpts(width="1400px", height="800px"))        grid_chart.add_js_funcs("var barData = {}".format(data["datas"]))        grid_chart.add(            overlap_kline_line,            grid_opts=opts.GridOpts(pos_left="3%", pos_right="1%", height="60%"),        )        grid_chart.add(            bar_1,            grid_opts=opts.GridOpts(                pos_left="3%", pos_right="1%", pos_top="71%", height="10%"            ),        )        grid_chart.add(            overlap_bar_line,            grid_opts=opts.GridOpts(                pos_left="3%", pos_right="1%", pos_top="82%", height="14%"            ),        )        #grid_chart.render("/home/test/picture/pye_k46.html")        grid_chart.render(html_wm)    query = "select ts_code,trade_date,open,close,low,high,vol,amount from {0} where trade_date >= '{1}' " \            "and trade_date <= '{2}' and ts_code = '{3}';".format(db_nm, start_dt, end_dt, ts_code)    print(query)    df1 = pd.read_sql(query,con=engine_wm)    #取单数据    print('df is ok ,first num:',len(df1))    df1.drop_duplicates(subset=['trade_date','ts_code'], keep='first', inplace=True)  # 去重    df3 = macd(df1)    #格式化数据    item_list = ['open', 'close', 'low', 'high', 'vol', 'macd', 'dif', 'dea']    df3 = data_astype(df3, item_list, item5='float')    df5 = df3    df3 = df3.sort_values(by='trade_date', axis=0, ascending=True)                             #排序 升序    df3 = df3.reset_index(drop=True)    df8 = df3    #取单数据    print('df is ok ,first num:', len(df8))    df8.drop_duplicates(subset=['trade_date', 'ts_code'], keep='first', inplace=True)  # 去重    df8 = df8.sort_values(by='trade_date', axis=0, ascending=True)    #print('df8:',df8)    item_list = ['open', 'close', 'low', 'high', 'vol']    df8 = data_astype(df8, item_list, item5='float')    #排序 升序    h_l_dt = make_highest_lowest_date(df8, engine_wm, ma_1=5, ma_2=26)     #生成高低点日期    h_l = []    for i in range(len(h_l_dt)):        h_l_sim = h_l_dt[i][:4]+h_l_dt[i][5:7]+h_l_dt[i][-2:]        h_l.append(h_l_sim)    def h_l_m(x):        if x in h_l:            return 1        else:            return 0    df3['k_form'] = df3.apply(lambda x:h_l_m(x['trade_date']),axis=1)    df2 = df3[['trade_date', 'open', 'close', 'low', 'high', 'vol', 'k_form', 'macd', 'dif', 'dea']]    df2 = df2.sort_values(by='trade_date', ascending=True)    #排序    #格式化数据    item_list = ['open', 'close', 'low', 'high', 'vol', 'macd', 'dif', 'dea']    df2 = data_astype(df2, item_list, item5='float')    echarts_data = df2.values.tolist()    data = split_data(origin_data=echarts_data)    draw_chart(html_wm)if __name__ == '__main__':    start_dt = '20220101'    end_dt = '20221024'    db_nm = 'stock_index_daily'  # 数据库来源    ts_code = '399300.SZ'    html_wm = "/home/test/picture/pye_k66.html"    draw_k(db_nm, ts_code, start_dt, end_dt, engine_pg, html_wm)    """    运行结果:    df is ok ,first num: 471    df is ok ,first num: 193        Process finished with exit code 0    """    """    Pgabc 2022000011    author : Pgabc        """

标签: #pythonzbar