龙空技术网

如何使用Pandas和Dask在Python中处理大型数据集

闻数起舞 793

前言:

今天咱们对“pandas创建空表”大致比较关心,我们都想要学习一些“pandas创建空表”的相关知识。那么小编同时在网络上收集了一些有关“pandas创建空表””的相关知识,希望看官们能喜欢,小伙伴们快快来学习一下吧!

如何使用Pandas和Dask一起处理大型数据集以进行并行计算-以及何时将更大的问题交给SQL。

TL; DR

Python数据科学家经常使用Pandas处理表。 虽然Pandas非常适合中小型数据集,但较大的数据集存在问题。 在本文中,我将展示如何使用Pandas和Dask来处理大型数据集以进行并行计算-以及在所有其他方法均失败的情况下何时将更大的问题交给SQL来处理。

Photo by Debbie Molle on Unsplash

在大型数据集上使用Pandas

Pandas是使用数据表的绝佳库。 与R生态系统类似,它的数据框为数据分析提供了非常强大的工作流。 它的速度相当快,功能丰富且有据可查。 实际上,它已经成为遵循Python方式的数据科学家使用的基本工具。

但是,在使用Python而不是R的数据科学家的生活中,总是会出现笔记本电脑发脾气,拒绝进行更多工作并冻结的现象。

尽管如此,Pandas在执行计算时通过将数据集保存在RAM中来实现其速度。 这就是为什么它有一定的限制(当然,这也取决于机器规格)。

为什么会这样呢?

该问题通常源于在过于复杂的转换过程中数据帧的意外扩展,或者从文件中盲目导入表。 那可能非常令人沮丧。

那么我们该如何解决呢?

一种解决方案是将数据限制为较小的子集,例如,通过探测源中的第n个值。 但是通常这还不够。

但是,如果不能选择限制数据怎么办?

这是否意味着我们不能对大型数据集使用Pandas?

幸运的是,有一种方法可以解决这个问题。

最常见的解决方案是将Pandas与其他解决方案一起使用-例如关系SQL数据库,MongoDB,ElasticSearch或类似的东西。 在Sunscrapers,我们绝对同意这种方法。

但是您有时可以使用Pandas和另一个方便的开源Python库Dask处理Python中大于内存的数据集。

Dask是一个强大的Python库,用于执行分布式和并行计算。 它还提供了用于动态调度Python定义的任务的工具(类似于Apache Airflow)。 它与NumPy紧密集成,并为Pandas提供了等效于数据帧的结构(dask.dataframes),该结构基于延迟加载,可用于按块并行执行数据帧操作。 它还通过在后台执行数据洗牌自动支持分组。

本文概述了一些方便的提示和技巧,以帮助开发人员在使用Python处理大型数据集时减轻一些麻烦。

这是数据集

为了展示Pandas / Dask的功能,我选择从Wikipedia中选择一个有关网站访问者来源的开源数据集。 您可以在此处获取"点击流"表(在.tsv中)。

点击流数据包含4个主要列:

"Prev"-访问者来自的网站(我将其重命名为" coming_from")" curr"-目标文章页面(重命名为" article")"type"-此列描述了引荐类型,例如,外部链接(我将其重命名为" referral_type")“n” –访问次数

接下来,我提出一些问题来处理我的数据集,并检查Pandas和Dask的组合是否能完成工作:

人们在给定文章中最常点击哪些链接?用户从所有外部搜索引擎访问的最受欢迎的文章是什么?给定文章页面的访问者单击链接到那里的百分比?每篇文章最常访问的来源是什么? (显示在单个表中)

数据集大小为1.4 Gb,因此存在很大的内存过载风险。 因此,我将研究分为两个部分。

首先,我仅使用Pandas库对有限的数据子集进行了分析。 然后,我尝试使用Dask对全套数据进行完全相同的操作。

好的,让我们继续分析。

准备数据集

让我们获取数据进行分析:

