龙空技术网

如何将Tushare中获取的数据保存到Mongodb中

SZ深呼吸 113

前言:

目前大家对“Python Tushare 数据保存”这一话题比较关注,不少朋友都想学习相关的内容。小编在网上汇集了一些关于“Python Tushare 数据保存”的相关知识,希望大家喜欢,一起来了解一下吧!

  在上一篇文章中,我展示了如何从Tushare中获取数据。有刚接触不久的朋友问具体要怎么取数据、怎么保存,今天我就来讲一下这个问题。

  MongoDB的安装和使用,我这里就不详细讲解了,有需要的朋友可以上网搜索学习一下。我这里用的MongoDB数据库客户端是Studio 3T。我将以读取并保存基本数据为例进行讲解,先创建一个名为"stockdb"的数据库。

再创建一个保存基本数据的集合

创建后如图

这个集合就相当于关系型数据库中的表,但是我们不需要去创建表中的字段,因为它是以key-value的方式进行存储的,就好像Python中的字典以及json的数据格式。你可以把一个字典或一段json数据直接存进mongodb,相当方便。

我这里贴一段Python代码,用来访问操作mongodb

import pymongo


class MongoDB(object):
    """Process-wide singleton giving access to the ``stockdb`` database.

    Every call to ``MongoDB(...)`` returns the same object. Only the first
    call runs the initialisation; later calls ignore their arguments.

    Attributes:
        conn: the ``pymongo.MongoClient`` created on first construction.
        db: the ``stockdb`` database.
        stock_basic: the ``stock_basic`` collection.
        daily: the ``daily`` collection.
    """

    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are consumed by __init__ only. They must not
        # be forwarded: object.__new__ raises TypeError on extra arguments.
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self, uri="mongodb://localhost:27017/"):
        """Create the client and collection handles on first construction.

        Args:
            uri: MongoDB connection string. Defaults to a local server on
                port 27017. Ignored once the singleton has been initialised.
        """
        if self.__init_flag:
            return
        self.conn = pymongo.MongoClient(uri)
        self.db = self.conn.stockdb
        self.stock_basic = self.db.stock_basic
        self.daily = self.db.daily
        self.__init_flag = True

    @staticmethod
    def _print_docs(cursor):
        """Print every document produced by *cursor*, one per line."""
        for doc in cursor:
            print(doc)

    def find_all_data(self):
        """Print every document of the ``daily`` collection."""
        self._print_docs(self.daily.find())

    def find_one_data(self):
        """Print two single-document lookups against ``daily``."""
        # First argument: the filter (the "where" clause).
        # Second argument: the projection (0 hides a field, 1 shows it).
        data = self.daily.find_one({'ts_code': '600000.SH'})
        print(data)
        data = self.daily.find_one({'data.trade_date': '20220126'},
                                   {'_id': 0, 'ts_code': 1, 'data': 1})
        print(data)

    def find_data(self):
        """Print the results of four example multi-document queries."""
        separator = '-------------------------------------------'

        # No filter: every document, projected to ts_code and exchange.
        self._print_docs(
            self.daily.find({}, {'_id': 0, 'ts_code': 1, 'exchange': 1}))
        print(separator)

        # Several keys in one filter document are combined with AND.
        self._print_docs(
            self.daily.find({'ts_code': '600000.SH', 'exchange': 'SSE'},
                            {'_id': 0, 'ts_code': 1, 'exchange': 1}))
        print(separator)

        # $or matches documents satisfying any of the listed conditions.
        self._print_docs(
            self.daily.find({'$or': [{'ts_code': '600000.SH'}, {'ts_code': '600006.SH'}]},
                            {'_id': 0, 'ts_code': 1, 'exchange': 1, 'data': 1}))
        print(separator)

        # Dotted path: filter on a field of the embedded "data" entries.
        self._print_docs(
            self.daily.find({'data.trade_date': '20220126'},
                            {'_id': 0, 'ts_code': 1, 'data': 1}))

    def insert_data(self):
        """Insert one sample document into ``daily``."""
        data = {"ts_code": "600006.SH", "exchange": "SSE", "data": [
            {"trade_date": "20220126", "open": 6.19, "high": 6.24, "low": 6.11, "close": 6.18, "amount": 54826106.0,
             "vol": 88673}]}
        self.daily.insert_one(data)

    def update_add_to_set(self):
        """Prepend two sample daily bars to the ``data`` array of 600004.SH.

        Despite the method name, the update uses ``$push`` (with ``$each``
        and ``$position: 0``), not ``$addToSet``, so running it again adds
        the same entries again.
        """
        data = [{"trade_date": "20220129", "open": 6.19, "high": 6.24, "low": 6.11, "close": 6.18, "amount": 54826106.0,
                 "vol": 88673},
                {"trade_date": "20220128", "open": 6.19, "high": 6.24, "low": 6.11, "close": 6.18, "amount": 54826106.0,
                 "vol": 88673}]
        self.daily.update_one({'ts_code': '600004.SH'},
                              {'$push': {'data': {'$each': data, '$position': 0}}})

    def save_stock_basic(self, record):
        """Insert *record* (a dict) into ``stock_basic``."""
        self.stock_basic.insert_one(record)

    def find_stock_basic(self, parms):
        """Return the ``stock_basic`` documents matching *parms* as a list.

        Args:
            parms: a MongoDB filter document.

        Returns:
            The matching documents, sorted by ``ts_code`` ascending.
        """
        return list(self.stock_basic.find(parms).sort([("ts_code", pymongo.ASCENDING)]))

    def find_stock_basic_by_tscode(self, tscode):
        """Return the ``stock_basic`` document for *tscode*, or None if absent."""
        return self.stock_basic.find_one({"ts_code": tscode})

    def update_stock_basic(self, srcdata, dstdata):
        """Apply ``$set`` with *dstdata* to the first document matching *srcdata*.

        Args:
            srcdata: filter selecting the document to update.
            dstdata: mapping of field name to new value.
        """
        self.stock_basic.update_one(srcdata, {"$set": dstdata})


