前言:
现在看官们对“python里的join”都比较关心,咱们都想要学习一些“python里的join”的相关内容。那么小编在网络上搜集了一些有关“python里的join””的相关文章,希望各位老铁们能喜欢,同学们快快来了解一下吧!如何最佳地Join大型数据集-多种方法的指南
旋转磁盘,鱼片或使用大数据有很多方法-这是快速指南。
样本数据集
Kaggle电影数据库[8]-25,000万条额定值,用于45,000部电影,数据分为5个文件:
要查找电影的最高平均收视率,您需要将指向元数据的链接添加到收视率中:
SELECT m.title, avg(r.rating) FROM links l INNER JOIN to metas m ON m.imdbId=l.imdbId INNER JOIN to ratings ON r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5
老派SQL RDBMS
经典的方法是加载数据库,建立索引并运行前面提到的SQL(或使用下面的经典SQL):
SELECT m.title, avg(r.rating) FROM links l, metas m, ratings r WHERE m.imdbId=l.imdbId and r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5
RDBMS的加入通过3种主要方式进行,并带有一些平台变体:
· 嵌套循环-对表A的每一行查找表B中的匹配键。B上的索引使查找为O(A * log B),否则联接为SLOW-O(A * B)。
· 哈希联接—通过查找关键字构建表B的哈希/映射,使联接查找非常快— O(A * 1)
· 合并排序-对两个表进行排序并在一次通过中合并,除非预先排序,否则不会超快-O(A + B + A log A + B log B)→O(A log A + B log B)
使用Sqlite3 [1]-创建数据库,表和加载非常容易:
import sqlite3import csvcon = sqlite3.connect('mydatabase.db')c = con.cursor()# load file into data arraywith open(file, 'r', encoding='utf8') as csvfile: csvreader = csv.reader(csvfile) fields = next(csvreader) # strip header for row in csvreader: data.append(row)# create tablec.execute("CREATE TABLE ratings(movieId text, rating float")# execute in batch, commit in 500'sc.execute("INSERT INTO ratings(movieId, rating) VALUES (?,?)", data)# create indexesc.execute("create index ridx on ratings(movieId, rating)")
加载和查询2600万行数据大约需要10分钟的时间(也许我们可以组合调整前两个步骤..)
· 从csv /磁盘加载-35秒
· 插入数据库— 8分钟
· 添加索引— 30秒
· 按查询分组— 20秒
您还可以使用sqlite3命令行来测试和查看查询执行计划,如下所示。 在联接列上添加一些额外的索引后,我们的目标查询大约需要21秒才能执行。
使用SQL DB是可扩展的,但是比较老套。 接下来,我们将尝试行家技巧。
Python —适用于终极黑客
我们可以节省数据库开销,编写数据负载,并在Python中直接繁琐地进行Join:
# double nested loop joindef merge(links, ratings, metas): merged = [] for link in links[1]: mlink = link.copy() mlink += (['','']) # rating and name for rating in ratings[1]: if (mlink[0] == rating[1]): mlink[3] = rating[2] break for meta in metas[1]: if (mlink[1] == meta[6]): # stripped tt off meta imdb columns FYI mlink[4] = meta[20] break merged.append(mlink) return merged
" merge()"是一个没有索引的嵌套循环连接。 循环必须扫描metas和links表以获取每个等级(26m * 50k * 2)。 100k条评论需要5分钟,因此2600万条评论将需要很长时间...
" merge_wmap()"是一个哈希联接-我们为元数据和链接构建了一个Map映射,从而产生了O(n * 1)性能。 加入2600万行只需3秒!
# optimized single loop + w/ hash lookups of ratings and metadatadef merge_wmap(links, ratings, metas) -> []: ratings_map = make_map(ratings[1], 1) metas_map = make_map(metas[1], 6) merged = [] for link in links[1]: mlink = link.copy() + ['',''] mlink[3] = ratings_map.get(link[0])[2] if ratings_map.get(link[0]) else '' mlink[4] = metas_map.get(link[1])[20] if metas_map.get(link[1]) else '' merged.append(mlink) return merged
我没有实现分组过滤器-相对较快(需要26m行结果的排序-扫描-结合)-我估计加载和处理的总时间约为0:53
· 将原始CSV加载到数组-35秒
· 手动合并索引— 3秒
· 手动分组和过滤器-15秒(待定〜估算)
原始Python很快但很丑。 全速本地PC并完全控制所有错误。
Pandas数据框
Pandas [2]是Python上用于数据准备的事实上的软件包。 极其快速且易于使用,我们可以用最少的代码进行加载,连接和分组:
import pandas as pd# load filesratings_df = pd.read_csv('ratings.csv')metas_df = pd.read_csv('movies_metadata.csv')links_df = pd.read_csv('links.csv')# 1st joinmerged_df = pd.merge(links_df[['movieId','imdbId']], ratings_df[['movieId','rating']], on='movieId', how='right', validate='one_to_many')# 2nd joinmerged_df = pd.merge(merged_df, metas_df[['title','imdb_id']], left_on='imdbId', right_on='imdb_id', how='inner')# group-by havinggrouped_df = merged_df[['title','rating']].groupby('title'). agg(Mean=('rating', 'mean' ), Count=('rating','count')). query('Mean > 4.5 and Count > 2')
Pandas很聪明。 您无需预定义哈希或索引,它似乎可以动态生成优化连接所需的内容。 最大的限制是它存在于单个计算机上。 在大约0:17的时间内处理26m行,使用更少的代码,并且没有外部系统(数据库,集群等)。
· 将3个csv加载到DataFrames — 5秒
· 加入3个DataFrames — 8秒
· 加入,分组和过滤-+4秒
Pands文件加载比我自定义的py〜35sec和5sec快得多! 要显示不是黑客,请使用库。 从理论上讲,Pandas是它的单线程/进程(在我的TaskManager中看起来不是这样),因此数据集的大小受PC内存的限制。
尽管如此,Pandas是处理中小型数据的最终方法,但我们需要大数据!
Spark Clusters FTW(致胜)
SQL很棒,但是并行化和破解能力有限。 Python和Pandas超级灵活,但缺乏可伸缩性。 Apache Spark [5]是在大数据上并行化内存中操作的实际方法。
Spark有一个称为DataFrame的对象(是另一个!),它就像Pandas DataFrame一样,甚至可以从中加载/窃取数据(尽管您可能应该通过HDFS或Cloud加载数据,以避免BIG数据传输问题):
from pyspark.sql import SparkSessionfrom pyspark import SparkContext, SparkConf# context config/setup sc = SparkContext(conf=SparkConf().setMaster('local[8]'))spark = SparkSession.builder.getOrCreate()spark.conf.set("spark.sql.execution.arrow.enabled", "true")# copy pandas df's into spark df'st1 = spark.createDataFrame(ratings_df)t2 = spark.createDataFrame(links_df)t3 = spark.createDataFrame(metas_df)# normal spark join (runs on cluster default partioning)df = t1.join(t2, t1['movieId']==t2['movieId']). join(t3, t2['imdbId'] == t3['imbd_id']) # broadcast smaller tables to workerbc_df = t1.join(func.broadcast(t2), t1['movieId']==t2['movieId']). join(func.broadcast(t3), t2['imdbId'] == t3['imbd_id']) # group by resultsdf.groupBy('title').agg(func.mean('rating'). alias('avg_rating'),func.count('rating'). alias('r_count')).filter('r_count >2'). filter('avg_rating > 4.5').show()
我编写了两个Spark连接方法。 两者并行运行。 默认模式(第15行)将对数据进行分区,并在群集节点上随机(分散)数据。 后者的"广播"模式(第18行)仅复制一次较小的表,并且仅分区并发送较大的表内容。 使用较小的联接表,广播模式可以更快。
Spark将工作划分为多个工作节点(JVM(设置为8,以匹配我的CPU内核数)),以划分并征服到一个聚合中。 Spark代码和结果输出如下:
df.groupBy('title').agg(func.mean('rating'). alias('avg_rating'),func.count('rating'). alias('r_count')).filter('r_count >2'). filter('avg_rating > 4.5').show()
性能摘要
(我的笔记本电脑的非实验室认证结果)
首先请注意将3个数据集连接在一起的运行时间:
令人惊讶的是,原始的Python解决方案是最快的吗? 哈克哈克!
顶级分组的最终结果(包括Spark):
总结:
· Pandas具有卓越的快速性和高效性,因此您拥有核心的记忆力。 在某些时候,Python / Pandas将耗尽内存并崩溃。
· 尽管集群管理可能很棘手,但Spark是一个很好的扩展解决方案。 内存中分布式处理,对作业和数据进行分区以及分区存储策略(HDFS或其他)是正确的方向。
· RDBMS可靠,但在移动数据和处理方面有扩展限制
下一章将进一步介绍Spark。 糟糕,我忘记了Dask(原生Python群集),也许是下次。
SQLite3资源配置文件
Pandas资源简介
Spark资源配置文件(8个工作程序,10个分区)
以上数据来自我的MSFT Surface Laptop 3 — i7 / 16gb / 256gb SSD
参考和启示
[0]测试代码的完整来源(不仅仅是要点)-DougFoo的GitHub
[1] SQLite Python指南-官方Python文档
[2]Pandas指南-10分钟教程
[3]较旧的分析SQLite vs Pandas — Wes McKiney博客
[4] Spark加入DB Deck — DataBricks演示
[5]关于Spark的详细介绍-A. Ialenti的TDS文章
[6] PYArrow用于在Spark中快速加载DataFrame — Bryan Cutler IBM
[7]在10分钟内安装PySpark Win — TDS文章作者:乌玛·格(Uma G)
[8]电影评论文件-Kaggle数据集
(本文翻译自Doug Foo的文章《Guide to Big Data Joins — Python, SQL, Pandas, Spark, Dask》,参考:)
标签: #python里的join