if [ ! -d “./data” ] then mkdir ./data echo ‘created folder ./data’fi# get the data if not present:if [ ! -f “./data/clickstream_data.tsv” ]; then if [ ! -f “./data/clickstream_data.tsv.gz” ] then wget  -O ./data/clickstream_data.tsv.gz fi gunzip ./data/clickstream_data.tsv.gzfi

现在,让我们看一下这里拥有的数据类型,并将其导入数据框。

此时我们已经可以执行的第一个内存优化步骤(假设我们现在已经知道我们的表结构)是在导入期间指定列数据类型(通过dtype = input参数)。

这样,我们可以强制Pandas将某些值转换为具有显着更低的内存占用量的类型。

如果您要处理几千行,这可能没有多大意义,但几百万行将产生明显的变化!

例如,如果您知道一列应只包含正整数,请使用无符号整数类型(uint32)而不是常规的int类型(或更糟糕的是,float,有时可能会自动发生)。

df = pd.read_csv(‘data/clickstream_data.tsv’,  delimiter=’\t’, names=[‘coming_from’, ‘article’, ‘referrer_type’, ‘n’], dtype={ ‘referrer_type’: ‘category’,  ’n’: ‘uint32’})

最后,为了提高速度,我们将数据帧大小限制为前100k行。

注意,这通常是一个坏主意; 在对子集进行采样时,更适合对第n行进行采样,以获取尽可能统一的采样。 但是,由于我们仅使用它来演示分析过程,因此我们不会打扰:

df = df.iloc[:100000]

问题1:人们在给定文章中最常点击哪些链接?

为了回答这个问题,我们需要创建一个表,在这里我们可以看到每个文章和每个来源的访问者的总和(coming_from列)。

因此,让我们汇总文章上的表格(coming_from列),汇总" n"值,然后根据" n"总和对行进行排序。 这是我们在Pandas中的处理方式:

top_links = df.loc[ df['referrer_type'].isin(['link']),  ['coming_from','article', 'n']]\.groupby([‘coming_from’, ‘article’])\.sum()\.sort_values(by=’n’, ascending=False)

和结果表:

Pandas+Dask

现在,我们使用Dask库重新创建此数据。