# Shared instance; other modules import this name instead of constructing
# their own MongoDB object.
mongodb = MongoDB()

# Example calls:
# mongodb.find_all_data()
# mongodb.find_one_data()
# mongodb.find_data()
# mongodb.insert_data()
# mongodb.update_add_to_set()

这里需要引用第三方库pymongo

执行 `pip install pymongo` 安装即可。

这段代码用了一个单例模式,避免每次使用都去创建实例连接数据库,别的代码里直接引用mongodb即可。如:

from MongoDB import mongodb

# Use the shared instance exported by the MongoDB module rather than
# constructing a new one; look up the stock whose ts_code is 600000.SH.
query = {'ts_code': '600000.SH'}
mongodb.find_stock_basic(query)

我们再来看一下如何连接Tushare并获取数据的

import tushare as ts

from Logger import logger


class TushareApi:
    """Process-wide singleton wrapping a Tushare Pro API client.

    Every call to ``TushareApi(...)`` returns the same object. Only the
    first call sets the token and creates the client; later calls ignore
    their arguments.

    Attributes:
        pro: the client returned by ``ts.pro_api()``.
    """

    # Placeholder used when no token is passed; replace it with your own
    # Tushare token or pass ``token=...`` on first construction.
    _DEFAULT_TOKEN = '这里填你自己的token'

    # Columns requested from the stock_basic endpoint.
    _STOCK_BASIC_FIELDS = (
        'ts_code', 'symbol', 'name', 'industry', 'fullname', 'enname',
        'cnspell', 'market', 'exchange', 'curr_type', 'list_status',
        'list_date', 'delist_date', 'is_hs',
    )

    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are consumed by __init__ only. They must not
        # be forwarded: object.__new__ raises TypeError on extra arguments.
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self, token=None):
        """Register the token and create the Pro client on first construction.

        Args:
            token: Tushare API token. When None, the placeholder in
                ``_DEFAULT_TOKEN`` is used. Ignored once the singleton has
                been initialised.
        """
        if self.__init_flag:
            return
        ts.set_token(self._DEFAULT_TOKEN if token is None else token)
        self.pro = ts.pro_api()
        self.__init_flag = True

    def query_stock_basic(self):
        """Query ``stock_basic`` with ``list_status='L'``.

        Returns:
            Whatever ``self.pro.stock_basic`` returns for the requested
            columns (the surrounding article reports a pandas DataFrame).
        """
        logger.debug("TushareApi query_stock_basic...")
        return self.pro.stock_basic(list_status='L',
                                    fields=','.join(self._STOCK_BASIC_FIELDS))


# Shared instance; other modules import this name.
tushareApi = TushareApi()

# Demo output.
# NOTE(review): these statements run whenever this module is imported, so
# every importer triggers one extra query and two prints. Consider moving
# them under ``if __name__ == "__main__":`` once nothing imports ``stocks``.
stocks = tushareApi.query_stock_basic()
print(type(stocks))
print(stocks)

运行结果

我们可以看到返回的结果是pandas.core.frame.DataFrame类型的

接着就是遍历它,并把它插入或更新到mongodb中

import datetime
import math

from Logger import logger
from MongoDB import mongodb
from TushareApi import tushareApi

# Columns compared between the fetched row and the stored document.
# ts_code is the lookup key and is therefore not in this list.
fields = ('symbol', 'name', 'industry', 'fullname', 'enname', 'cnspell', 'market', 'exchange', 'curr_type',
          'list_status', 'list_date', 'delist_date', 'is_hs')


def _is_missing(value):
    """Return True for None and for float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def _values_differ(stored, fetched):
    """Return True when *fetched* should overwrite *stored*.

    Two missing values count as equal. A plain ``!=`` would report
    NaN != NaN as a change and rewrite the same field on every run.
    """
    if _is_missing(stored) and _is_missing(fetched):
        return False
    return stored != fetched


begin = datetime.datetime.now()
df = tushareApi.query_stock_basic()
print(type(df))
print(df)

# All elapsed times below are measured from `begin`, i.e. they are cumulative.
end = datetime.datetime.now()
k = end - begin
logger.debug("load stock_basic elapsed time:%f" % (k.total_seconds()))

# No MongoDB work happens between the previous measurement and this one.
end = datetime.datetime.now()
k = end - begin
logger.debug("open mongodb elapsed time:%f" % k.total_seconds())

for _, row in df.iterrows():
    print(row)
    tscode = row['ts_code']
    dbr = mongodb.find_stock_basic_by_tscode(tscode)
    if dbr is None:
        # Unknown stock: store the whole row as a new document.
        logger.debug("insert:%s" % (str(row)))
        mongodb.save_stock_basic(row.to_dict())
        continue

    # Known stock: collect every changed column, then write them together.
    changes = {}
    for field in fields:
        # .get(): a stored document may lack a column that was added later.
        mf = dbr.get(field)
        nf = row[field]
        if _values_differ(mf, nf):
            logger.debug("update ts_code:%s field:%s %s===>%s" % (tscode, field, mf, nf))
            changes[field] = nf
    if changes:
        # One update per document instead of one per changed field.
        mongodb.update_stock_basic({'ts_code': tscode}, changes)

end = datetime.datetime.now()
k = end - begin
logger.debug("insert data elapsed time:%f" % k.total_seconds())

标签: #pythontushare保存