龙空技术网

动手系列:在Apache Spark和Python中创建您的第一个ETL管道

AI中国 776

前言:

眼前看官们对“spark调用python算法”大体比较注重,姐妹们都需要知道一些“spark调用python算法”的相关知识。那么小编也在网上搜集了一些关于“spark调用python算法””的相关内容,希望同学们能喜欢,同学们一起来学习一下吧!

点击上方关注,All in AI中国

在这篇文章中,我将讨论Apache Spark以及如何在其中创建简单但强大的ETL管道。您将了解Spark如何提供API以将不同的数据格式转换为数据帧和SQL以进行分析,以及如何将一个数据源转换为另一个数据源。

什么是Apache Spark?

根据维基百科:

Apache Spark是一个开源的分布式通用集群计算框架。 Spark提供了一个接口,用于使用隐式数据并行和容错来编程整个集群。

官方网站:

Apache Spark是用于大规模数据处理的统一分析引擎。

简而言之,Apache Spark是一个用于处理、查询和分析大数据的框架。由于计算是在内存中完成的,因此它比MapReduce等竞争对手要快好几倍。以每天产生数TB的数据的速率,需要一种能够以高速提供实时分析的解决方案。一些Spark功能是:

它比传统的大规模数据处理框架快100倍。易于使用,因为您可以在Python、R和Scala中编写Spark应用程序。它为SQL、Steaming和Graph计算提供了库。

Apache Spark组件

Spark 核心

它包含Spark的基本功能,如任务调度、内存管理,与存储的交互等。

Spark SQL

它是一组用于与结构化数据交互的库。它使用类似SQL的界面与各种格式的数据进行交互,如CSV、JSON、Parquet等。

Spark Streaming

Spark Streaming是一个Spark组件,支持处理实时数据流。实时流,如股票数据、天气数据、日志和其他各种。

MLIB

MLib是Spark提供的一套机器学习算法,用于监督和无监督学习

GraphX

它是Apache Spark用于图形和图形并行计算的API。它扩展了Spark RDD API,允许我们创建一个带有附加到每个顶点和边缘的任意属性的有向图。它为ETL、探索性分析和迭代图计算提供了统一的工具。

Spark集群管理器

Spark支持以下资源/集群管理器:

Spark Standalone - Spark附带的简单集群管理器Apache Mesos - 一个也可以运行Hadoop应用程序的通用集群管理器。Apache Hadoop YARN - Hadoop 2中的资源管理器Kubernetes - 一个开源系统,用于自动化容器化应用程序的部署、扩展和管理。

设置和安装

从这里下载Apache Spark的二进制文件。您必须在系统上安装Scala,并且还应设置其路径。

对于本教程,我们使用的是在2019年5月发布的2.4.3版。将文件夹移到/ usr / local中

mv spark-2.4.3-bin-hadoop2.7 / usr / local / spark

然后导出Scala和Spark的路径。

#Scala Pathexport PATH="/usr/local/scala/bin:$PATH"#Apache Spark pathexport PATH="/usr/local/spark/bin:$PATH"

通过在终端上运行spark-shell命令来调用Spark Shell。如果一切顺利,你会看到如下所示:

它加载基于Scala的shell。由于我们将使用Python语言,因此我们必须安装PySpark。

pip install pyspark

安装完成后,您可以通过在终端中运行命令pyspark来调用它:

您找到了一个典型的Python shell,但它加载了Spark库。

用Python开发

让我们开始编写我们的第一个程序。

from pyspark.sql import SparkSessionfrom pyspark.sql import SQLContextif __name__ == '__main__': scSpark = SparkSession \ .builder \ .appName("reading csv") \ .getOrCreate()

我们导入了两个库:SparkSession和SQLContext。

SparkSession是编写Spark应用程序的入口点。它允许您与Spark提供的DataSet和DataFrame API进行交互。我们通过调用appName来设置应用程序名称。 getOrCreate()方法返回应用程序的新SparkSession或返回现有的SparkSession。

我们的下一个目标是读取CSV文件。我创建了一个示例CSV文件,名为data.csv,如下所示:

name,age,countryadnan,40,Pakistanmaaz,9,Pakistanmusab,4,Pakistanayesha,32,Pakistan

和代码:

if __name__ == '__main__': scSpark = SparkSession \ .builder \ .appName("reading csv") \ .getOrCreate()data_file = '/Development/PetProjects/LearningSpark/data.csv' sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache() print('Total Records = {}'.format(sdfData.count())) sdfData.show()

我设置了文件路径,然后调用.read.csv来读取CSV文件。参数是不言自明的。 .cache()缓存返回resultset,从而提高性能。当我运行该程序时,它返回如下所示的内容:

