前言:
# 现时咱们对“pythonzbar”大体比较重视,咱们都想要了解一些“pythonzbar”的相关文章。那么小编同时在网摘上收集了一些对于“pythonzbar”的相关内容,希望小伙伴们能喜欢,大家一起来了解一下吧!
"""Candlestick (K-line) chart with moving averages, MACD and swing high/low dates."""
# -*- coding:utf-8 -*-
import time
from typing import List, Sequence, Union

import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Bar, Grid, Kline, Line
from pyecharts.commons.utils import JsCode
from sqlalchemy import create_engine

# Database connections (MySQL, PostgreSQL, ...).
# The URLs below are templates: replace the placeholder user name, password,
# host/port and database name before running.
engine_mysql = create_engine('mysql+pymysql://用户名:密码@端口:/数据库名?charset=utf8')
engine_pg = create_engine("postgresql+psycopg2://用户名:,密码@端口:/数据库名", client_encoding='utf8')

# Column order of the rows handed to the chart; ``_split_data`` and the
# ``barData`` JavaScript helper index into rows positionally.
_CHART_COLUMNS = ['trade_date', 'open', 'close', 'low', 'high', 'vol',
                  'k_form', 'macd', 'dif', 'dea']


def make_highest_lowest_date(df, engine_wm, ma_1=5, ma_2=26):
    """Return the dates of the swing highs and lows of a price series.

    The series is first aligned to the open days of the ``trade_cal`` table
    (days without a quote get zero ``vol``/``amount`` and forward-filled
    prices). The calendar is then cut into segments at the days where the
    ``ma_1``-day and ``ma_2``-day moving averages of ``close`` cross. For each
    segment the date of the highest ``high`` (fast average on or above the
    slow one on the segment's second day) or of the lowest ``low`` (otherwise)
    is collected.

    Args:
        df: DataFrame sorted ascending by ``trade_date`` with unique dates and
            the columns ``trade_date``, ``close``, ``high``, ``low``, ``vol``
            and ``amount``. It is not modified.
        engine_wm: Connection/engine passed to ``pandas.read_sql``.
        ma_1: Window of the fast moving average.
        ma_2: Window of the slow moving average.

    Returns:
        A list of ``'YYYY-MM-DD'`` strings, one per segment. A date may occur
        twice when it is the extreme of two adjacent segments. Empty when
        ``df`` or the calendar is empty.
    """
    if df.empty:
        return []

    start_dt = df.trade_date.iloc[0]
    end_dt = df.trade_date.iloc[-1]
    db_nm = 'trade_cal'
    # Trading calendar for the covered period. The bounds come from ``df``
    # itself and are interpolated into the SQL text.
    query = ("select cal_date from {0} where cal_date >= '{1}' and cal_date <= '{2}' "
             "and is_open = '1';").format(db_nm, start_dt, end_dt)
    df_cal = pd.read_sql(query, con=engine_wm)
    cal_dates = df_cal.cal_date.drop_duplicates().sort_values()
    pd_cal = pd.to_datetime(cal_dates.tolist())

    # Re-index the quotes on the full calendar so every open day has a row.
    prices = df.set_index("trade_date", drop=True)
    prices.index = pd.to_datetime(prices.index)
    prices = prices.reindex(pd_cal)

    # Days without a traded volume: zero vol/amount, carry the prices forward.
    no_trade = prices.loc[~(prices.vol > 0), ['vol', 'amount']]
    prices.update(no_trade.fillna(0))
    prices = prices.ffill()

    # Name the index explicitly so reset_index() always yields ``trade_date``,
    # even when the frame already carries a column called ``index``.
    df_nv = prices.rename_axis('trade_date').reset_index()
    df_nv['trade_date'] = df_nv['trade_date'].dt.strftime('%Y-%m-%d')
    df_nv['ma_1'] = df_nv.close.rolling(ma_1).mean()
    df_nv['ma_2'] = df_nv.close.rolling(ma_2).mean()

    dates = df_nv['trade_date'].tolist()
    fast = df_nv['ma_1'].tolist()
    slow = df_nv['ma_2'].tolist()
    last = len(dates) - 1

    # Row positions that delimit the segments: first day, last day and every
    # day around which the two averages swap sides. Comparisons against the
    # NaN warm-up values are False, so no crossing is detected there.
    turning_points = []
    for i in range(len(dates)):
        if i == 0 or i == last:
            turning_points.append(i)
            continue
        crossed_down = fast[i - 1] > slow[i - 1] and fast[i + 1] < slow[i + 1]
        crossed_up = fast[i - 1] < slow[i - 1] and fast[i + 1] > slow[i + 1]
        # Skip a day directly after an accepted point so that two adjacent
        # trading days are never both taken for the same crossing.
        if (crossed_down or crossed_up) and turning_points[-1] != i - 1:
            turning_points.append(i)

    # One extreme per segment; both end points belong to the segment.
    h_l_dt = []
    for seg_start, seg_end in zip(turning_points, turning_points[1:]):
        segment = df_nv.iloc[seg_start:seg_end + 1]
        try:
            if fast[seg_start + 1] >= slow[seg_start + 1]:
                extreme = segment['high'].idxmax()
            else:
                extreme = segment['low'].idxmin()
            # df_nv has a fresh RangeIndex, so the label is the row position.
            h_l_dt.append(dates[extreme])
        except (ValueError, TypeError) as err:
            # Best effort: a segment without usable prices is reported and
            # skipped instead of aborting the whole scan.
            print('pgdtime.make_highest_lowest_date is err.', err)
    return h_l_dt