from dask import dataframe as dddfd = dd.read_csv( ‘data/clickstream_data.tsv’,  delimiter=’\t’, names=[‘coming_from’, ‘article’, ‘referrer_type’, ‘n’], dtype={ ‘referrer_type’: ‘category’,  ’n’: ‘uint32’}, blocksize=64000000 # = 64 Mb chunks)

请注意,read_csv函数与Pandas函数非常相似,不同之处在于,我们在此处指定每块的字节大小。 我们执行的聚合(aggregation)逻辑也几乎与Pandas相同:

top_links_grouped_dask = dfd.loc[ dfd[‘referrer_type’].isin([‘link’]),  [‘coming_from’,’article’, ‘n’]]\ .groupby([‘coming_from’, ‘article’])

尚无法进行任何计算,top_links_grouped_dask将是Dask延迟的数据框对象。 然后,我们可以启动它以通过.compute()方法进行计算。

但是我们不想阻塞我们的内存,因此我们直接将其保存到硬盘驱动器中。 我们将使用hdf5文件格式来做到这一点。 然后声明hdf5存储:

store = pd.HDFStore('./data/clickstream_store.h5')

并计算其中的数据帧。

请注意,使用Dask排序列值并不是那么容易(毕竟,一次读取一个数据块),因此我们不能像在Pandas示例中那样使用sort_values()方法。

相反,我们需要使用nlargest()Dask方法并指定要确定的最高值的数量:

top_links_dask = top_links_grouped_dask.sum().nlargest(20, 'n')

它也返回一个延迟的Dask对象,因此要最终对其进行计算(并将其保存到Store中),我们运行以下命令:

store.put('top_links_dask', top_links_dask.compute(),  format='table', data_columns=True)

在这种情况下,结果与Pandas示例中的值不同,因为在这里我们处理整个数据集,而不仅仅是前十万行:

问题2:用户从所有外部搜索引擎访问的最受欢迎的文章是什么?

那很容易。 我们需要做的就是过滤出包含"external" referrer_type和"other-search" coming_from值的行:

external_searches = df.loc[ (df[‘referrer_type’].isin([‘external’])) &  (df[‘coming_from’].isin([‘other-search’])), [‘article’, ‘n’]]

然后,我们只需要根据访问者的数量对值进行排序:

most_popular_articles = external_searches.sort_values(  by='n', ascending=False).head(40)

哦耶!

Pandas+Dask

怎么做,但是这次用Dask在完整的数据集上呢?

external_searches_dask = dfd.loc[ (dfd[‘referrer_type’].isin([‘external’])) &  (dfd[‘coming_from’].isin([‘other-search’])), [‘article’, ‘n’]]

由于我们只需要存储前40个结果,因此我们可以直接将它们直接存储在Pandas数据框中:

external_searches_dask = external_searches_dask.nlargest( 40, 'n').compute()

返回此内容(此处仅显示前5行):

这是一个很好的问题,需要图形化地回答,因此让我们绘制前40个最高值:

sns.barplot(data=external_searches_dask, y='article', x='n')plt.gca().set_ylabel('')

问题3:到达指定文章页面的访问者中有多少点击了链接才能到达那里?

这个问题的框架表明我们需要能够计算特定文章标题的分数。 因此,让我们创建一个函数,该函数将使用数据框和所需的文章标题,然后返回百分比值。

该函数将必须过滤给定文章的行,汇总所有访问者计数,然后使用Referrer_type列中的" link"值为访问子集找到n的累积总和:

def visitors_clicked_link_pandas(dataframe, article): df_article = dataframe.loc[dataframe[‘article’].isin([article])] a = df_article[‘n’].sum() l = df_article.loc[ df_article[‘referrer_type’].isin([‘link’]),  ‘n’].sum() return round((l*100)/a, 2)

让我们测试其中一篇文章,说一个标题为" Jehangir_Wadia"的文章:

>>> visitors_clicked_link_pandas(df, 'Jehangir_Wadia')81.1

这表明约有81%的" Jehangir_Wadia"文章访问者通过单击外部链接到达那里。

Pandas+Dask

我们如何使用Dask将其扩展到整个数据集? 很容易。 我们要做的就是使用dask-dataframe而不是Pandas数据框,并将.compute()方法添加到函数中的两个内部语句中,如下所示:

def visitors_clicked_link_dask(dataframe, article): df_article = dataframe.loc[dataframe[‘article’].isin([article])] a = df_article[‘n’].sum().compute() l = df_article.loc[ df_article[‘referrer_type’].isin([‘link’]), ‘n’].sum().compute() return round((l*100)/a, 2)

运行该函数将返回相同的结果:

>>> visitors_clicked_link_dask(dfd, 'Jehangir_Wadia')81.1

问题4:每篇文章最常访问的来源是什么?

要回答这个问题,我们需要两列:一列用于目的地文章和原始标题,以及访问次数的总和。 此外,我们必须过滤出每篇文章访问者数量最多的行。

首先,通过汇总和汇总每种coming_from / article组合对Referrer_type的所有'n'计数来消除所有不必要的额外列:

summed_articles = df.groupby(['article', 'coming_from']).sum()

接下来,让我们找到为每个文章页面吸引最多访问者的引荐来源网址(coming_from)。

一种方法是通过df.iloc []方法使用具有所需行索引的过滤器表。 因此,让我们在summed_articles表中找到与每篇文章的最高" n"相对应的内容。

我们将使用一个名为idxmax的Pandas方法,该方法返回具有最大值的分组列的索引。 这次再次在suming_from列上聚合summed_articles,我们可以这样运行它:

max_n_filter = summed_articles.reset_index()\  .groupby('article')\  .idxmax()

让我们先预览一下过滤器:

现在,我们可以使用此表过滤出summed_articles行:

summed_articles.iloc[max_n_filter['n']].head(4)

最后,我们需要按访问者的最高数量对值进行排序:

summed_articles.iloc[max_n_filter['n']]\  .sort_values(by='n', ascending=False)\  .head(10)

做完了!

Pandas+Dask

现在,让我们尝试在完整数据集的Dask中重新创建此中等复杂的任务。 第一步很容易。 我们可以这样创建带有summed_articles的表,而不会出现任何问题:

summed_articles = dfd.groupby(['article', 'coming_from'])\  .sum()\  .reset_index()\  .compute()

但是最好不要将其存储在内存中-我们稍后必须执行聚合,这将需要大量内存。 因此,让我们直接将其记录下来(作为其计算结果)到硬盘驱动器中,例如hdf5或实木复合地板文件:

dfd.groupby(['article', 'coming_from'])\  .sum()\  .reset_index()\  .to_parquet('./summed_articles.parquet', engine='pyarrow')

到目前为止,一切都很好。

第二步是创建过滤器表。 问题就从这里开始:在撰写本文时,Dask数据框尚无idxmax()实现。 我们必须以某种方式即兴创作。

第一种方法

例如,我们可以将summed_articles索引复制到新列中,并通过自定义apply函数将其输出。 但是,还有另一个问题-数据的快速分区意味着我们无法使用iloc过滤特定的行(所有行都需要使用":"值)。

我们可以尝试使用loc方法并通过检查行的索引是否存在于先前确定的过滤器表列表中来选择行,但这将是巨大的计算开销。 真可惜。

第二种方法

这是另一种方法:我们可以编写一个自定义函数来处理汇总数据,并将其与groupby-apply组合一起使用。 这样,我们可以很轻松地克服上述所有问题。

但是然后…apply方法通过将来自行的单独处理的子集的所有数据输出串联到一个最终表中而起作用,这意味着必须将其临时存储在内存中的一个片段中……

根据数据的运气,它可能足够小,也可能不够小。 我尝试了几次,发现它阻塞了我的(16BG RAM笔记本电脑)内存,迫使笔记本电脑内核最终重新启动。

第三种方法

不遗余力,我尝试通过遍历各个组,找到正确的行并将其附加到磁盘上的hdf5 / parquet存储中来解决方案的阴暗面。

第一个问题:DaskGroupBy对象没有迭代方法的实现(在撰写本文时),因此我们无法使用for-in逻辑。

最后,我们可以从唯一的组合中找到所有article / coming_,并遍历这些值以使用get_group()方法将我们自己的summed_articles行分组:

dfd[[‘article’, ‘coming_from’]]\ .drop_duplicates()\ .to_parquet(‘./uniques.parquet’)for item in pd.read_parquet(‘./uniques.parquet’,  engine=’pyarrow’).itertuples():  t = dfd.groupby([‘article’,‘coming_from’])\ .get_group(item)\ .compute() ...

那应该可以,但是过程会非常慢。 这就是为什么我放弃使用Dask解决此问题的原因。

我要在这里说明的重点是,Pandas并不能(轻松)解决所有面向数据的问题。 当然,一个人可以投资大量的RAM,但是在大多数情况下,这并不是路要走-当然,对于有笔记本电脑的常规数据专家来说,这也不行。

使用良好的旧SQL和关系数据库仍然可以最好地解决这类问题,即使是简单的SQLite也可以在非常合理的时间内执行更好的性能。

我们可以通过几种方式解决此问题。 这是其中之一。

我的解决方案基于将数据存储在PostgreSQL数据库中,并借助PARTITION BY和ROW_NUMBER函数执行复合查询。 我在这里使用PostgreSQL数据库,但是它也可以是最新的SQLite3(3.25版或更高版本),因为它现在按功能支持分区-这就是我们解决方案所需要的。

为了保存结果,我创建了一个新的PostgreSQL数据库" clickstream",该数据库在Docker容器中本地运行,并通过SQLAlchemy接口引擎从Jupyter Notebook连接到它:

import psycopg2from sqlalchemy import create engineengine = create_engine(‘postgres://<db hostname>/clickstream’)conn = psycopg2.connect( dbname=”clickstream”, user=”postgres”,  password=”<secure-password>”, host=”0.0.0.0")cur = conn.cursor()

然后,我们按文章和comeing_from列对Dask数据帧求和,并从制表符和返回字符中清除字符串数据,这会干扰PostgreSQL的上传:

summed_articles = dfd.groupby([‘article’, ‘coming_from’])\ .sum()\ .reset_index()\ .compute()for c in ['\t', '\n', '\\']: summed_articles[‘article’] = \  summed_articles[‘article’].str.replace(c, ‘ ‘)summed_articles[‘coming_from’] = \ summed_articles[‘coming_from’].str.replace(‘\t’, ‘ ‘)

同样,目前我们还没有进行任何编辑,summed_articles仍然是延迟的Dask对象。

将数据帧上载到数据库之前,最后要做的一件事是在现有数据库中创建一个空表,因此发送具有正确列名的空表将可以很好地解决这一问题:

pd.DataFrame(columns=summed_articles.columns).to_sql( ‘summed_articles’,  con=engine,  if_exists=’replace’,  index=False)

最后,让我们将数据上传到其中。 请注意,在编写Dask数据框时,没有提供to_sql方法,因此我们可以使用另一种技巧来快速地逐块执行:

for n in range(summed_articles.npartitions): table_chunk = summed_articles.get_partition(n).compute() output = io.StringIO() table_chunk.to_csv(output, sep=’\t’, header=False, index=False) output.seek(0) try: cur.copy_from(output, ‘summed_articles’, null=””) except Exception: err_tables.append(table_chunk) conn.rollback() continue conn.commit()

接下来,我们创建一个SELECT语句,该语句按文章对行进行划分,并按访问次数列" n"在本地对其进行排序,并以整数(每个分区的子集均从1开始)对已排序的组进行递增索引:

SELECT  row_number() OVER ( PARTITION BY article ORDER BY n DESC ) ArticleNR,  article, coming_from, nFROM article_sum

然后,我们再次根据商品列汇总行,并仅返回索引等于1的行,从本质上过滤出给定文章中具有最大" n"值的行。 这是完整的SQL查询:

SELECT t.article, t.coming_from, t.n FROM ( SELECT row_number() OVER ( PARTITION BY article ORDER BY n DESC ) ArticleNR,  article, coming_from, n FROM article_sum ) tWHERE t.ArticleNR = 1ORDER BY n DESC;

然后通过以下命令对数据库执行上述SQL查询:

q = engine.execute(‘’’<SELECT statement here>’’’).fetchall()pd.DataFrame(q, columns=[‘article’, ‘coming_from’, ‘n’]).head(20)

瞧,我们的表已经准备好了。

此外,显然连字符和减号之间的差异使许多人在2018年晚上醒来:

我希望本指南可以帮助您使用Pandas + Dask组合处理Python中的较大数据集。 很显然,某些复杂的分析任务仍可以通过其他技术(例如良好的旧关系数据库和SQL)来最好地处理。

注意1:在使用Dask时,每个dask-dataframe块以及最终输出(转换为Pandas数据帧)必须足够小以适合内存。

注意2:以下是一些有用的工具,可帮助您密切关注与数据大小有关的问题:

Jupyter Notebook中的%timeit魔术功能df.memory_usage()来自dask.diagnostics的ResourceProfiler进度条sys.getsizeofgc.collect()

(本文翻译自Filip Ciesielski的文章 《How to handle large datasets in Python with Pandas and Dask》)

标签: #pandas创建空表