看起来有趣,不是吗?现在,如果我想读取数据帧中的多个文件,该怎么办?让我们创建另一个文件,我将其称为data1.csv,它如下所示:

1

2

3

4

姓名,年龄,国家

诺琳,23,英格兰

阿米尔,9,巴基斯坦

诺曼,4,巴基斯坦

拉希德,12,巴基斯坦

我只需要这样做:

data_file ='/Development/PetProjects/LearningSpark/data*.csv'它将读取所有以CSV类型的数据开头的文件。

它将如何读取与模式和转储结果匹配的所有CSV文件:

如您所见,它将CSV中的所有数据转储到单个数据帧中。

但有一点,只有当所有CSV都遵循某种模式时,这种转储才有效。如果您有一个具有不同列名的CSV,那么它将返回以下消息。

19/06/04 18:59:05 WARN CSVDataSource: Number of column in CSV header is not equal to number of fields in the schema: Header length: 3, schema size: 17CSV file: 

如您所见,Spark抱怨不能处理不同的CSV文件。

您可以使用DataFrame执行许多操作,但Spark为您提供了更简单、更熟悉的接口来使用SQLContext操作数据。它是SparkSQL的网关,它允许您使用类似SQL的查询来获得所需的结果。

在我们进一步发展之前,让我们先玩一些真实的数据。为此,我们使用的是从Kaggle得到的超市销售数据。在我们尝试SQL查询之前,让我们尝试按性别对记录进行分组。我们正在处理ETL的提取部分。

data_file = '/Development/PetProjects/LearningSpark/supermarket_sales.csv'sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()gender = sdfData.groupBy('Gender').count()print(gender.show())

当你运行时,它会返回如下内容:

groupBy()按给定列对数据进行分组。在我们的例子中,它是性别列。

SparkSQL允许您使用类似SQL的查询来访问数据。

sdfData.registerTempTable("sales")output = scSpark.sql('SELECT * from sales')output.show()

首先,我们从dataframe中创建一个临时表。为此,使用了registerTampTable。在我们的例子中,表名是sales。完成后,您可以在其上使用典型的SQL查询。在我们的例子中,它是Select * from sales。

或者类似下面的内容:

output = scSpark.sql('SELECT * from sales WHERE `Unit Price` < 15 AND Quantity < 10')output.show()

甚至是聚合值。

output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City')output.show()

非常灵活,对吗?

我们刚刚完成了ETL的变换部分。

最后是ETL的加载部分。如果要保存转换后的数据怎么办?您会有很多的可用的选项,RDBMS、XML或JSON。

output.write.format('json').save('filtered.json')

运行时,Sparks会创建以下文件夹/文件结构。

它创建了一个具有文件名称的文件夹,在我们的例子中是filtered.json。然后,名为SUCCESStells的文件是否成功运行。如果失败,则生成名为FAILURE的文件。然后,您在此处找到多个文件。多个文件的原因是每个工作都涉及在文件中写入的操作。如果要创建单个文件(不建议使用),则可以使用合并来收集所有分区中的数据并将其减少到单个数据帧。

output.coalesce(1).write.format('json').save('filtered.json')

它将输出以下数据:

{ “总”:328, “城市”: “内比都”}{ “总”:332, “城市”: “曼德勒”}{ “总”:340, “城市”: “仰光”}

MySQL和Apache Spark集成

上述数据帧包含转换后的数据。我们希望将这些数据加载到MYSQL中,以便进一步使用,例如可视化或显示在应用程序上。

首先,我们需要MySQL连接器库来与Spark进行交互。我们将从MySQL网站下载连接器并将其放在一个文件夹中。我们将修改SparkSession以包含JAR文件。

scSpark = SparkSession \ .builder \ .appName("reading csv") \ .config("spark.driver.extraClassPath", "/usr/local/spark/jars/mysql-connector-java-8.0.16.jar") \ .getOrCreate()

输出现在如下所示:

output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City') output.show() output.write.format('jdbc').options( url='jdbc:mysql://localhost/spark', driver='com.mysql.cj.jdbc.Driver', dbtable='city_info', user='root', password='root').mode('append').save()

在运行脚本之前,我在数据库中创建了所需的Db和表。如果一切顺利,您应该看到如下结果:

如您所见,Spark可以更轻松地将数据从一个数据源传输到另一个数据源。

结论

Apache Spark是一个非常苛刻且有用的大数据工具,可以帮助您轻松编写ETL。 您可以加载pb级的数据,并且可以通过设置多个节点的集群轻松地处理这些数据。本教程只是为您提供Apache Spark编写ETL的基本思想。 您应该检查文档和其他资源以深入挖掘。

标签: #spark调用python算法 #etl教程 #apache怎么处理python #python332安装教程