def data_astype(df, item_list, item5='float'):
    """Cast the columns named in *item_list* to dtype *item5*, in place.

    Args:
        df: DataFrame to convert; it is modified and also returned.
        item_list: Names of the columns to cast.
        item5: Target dtype handed to ``Series.astype``.

    Returns:
        The same DataFrame object.

    Raises:
        KeyError: If a column is missing.
        ValueError: If a value cannot be converted.
    """
    # Errors propagate on purpose: swallowing them and returning None only
    # moved the failure to the caller's next line.
    for column in item_list:
        df[column] = df[column].astype(item5)
    return df


def _add_macd(df, period1=12, period2=26, period3=9):
    """Return a copy of *df* with ``dif``, ``dea`` and ``macd`` columns added.

    The exponential averages depend on row order, so *df* must already be
    sorted chronologically.
    """
    data = df.copy()
    close = data['close'].astype('float')
    data['close'] = close
    ema_fast = close.ewm(adjust=False, alpha=2 / (period1 + 1), ignore_na=True).mean()
    ema_slow = close.ewm(adjust=False, alpha=2 / (period2 + 1), ignore_na=True).mean()
    data['dif'] = ema_fast - ema_slow
    data['dea'] = data['dif'].ewm(adjust=False, alpha=2 / (period3 + 1), ignore_na=True).mean()
    data['macd'] = 2 * (data['dif'] - data['dea'])
    return data


def _split_data(origin_data) -> dict:
    """Split rows laid out as ``_CHART_COLUMNS`` into the series the chart uses."""
    return {
        # Everything but the date: open, close, low, high, vol, k_form, ...
        "datas": [row[1:] for row in origin_data],
        "times": [row[0] for row in origin_data],
        "vols": [int(row[5]) for row in origin_data],
        "macds": [row[7] for row in origin_data],
        "difs": [row[8] for row in origin_data],
        "deas": [row[9] for row in origin_data],
    }


def _calculate_ma(data, day_count: int) -> List[Union[float, str]]:
    """Return the *day_count*-day simple moving average of the closes.

    Positions without a full window hold ``"-"``; the first value appears at
    index ``day_count - 1``. Values are rounded to two decimals.
    """
    closes = [float(row[1]) for row in data["datas"]]
    result: List[Union[float, str]] = []
    for i in range(len(closes)):
        if i < day_count - 1:
            result.append("-")
            continue
        window = closes[i - day_count + 1:i + 1]
        result.append(abs(float("%.2f" % (sum(window) / day_count))))
    return result


def _split_data_part() -> Sequence:
    """Return the mark-line / mark-area segments drawn on the K-line.

    NOTE(review): the original script built a segment list between the rows
    flagged in ``k_form`` and then discarded it, always returning an empty
    list. That effective behaviour is kept, so no segments are drawn.
    """
    return []


