前言:
如今各位老铁们对“scrapymysql”大致比较珍视,朋友们都需要剖析一些“scrapymysql”的相关知识。那么小编同时在网摘上收集了一些对于“scrapymysql””的相关知识,希望看官们能喜欢,兄弟们快快来了解一下吧!今天给大家介绍“Python抓取上市公司财务数据”主题的第8章节 - 财务数据抓取结果入库MySQL。本文将侧重讲解抓取网页后得到的“非结构化/半结构化数据”如何快捷实现数据结构化,并导入到关系型数据库(MySQL)中。回顾主题,我们分成9节来为讲解Python抓取上市公司财务数据,从0到1来详细拆解程序开发流程和编码实现,是一套从入门到熟练python抓取的课题。其包含的主要章节有:
1. Centos7搭建代码库和Python运行环境
2. Win10搭建Python开发环境
3. Python爬虫应用运行(Docker)镜像准备
4. 编码实现上市公司列表抓取
5. 编码实现上市公司简介和行业板块抓取
6. 编码实现上市公司企业财务摘要抓取
7. 编码实现上市公司历年财务数据抓取
8. 编码实现上市公司财务数据抓取结果入库(Mysql)
9. Python代码提交及部署运行
本节内容主要分成三点来讲解:数据库表结构设计、上市公司数据的数据库表的创建、编写加载抓取结果json数据文件并入库,其中先介绍数据表如何设计,再介绍如何操作mysql数据库来完成表的创建,最后详细讲解python代码编写实现抓取结果数据导入数据库中。本文约9千字,建议收藏,也可分享到微信后电脑端查看。
一、数据库表结构设计
第一步:数据库表关键设计
我们在环境搭建时已经创建好了上市公司数据存储的数据库ssgssj,其数据库表可根据上市公司财务数据的类型情况来,我们设计以下6个表:
1) 公司简介enterprise,存储公司简介和所属行业,自增id作为主键,股票代码作为唯一索引;
2) 财务摘要financial_summary,存储上市公司的财务摘要数据;
3) 财务指标financial_indicators,存储上市公司的财务指标数据;
4) 资产负债表balance_sheet,存储上市公司的财务指标数据;
5) 利润表profit_statement,存储上市公司的利润表数据;
6) 现金流量表chart_of_cash_flow,存储上市公司的现金流量表数据。
除了公司简介表enterprise,其他表都使用,企业id和报告日期作为联合主建,自增id作为唯一索引,且设置报告日期report_date为索引,为了后续按日期筛选查询的效率。
第二步:公司简介表设计及创建SQL编写
在centos7系统中创建创建/root/opdir/db_sql文件夹后,创建编辑/root/opdir/db_sql/enterprise.sql文件,使用命令如下:
mkdir /root/opdir/db_sqlvi /root/opdir/db_sql/enterprise.sql
添加建表SQL,内容如下:
CREATE TABLE `enterprise` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '公司ID', `ename` varchar(30) DEFAULT NULL COMMENT '公司名称', `fname` varchar(100) DEFAULT NULL COMMENT '公司英文名称', `listed_market` varchar(20) DEFAULT NULL COMMENT '上市市场', `listed_date` varchar(20) DEFAULT NULL COMMENT '上市日期', `issue_price` bigint(20) DEFAULT NULL COMMENT '发行价格', `underwriter` varchar(150) DEFAULT NULL COMMENT '主承销商', `Establish_date` varchar(20) DEFAULT NULL COMMENT '成立日期', `registered_money` decimal(18,0) DEFAULT NULL COMMENT '注册资本', `mechanism_type` varchar(20) DEFAULT NULL COMMENT '机构类型', `organizational` varchar(20) DEFAULT NULL COMMENT '组织形式', `industry` varchar(64) DEFAULT NULL COMMENT '所属行业', `e_id` int(11) DEFAULT NULL COMMENT '股票代码', PRIMARY KEY (`id`), UNIQUE KEY `e_id` (`e_id`), KEY `listed_date` (`listed_date`), KEY `ename` (`ename`), KEY `fname` (`fname`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='公司简介'
第三步:财务摘要表设计及创建SQL编写
创建编辑/root/opdir/db_sql/financial_summary.sql文件,使用命令如下:
vi /root/opdir/db_sql/financial_summary.sql
添加建表SQL,内容如下:
CREATE TABLE `financial_summary` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '财务摘要表ID', `enterpriseID` int(11) NOT NULL COMMENT '公司ID', `report_date` varchar(20) NOT NULL COMMENT '报表日期', `Net_assets_per_share_stock_final_stock` decimal(16,2) DEFAULT NULL COMMENT '每股净资产-摊薄/期末股数', `Cash_per_share` decimal(16,2) DEFAULT NULL COMMENT '每股现金流', `Per_share_capital_reserve` decimal(16,2) DEFAULT NULL COMMENT '每股资本公积金', `Total_fixed_assets` bigint(20) DEFAULT NULL COMMENT '固定资产合计', `Total_flow_assets` bigint(20) DEFAULT NULL COMMENT '流动资产合计', `Total_assets` bigint(20) DEFAULT NULL COMMENT '资产总计', `Total_term_liability` bigint(20) DEFAULT NULL COMMENT '长期负债合计', `Main_business_income` bigint(20) DEFAULT NULL COMMENT '主营业务收入', `Financial_cost` bigint(20) DEFAULT NULL COMMENT '财务费用', `Net_profit` bigint(20) DEFAULT NULL COMMENT '净利润', PRIMARY KEY (`enterpriseID`,`report_date`), UNIQUE KEY `id` (`id`), KEY `report_date` (`report_date`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='财务摘要表'
第四步:财务指标表设计及创建SQL编写
创建编辑/root/opdir/db_sql/ financial_indicators.sql文件,使用命令如下:
vi /root/opdir/db_sql/financial_indicators.sql
添加建表SQL,内容如下:
CREATE TABLE `financial_indicators` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '财务指标表ID', `enterpriseID` int(11) NOT NULL COMMENT '公司ID', `report_date` varchar(20) NOT NULL COMMENT '上报日期', `Index_per_share` decimal(16,4) DEFAULT NULL COMMENT '每股指标', `Diluted_earnings_per_share` decimal(16,4) DEFAULT NULL COMMENT '摊薄每股收益(元)', `Weighted_per_share` decimal(16,4) DEFAULT NULL COMMENT '加权每股收益(元)', `Earnings_per_share_After_adjustment` decimal(16,4) DEFAULT NULL COMMENT '每股收益_调整后(元)', `deduct_nonrecurrent_Profit_and_loss_Earnings_per_share` decimal(16,4) DEFAULT NULL COMMENT '扣除非经常性损益后的每股收益(元)', `Net_assets_per_share_preadjustment` decimal(16,4) DEFAULT NULL COMMENT '每股净资产_调整前(元)', `Net_assets_per_share_after_adjustment` decimal(16,4) DEFAULT NULL COMMENT '每股净资产_调整后(元)', `Operating_cash_flow` decimal(16,4) DEFAULT NULL COMMENT '每股经营性现金流(元)', `Capital_reserve` decimal(16,4) DEFAULT NULL COMMENT '每股资本公积金(元)', `Undistributed_profit` decimal(16,4) DEFAULT NULL COMMENT '每股未分配利润(元)', `After_adjustment_Net_worth` decimal(16,4) DEFAULT NULL COMMENT '调整后的每股净资产(元)', PRIMARY KEY (`enterpriseID`,`report_date`), UNIQUE KEY `id` (`id`), KEY `report_date` (`report_date`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='财务指标表'
第五步:资产负债表设计及创建SQL编写
创建编辑/root/opdir/db_sql/balance_sheet.sql文件,使用命令如下:
vi /root/opdir/db_sql/balance_sheet.sql
添加建表SQL,内容如下:
CREATE TABLE `balance_sheet` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '资产负债表ID', `enterpriseID` varchar(20) NOT NULL COMMENT '公司ID', `report_date` varchar(20) NOT NULL COMMENT '上报日期', `liquid_assets` decimal(11,2) DEFAULT NULL COMMENT '流动资产', `monetary_capital` decimal(11,2) DEFAULT NULL COMMENT '货币资金', `trading_financial_assets` decimal(11,2) DEFAULT NULL COMMENT '交易性金融资产', `derivative_financial_assets` decimal(11,2) DEFAULT NULL COMMENT '衍生金融资产', `Notes_receivable_and_accounts_receivable` decimal(11,2) DEFAULT NULL COMMENT '应收票据及应收账款', `bill_receivable` decimal(11,2) DEFAULT NULL COMMENT '应收票据', `accounts_receivable` decimal(11,2) DEFAULT NULL COMMENT '应收账款', `Receivables_financing` decimal(11,2) DEFAULT NULL COMMENT '应收款项融资', `advance_payment` decimal(11,2) DEFAULT NULL COMMENT '预付款项', `Other_Receivables_total` decimal(11,2) DEFAULT NULL COMMENT '其他应收款(合计)', `interest_receivable` decimal(11,2) DEFAULT NULL COMMENT '应收利息', `dividends_receivable` decimal(11,2) DEFAULT NULL COMMENT '应收股利', `other_receivables` decimal(11,2) DEFAULT NULL COMMENT '其他应收款', PRIMARY KEY (`enterpriseID`,`report_date`), UNIQUE KEY `id` (`id`), KEY `report_date` (`report_date`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='资产负债表'
第六步:利润表设计及创建SQL编写
创建编辑/root/opdir/db_sql/profit_statement.sql文件,使用命令如下:
vi /root/opdir/db_sql/profit_statement.sql
添加建表SQL,内容如下:
CREATE TABLE `profit_statement` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '利润表ID', `enterpriseID` varchar(20) NOT NULL COMMENT '公司ID', `report_date` varchar(20) NOT NULL COMMENT '上报日期', `operating_receipt` decimal(11,2) DEFAULT NULL COMMENT '营业收入', `operating_cost_total` decimal(11,2) DEFAULT NULL COMMENT '二、营业总成本', `operating_costs` decimal(11,2) DEFAULT NULL COMMENT '营业成本', `business_tariff_and_annex` decimal(11,2) DEFAULT NULL COMMENT '营业税金及附加', `selling_expenses` decimal(11,2) DEFAULT NULL COMMENT '销售费用', `handling_expense` decimal(11,2) DEFAULT NULL COMMENT '管理费用', `financial_cost` decimal(11,2) DEFAULT NULL COMMENT '财务费用', `development_cost` decimal(11,2) DEFAULT NULL COMMENT '研发费用', `assets_impairment_loss` decimal(11,2) DEFAULT NULL COMMENT '资产减值损失', PRIMARY KEY (`enterpriseID`,`report_date`), UNIQUE KEY `id` (`id`), KEY `report_date` (`report_date`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='利润表'
第七步:现金流量表设计及创建SQL编写
创建编辑/root/opdir/db_sql/chart_of_cash_flow.sql文件,使用命令如下:
vi /root/opdir/db_sql/chart_of_cash_flow.sql
添加建表SQL,内容如下:
CREATE TABLE `chart_of_cash_flow` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '现金流量表ID', `enterpriseID` varchar(20) NOT NULL COMMENT '公司ID', `report_date` varchar(20) NOT NULL COMMENT '上报日期', `Cash_flow_from_operating_activities` decimal(11,2) DEFAULT NULL COMMENT '经营活动产生的现金流量', `Cash_received_from_sales_of_goods_and_services` decimal(11,2) DEFAULT NULL COMMENT '销售商品、提供劳务收到的现金', `refund_of_tax_and_levies` decimal(11,2) DEFAULT NULL COMMENT '收到的税费返还', `Other_cash_received_in_connection_with_business_activities` decimal(11,2) DEFAULT NULL COMMENT '收到的其他与经营活动有关的现金', `Subtotal_cash_inflow_from_operating_activities` decimal(11,2) DEFAULT NULL COMMENT '经营活动现金流入小计', PRIMARY KEY (`enterpriseID`,`report_date`), UNIQUE KEY `id` (`id`), KEY `report_date` (`report_date`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='现金流量表'
至此,财务数据相关数据库表结构设计完成。
二、上市公司数据的数据库表的创建
第一步:拷贝SQL文件到mysql容器中
使用docker cp命令拷贝宿主机文件到容器中,命令如下:
docker cp /root/opdir/db_sql mysql:/opt/app-root/src/
第二步:切换到mysql容器的命令行
使用如下命令:
docker exec -it mysql /bin/bash
第三步:使用root用户登陆mysql的命令行
命令如下:
mysql -uroot
第四步:导入表结构到数据库ssgssj
切换到ssgssj数据库后,使用source来执行导入SQL文件,如下:
use ssgssj;source db_sql/enterprise.sql;source db_sql/financial_summary.sql;source db_sql/financial_indicators.sql;source db_sql/balance_sheet.sql;source db_sql/profit_statement.sql;source db_sql/chart_of_cash_flow.sql;
执行后,效果如下图:
至此财务数据相关表都创建完成了,我们可以通过show tables来查看,如下图:
三、编写加载抓取结果json数据文件并入库
第一步:打开save_to_db.py模块文件
在项目scrapy-finance文件夹中,选择save_to_db文件并选择右键-Edit with IDLE-Edit with IDLE 3.0(64-bit),如下图:
点击打开后,就可以开始编写代码,如下图:
第二步:编写获取数据库连接的方法
在SaveToDb类中编写get_db_connect方法来获取数据库连接,代码如下:
def get_db_connect(self, mysql_host='ysedu-python', mysql_port=3306, mysql_user='dbuser', mysql_pwd='scrap_db_PWD', mysql_db='ssgssj'): db = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_pwd, database=mysql_db) return db
同时我们需要在模块顶部加载pymysql依赖,代码如下:
import pymysql
代码编写后,效果如下图:
第三步:编写获取数据库表字段和备注内容的方法
我们需要将抓取结果和数据库字段建立映射关系,从表结构设计上,我们将表字段的备注和财务数据的指标名称保持一致。因此我们需要在SaveTbDb类中编写query_row_map方法来获取数据库ssgssj的表字段和备注映射,代码如下:
def query_row_map(self, db, table_name, db_name = 'ssgssj'): row_map = {} schema_sql = 'SELECT column_name, column_comment FROM information_schema.columns WHERE table_schema = "%s" AND table_name = "%s" AND column_name!="id"'%(db_name,table_name) try: cursor = db.cursor() row_count = cursor.execute(schema_sql) if row_count > 0: data = cursor.fetchall() for item in data: one_row = item[0] one_name = item[1] row_map[one_row] = one_name except (pymysql.err.OperationalError, pymysql.ProgrammingError, pymysql.InternalError, pymysql.IntegrityError, TypeError) as err: print ('query db error.', err) return row_map
代码编写后,效果如下图:
第四步:设计数据表的关联属性和映射关系字典数组
在SaveToDb类的__init__方法中增加数据属性映射字典self.data_attr_list,每类财务数据都包含表名table、主键id、日期字段dt、数据文件标识sign这几关联属性,代码如下:
detail_attr_dict = {'table': 'enterprise', 'id': 'e_id', 'sign': 'detail'}summary_attr_dict = {'table': 'financial_summary', 'id': 'enterpriseID', 'sign': 'summary', 'dt': 'report_date'}finance_attr_dict = {'table': 'financial_indicators', 'id': 'enterpriseID', 'sign': 'finance', 'dt': 'report_date'}assert_attr_dict = {'table': 'balance_sheet', 'id': 'enterpriseID', 'sign': 'assert', 'dt': 'report_date'}profit_attr_dict = {'table': 'profit_statement', 'id': 'enterpriseID', 'sign': 'profit', 'dt': 'report_date'}cache_attr_dict = {'table': 'chart_of_cash_flow', 'id': 'enterpriseID', 'sign': 'cache', 'dt': 'report_date'}self.data_attr_list = [detail_attr_dict, summary_attr_dict, finance_attr_dict, assert_attr_dict, profit_attr_dict, cache_attr_dict]
代码编写后,效果如下图:
第五步:加载数据库表字段映射到属性关系字典数组中
在SaveToDb类的run方法中循环加载待入库数据的数据表结构,代码如下:
schema_db = self.get_db_connect(mysql_db = 'information_schema')try: for table_attr_dict in self.data_attr_list: table_name = table_attr_dict['table'] row_map = self.query_row_map(db = schema_db, table_name = table_name) print ("%s row map=%s" %(table_name, str(row_map))) table_attr_dict['row_map'] = row_mapfinally: schema_db.close()
代码编写后,效果如下图:
使用Ctrl+S保存后,用Ctrl+F5运行效果如下图:
运行结果显示,表字段和备注成功加载出来。运行测试后,我们注释了循环中的print调用。
第六步:编写加载json数据文件的方法
在SaveToDb类中编写load_json_file方法来获取数据库连接,代码如下:
def load_json_file(self, list_file): if not os.path.isfile(list_file): return [] code_list = [] # code list file try: fb = open(list_file, 'r') while True: line = fb.readline() if line: #print ('read line:',line) one_json = json.loads(line) code_list.append(one_json) else: break except IOError as err: print ('IO Error:', err) else: fb.close() return code_list
同时我们需要在模块顶部加载json依赖,代码如下:
import osimport json
代码编写后,效果如下图:
第七步:编写循环加载json数据文件的逻辑
在SaveToDb类的run方法中加载上市公司列表数据,并循环上市公司列表,进一步循环加载其各类财务数据,为后续的处理入库做好准备,代码如下:
# read code listcode_list = self.load_json_file(self.list_file)for code_json in code_list: code = code_json['code'] for table_attr_dict in self.data_attr_list: id_col = table_attr_dict['id'] date_col = table_attr_dict.get('dt') data_sign = table_attr_dict['sign'] row_map = table_attr_dict['row_map'] json_data_file = '%s/%s_%s.data' %(self. detail_dir, data_sign, code) json_dict = self.load_json_file(json_data_file) if data_sign == 'detail': if not json_dict: break industry_dict = self.load_json_file('%s/industry_%s.data' %(self. detail_dir, code)) value_dict = {**json_list[0], ** industry_dict[0], **code_json} print (value_dict)
其中,如果该公司简介数据都不存在,就直接break不再处理该公司数据。数据存储路径self.list_file和self.detail_dir需要同时在__init__方法中定义,代码如下:
self.list_file = "./list_data/code_list.data" self.detail_dir = "./detail_data"
模块代码更新后,效果如下图:
使用Ctrl+S保存后,用Ctrl+F5运行效果如下图:
根据运行结果可知,财务数据成功从文件中加载了出来。测试运行后,注释掉对应的print调用。
第八步:编写将财务数据插入到数据库的插入SQL生成
在SaveToDb类中编写save_to_db方法来根据指定主键和字段映射关系,将财务数据插入数据库中,该方法的参数有:数据库连接db、表名table、主键ID字段名id_col、主键ID值id_val、主键日期字段名date_col、主键日期值date_val、字段与指标的映射字典row_map、财务数据字典value_dict,代码如下:
def save_to_db(self, db, table, id_col, id_val, date_col, date_val, row_map, value_dict): row_id = None date = None # code column reset row map value_dict[id_col] = id_val row_map[id_col] = id_col if date_col and date_val: date = date_val.replace('-','') value_dict[date_col] = date row_map[date_col] = date_col sql = None try: cursor = db.cursor() if date: sql = "SELECT id FROM %s where %s='%s' and %s='%s'"%(table, id_col, id_val, date_col, date) else: sql = "SELECT id FROM %s where %s='%s'"%(table, id_col, id_val) row_count = cursor.execute(query_sql) if row_count > 0: data = cursor.fetchone() row_id = data[0] else: row_keys = row_map.keys() rows = [value_dict.get(row_map.get(item)) for item in row_keys] sql = 'INSERT INTO ' + table + ' (' + ','.join(row_keys) + ') VALUES (' + ','.join(rows) + ')' print (sql) except (pymysql.err.DataError, pymysql.err.OperationalError, pymysql.ProgrammingError, pymysql.InternalError, pymysql.IntegrityError, TypeError) as err: print ('save db error.', err) print ("[error sql]" + sql) return row_id
保存入库的代码,我们先根据主键去查询表中记录是否存在,不存在则生成对应插入SQL,并且将插入SQL打印出来。代码编写后,效果如下图:
第九步:测试调用入库方法
在SaveToDb类run方法的循环开始处获取数据库连接,并对循环增加try finally,再finally处增加close处理,代码如下:
db = self.get_db_connect()try: ……finally: db.close()
在run方法的循环处理detail类型的if分支中增加保存入库方法的调用,且因公司简介有单独抓取的行业字段,故在row_map中增加industry字段的数据名映射。代码如下:
id_val = codedate_val = Nonerow_map['industry'] = 'industry'row_id = self.save_to_db(db, table_name, id_col, id_val, date_col, date_val, row_map, value_dict)
模块代码更新后,效果如下图:
快捷键Ctrl+S保存模块代码后Ctrl+F5运行后,效果如下图:
根据结果可知,插入SQL基本拼接完成。
第九步:编写将抓取数据作统一清洗的方法
在SaveToDb类中编写format_val方法来处理待入库的数据值,代码如下:
def format_val(self, val): if not val or val=='--': return None else: val = str(val).replace(',', '') if val.endswith('元'): val = val.replace('元','') elif val.endswith("万元(CNY)"): val = str(float(val[:-7]) * 1.0 * 10000) elif val.endswith("万元(HKD)"): val = str(float(val[:-7]) * 0.84 * 10000) elif val.endswith("万元(USD)"): val = str(float(val[:-7]) * 6.60 * 10000) elif re.match('^\d{4}-\d{2}-\d{2}$',val): val = val.replace('-','') else: val = val.replace('\"','\\\"') return val
我们将代表无数据的值统一转换成None,并且根据数值的单位做了换算,将时间格式做了去除-处理。最后将双引号做了转义,避免出现SQL语法错误。由于用到了正则匹配,我们还需要在模块顶部加载re模块,代码如下:
import re
模块代码更新后,效果如下图:
第十步:编写将待插入的SQL数据进行统一清洗转换
在SaveToDb类的save_to_db方法中,我们对rows进行清洗转换处理,代码如下:
rows = [format_val(item) for item in rows] rows = [('"%s"'%(val) if val else 'null') for val in rows]
将代码增加到save_to_db方法中后,效果如下图:
第十一步:编写执行待插入的SQL
在SaveToDb类的save_to_db方法中,我们对待执行插入SQL进行执行,并获取插入的自增ID,代码如下:
row_count = cursor.execute(sql) row_id = db.insert_id() db.commit()
并在异常处理处增加回滚操作,代码如下:
db.rollback()
编辑好函数save_to_db后,效果如下图:
第十一步:测试运行公司简介数据插入数据库
用Ctrl+S保存模块代码后Ctrl+F5运行后,模块正常运行结束。如下图:
第十二步:编写增加有报告日期的财务数据入库逻辑
在SaveToDb类run方法的循环中,对非details数据,增加else分支,对其进行按日期循环调用插入数据库方法,代码如下:
else: for date_val, header_dict in (json_list[0].items() if date_json_list else ()): value_dict = {} for part_dict in header_dict.values(): value_dict.update(part_dict) self.save_to_db(db, table_name, id_col, row_id, date_col, date_val, row_map, value_dict)
编辑好模块代码后,效果如下图:
至此,我们完成了上市公司财务数据抓取结果的入库程序编写,可以去了掉run方法中的#todo注释,保存编写好的模块代码。
四、结语
本节详细介绍了上市公司抓取结果的六类数据的数据库表创建及python编写入程序导入数据库,从表结构上进行了索引优化,在入库SQL记录的生成上,结合了表结构设计来实现通用的数据入库程序编写。使得我们只需要调整表结构就可以实现抓取数据结果字段的增加和删除。至此,本主题代码编写的内容都介绍完成了,下次我将介绍最后一节Python代码提交及部署运行,对主题完整流程实现感兴趣的朋友可以关注下我的后续动态~
标签: #scrapymysql