def _render_chart(data, ts_code, html_wm):
    """Build the K-line / volume / MACD grid from *data* and render it to *html_wm*."""
    kline = (
        Kline()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="",
            y_axis=data["datas"],
            itemstyle_opts=opts.ItemStyleOpts(
                color="#ef232a",
                color0="#14b143",
                border_color="#ef232a",
                border_color0="#14b143",
            ),
            markpoint_opts=opts.MarkPointOpts(
                data=[
                    opts.MarkPointItem(type_="max", name="最大值"),
                    opts.MarkPointItem(type_="min", name="最小值"),
                ]
            ),
            markline_opts=opts.MarkLineOpts(
                label_opts=opts.LabelOpts(
                    position="middle", color="blue", font_size=15
                ),
                data=_split_data_part(),
                symbol=["circle", "none"],
            ),
        )
        .set_series_opts(
            markarea_opts=opts.MarkAreaOpts(is_silent=True, data=_split_data_part())
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="{}_K线高低点图表".format(ts_code), pos_left="0"),
            xaxis_opts=opts.AxisOpts(
                type_="category",
                is_scale=False,
                boundary_gap=False,
                axisline_opts=opts.AxisLineOpts(is_on_zero=False),
                splitline_opts=opts.SplitLineOpts(is_show=False),
                split_number=20,
                min_="dataMin",
                max_="dataMax",
            ),
            yaxis_opts=opts.AxisOpts(
                is_scale=True, splitline_opts=opts.SplitLineOpts(is_show=False)
            ),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="line"),
            datazoom_opts=[
                opts.DataZoomOpts(
                    is_show=False, type_="inside", xaxis_index=[0, 0], range_end=100
                ),
                opts.DataZoomOpts(
                    is_show=False, xaxis_index=[0, 1], pos_top="97%", range_end=100
                ),
                opts.DataZoomOpts(is_show=False, xaxis_index=[0, 2], range_end=100),
            ],
        )
    )

    # Moving-average lines drawn over the candles: (window, line opacity).
    kline_line = Line().add_xaxis(xaxis_data=data["times"])
    for window, opacity in ((5, 1), (10, 0.5), (20, 0.5), (60, 0.5)):
        kline_line.add_yaxis(
            series_name="MA{}".format(window),
            y_axis=_calculate_ma(data, day_count=window),
            is_smooth=True,
            linestyle_opts=opts.LineStyleOpts(opacity=opacity),
            label_opts=opts.LabelOpts(is_show=False),
        )
    kline_line.set_global_opts(
        xaxis_opts=opts.AxisOpts(
            type_="category",
            grid_index=1,
            axislabel_opts=opts.LabelOpts(is_show=False),
        ),
        yaxis_opts=opts.AxisOpts(
            grid_index=1,
            split_number=3,
            axisline_opts=opts.AxisLineOpts(is_on_zero=False),
            axistick_opts=opts.AxisTickOpts(is_show=False),
            splitline_opts=opts.SplitLineOpts(is_show=False),
            axislabel_opts=opts.LabelOpts(is_show=False),
        ),
    )
    overlap_kline_line = kline.overlap(kline_line)

    # Volume bars, coloured by candle direction via the ``barData`` JS global
    # registered on the grid below (row[1] is close, row[0] is open).
    bar_1 = (
        Bar()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="Volumn",
            y_axis=data["vols"],
            xaxis_index=1,
            yaxis_index=1,
            label_opts=opts.LabelOpts(is_show=False),
            itemstyle_opts=opts.ItemStyleOpts(
                color=JsCode(
                    """
                function(params) {
                    var colorList;
                    if (barData[params.dataIndex][1] > barData[params.dataIndex][0]) {
                        colorList = '#ef232a';
                    } else {
                        colorList = '#14b143';
                    }
                    return colorList;
                }
                """
                )
            ),
        )
        .set_global_opts(
            xaxis_opts=opts.AxisOpts(
                type_="category",
                grid_index=1,
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            legend_opts=opts.LegendOpts(is_show=False),
        )
    )

    # MACD histogram with the DIF and DEA lines on top.
    bar_2 = (
        Bar()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="MACD",
            y_axis=data["macds"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
            itemstyle_opts=opts.ItemStyleOpts(
                color=JsCode(
                    """
                        function(params) {
                            var colorList;
                            if (params.data >= 0) {
                              colorList = '#ef232a';
                            } else {
                              colorList = '#14b143';
                            }
                            return colorList;
                        }
                        """
                )
            ),
        )
        .set_global_opts(
            xaxis_opts=opts.AxisOpts(
                type_="category",
                grid_index=2,
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            yaxis_opts=opts.AxisOpts(
                grid_index=2,
                split_number=4,
                axisline_opts=opts.AxisLineOpts(is_on_zero=False),
                axistick_opts=opts.AxisTickOpts(is_show=False),
                splitline_opts=opts.SplitLineOpts(is_show=False),
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            legend_opts=opts.LegendOpts(is_show=False),
        )
    )
    line_2 = (
        Line()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="DIF",
            y_axis=data["difs"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
        )
        .add_yaxis(
            # This series plots the DEA values; it used to be labelled "DIF".
            series_name="DEA",
            y_axis=data["deas"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
        )
        .set_global_opts(legend_opts=opts.LegendOpts(is_show=False))
    )
    overlap_bar_line = bar_2.overlap(line_2)

    grid_chart = Grid(init_opts=opts.InitOpts(width="1400px", height="800px"))
    # Expose the candle rows to the volume-bar colour callback.
    grid_chart.add_js_funcs("var barData = {}".format(data["datas"]))
    grid_chart.add(
        overlap_kline_line,
        grid_opts=opts.GridOpts(pos_left="3%", pos_right="1%", height="60%"),
    )
    grid_chart.add(
        bar_1,
        grid_opts=opts.GridOpts(
            pos_left="3%", pos_right="1%", pos_top="71%", height="10%"
        ),
    )
    grid_chart.add(
        overlap_bar_line,
        grid_opts=opts.GridOpts(
            pos_left="3%", pos_right="1%", pos_top="82%", height="14%"
        ),
    )
    grid_chart.render(html_wm)


def draw_k(db_nm, ts_code, start_dt, end_dt, engine_wm, html_wm):
    """Load daily quotes for one security and render its K-line chart to HTML.

    Args:
        db_nm: Name of the table holding the daily quotes.
        ts_code: Security code to select.
        start_dt: First ``trade_date`` to include.
        end_dt: Last ``trade_date`` to include.
        engine_wm: Connection/engine passed to ``pandas.read_sql``.
        html_wm: Path of the HTML file to write.

    Raises:
        ValueError: If the query returns no rows.
    """
    # The arguments are interpolated into the SQL text; pass trusted values only.
    query = ("select ts_code,trade_date,open,close,low,high,vol,amount from {0} where trade_date >= '{1}' "
             "and trade_date <= '{2}' and ts_code = '{3}';").format(db_nm, start_dt, end_dt, ts_code)
    print(query)
    quotes = pd.read_sql(query, con=engine_wm)
    print('df is ok ,first num:', len(quotes))

    quotes = quotes.drop_duplicates(subset=['trade_date', 'ts_code'], keep='first')
    if quotes.empty:
        raise ValueError(
            "no rows in {0} for {1} between {2} and {3}".format(db_nm, ts_code, start_dt, end_dt)
        )
    # Sort BEFORE computing MACD: the query has no ORDER BY and the
    # exponential averages depend on row order.
    quotes = quotes.sort_values(by='trade_date', ascending=True).reset_index(drop=True)
    quotes = data_astype(quotes, ['open', 'close', 'low', 'high', 'vol'], item5='float')
    quotes = _add_macd(quotes)
    print('df is ok ,first num:', len(quotes))

    # Flag the rows whose date is a swing high/low (1) or not (0).
    h_l_dt = make_highest_lowest_date(quotes, engine_wm, ma_1=5, ma_2=26)
    iso_dates = pd.to_datetime(quotes['trade_date']).dt.strftime('%Y-%m-%d')
    quotes['k_form'] = iso_dates.isin(set(h_l_dt)).astype(int)

    echarts_data = quotes[_CHART_COLUMNS].values.tolist()
    data = _split_data(origin_data=echarts_data)
    _render_chart(data, ts_code, html_wm)


if __name__ == '__main__':
    start_dt = '20220101'
    end_dt = '20221024'
    db_nm = 'stock_index_daily'  # source table
    ts_code = '399300.SZ'
    html_wm = "/home/test/picture/pye_k66.html"
    draw_k(db_nm, ts_code, start_dt, end_dt, engine_pg, html_wm)

# Sample output of a run:
#   df is ok ,first num: 471
#   df is ok ,first num: 193
#   Process finished with exit code 0
#
# Pgabc 2022000011
# author : Pgabc
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #pythonzbar