龙空技术网

停止使用Pandas并开始使用Spark+Scala

闻数起舞 7365

前言:

此时看官们对“scala连接sqlserver”大致比较着重,我们都需要学习一些“scala连接sqlserver”的相关知识。那么小编在网络上网罗了一些关于“scala连接sqlserver””的相关文章,希望朋友们能喜欢,朋友们快快来学习一下吧!

为什么数据科学家和工程师应该考虑将Spark与Scala结合使用以替代Pandas,以及如何入门

> Source:

使用Scala从Pandas迁移到Spark并不像您想象的那么困难,因此,您的代码将运行得更快,并且最终可能会编写出更好的代码。

以数据工程师的经验,我发现在Pandas中建立数据管道经常需要我们定期增加资源,以跟上不断增加的内存使用量。 此外,由于意外的数据类型或空值,我们经常会看到许多运行时错误。 通过将Spark与Scala结合使用,解决方案感觉更强大,重构和扩展更容易。

在本文中,我们将介绍以下内容:

· 为什么要在Spark上使用Scala和Pandas

· Scala Spark API与Pandas API的实际区别不大

· 如何开始使用Jupyter笔记本电脑或您喜欢的IDE

什么是Spark?

· Spark是Apache开源框架

· 它可用作库并在"本地"集群上运行,或在Spark集群上运行

· 在Spark集群上,可以以分布式方式执行代码,其中一个主节点和多个工作节点共享负载

· 即使在本地群集上,您仍然可以看到与Pandas相比的性能提升,我们将在下面介绍原因

为什么要使用Spark?

Spark由于能够快速处理大型数据集而变得流行

· 默认情况下,Spark是多线程的,而Pandas是单线程的

· 可以在Spark集群上以分布式方式执行Spark代码,而Pandas可以在一台机器上运行

· Spark是懒惰的,这意味着它只会在您收集时(即,当您实际上需要退还东西时)才执行,同时它会建立执行计划并找到执行代码的最佳方式

· 这与立即执行的Pandas不同,它会在到达每个步骤时执行每个步骤

· Spark也不太可能用完内存,因为当达到内存限制时它将开始使用磁盘

对于运行时的直观比较,请参见Databricks的以下图表,我们可以看到Spark的运行速度明显快于Pandas,并且Pandas的内存不足以较低的阈值运行。

>

Spark拥有丰富的生态系统

· 数据科学库,例如内置的Spark ML或用于图形算法的Graph X

· Spark Streaming用于实时数据处理

· 与其他系统和文件类型(orc,镶木地板等)的互操作性

为什么要使用Scala而不是PySpark?

Spark提供了熟悉的API,因此使用Scala而不是Python不会感觉像是一个巨大的学习曲线。 以下是一些您可能要使用Scala的原因:

· Scala是一种静态类型的语言,这意味着您会发现代码运行时错误可能比Python少

· Scala还允许您创建不可变的对象,这意味着在引用对象时,您可以确信在创建和调用对象之间其状态没有发生变化。

· Spark是用Scala编写的,因此在Python之前的Scala中可以使用新功能

· 对于数据科学家和数据工程师一起工作,由于Scala代码的类型安全性和不变性,使用Scala可以帮助进行协作。

Spark核心概念

· DataFrame:spark DataFrame是一种与Pandas DataFrame非常相似的数据结构

· Dataset:数据集是类型化的数据框,对于确保数据符合预期的架构非常有用

· RDD:这是Spark中的核心数据结构,在其上构建了DataFrame和Dataset

通常,我们会尽可能使用数据集,因为它们很安全,更有效并且可以提高可读性,因为很明显,我们可以从数据集中获得期望的数据。

数据集 Dataset

要创建数据集,我们首先需要创建一个case类,该类与Python中的数据类相似,并且实际上只是一种指定数据结构的方式。

例如,我们创建一个名为FootballTeam的案例类,其中包含几个字段:

case class FootballTeam(    name: String,    league: String,    matches_played: Int,    goals_this_season: Int,    top_goal_scorer: String,    wins: Int)

现在,让我们创建这个case类的实例:

val brighton: FootballTeam =    FootballTeam(      "Brighton and Hove Albion",      "Premier League",      matches_played = 29,      goals_this_season = 32,      top_goal_scorer = "Neil Maupay",      wins = 6    )

让我们创建另一个名为man City的实例,现在我们将创建具有这两个足球队的数据集:

val teams: Dataset[FootballTeam] = spark.createDataset(Seq(brighton,        manCity))

另一种方法是:

val teams: Dataset[FootballTeam] =       spark.createDataFrame(Seq(brighton, manCity)).as[FootballTeam]

从外部数据源读取数据并返回DataFrame时,第二种方法很有用,因为您可以将其强制转换为Dataset,这样我们现在有了一个类型化的集合。

数据转换

您可以在Pandas DataFrame中应用的大多数(如果不是全部)数据转换在Spark中可用。 当然,语法上会有差异,有时还有其他需要注意的地方,其中一些我们现在将介绍。

总的来说,我发现Spark与Pandas相比在符号上更一致,并且由于Scala是静态类型的,因此您通常可以执行myDataset。 等待编译器告诉您可用的方法!

让我们从一个简单的转换开始,我们只想向我们的数据集添加一个新列,并为其分配常量值。 在Pandas中,这看起来像:

Pandas

df_teams['sport'] = 'football'

除了语法外,Spark还有一个小的区别,那就是在此新字段中添加一个常量值需要我们导入一个名为lit的spark函数。

Spark

import org.apache.spark.sql.functions.litval newTeams = teams.withColumn("sport", lit("football"))

请注意,我们已经创建了一个新对象,因为我们原始的团队数据集是一个val,这意味着它是不可变的。 众所周知,这是一件好事,每当我们使用团队数据集时,我们总是会得到相同的对象。

现在,我们基于函数添加一列。 在Pandas中,它看起来像:

Pandas

def is_prem(league):    if league == 'Premier League':        return True    else:        return Falsedf_teams['premier_league'] = df_teams['league'].apply(lambda x:                                     is_prem(x))

为了在Spark中执行相同的操作,我们需要序列化该函数,以便Spark可以应用它。 这可以通过使用UserDefinedFunctions来完成。 我们还使用了大小写匹配,因为这在Scala中比if-else更好,但是两者都可以。

我们还需要导入另一个有用的spark函数col,该函数用于引用列。

Spark

import org.apache.spark.sql.functions.coldef isPrem(league: String): Boolean =    league match {      case "Premier League" => true      case _                => false    }val isPremUDF: UserDefinedFunction =    udf[Boolean, String](isPrem)val teamsWithLeague: DataFrame = teams.withColumn("premier_league",                                                                      isPremUDF(col("league")))

现在,我们添加了case类中没有的新列,这会将其转换回DataFrame。 因此,我们需要在原始案例类中添加另一个字段(并使用Options使其可以为空),或创建一个新的案例类。

Scala中的Option仅表示该字段可为空。 如果值是null,则使用None;如果填充,则使用Some(" value")。 可选字符串的示例:

val optionalString : Option[String] = Some("something")

要从中获取字符串,我们可以调用optionalString.get(),这将仅返回" something"。 请注意,如果不确定是否为null,则可以使用optionalString.getOrElse(" nothing"),如果为null,它将返回字符串" nothing"。

过滤数据集是另一个常见要求,这是一个示例,说明Spark比Pandas更一致,因为它遵循与其他转换相同的模式,在该转换中我们执行数据集"点"转换(即,dataset.filter(…) )。

Pandasdf_teams = df_teams[df_teams['goals_this_season'] > 50]Sparkval filteredTeams = teams.filter(col("goals_this_season") > 50)

我们可能需要对数据集执行一些聚合,这在Pandas和Spark中非常相似。

Pandasdf_teams.groupby(['league']).count()Sparkteams.groupBy("league").count()

对于多个聚合,我们可以再次执行类似于Pandas的操作,并具有映射到聚合的字段。 如果我们要进行自己的聚合,则可以使用UserDefinedAggregations。

teams.agg(Map( "matches_played" -> "avg", "goals_this_season" -> "count"))

通常,我们还想结合多个数据集,这可能与并集:

Pandaspd.concat([teams, another_teams], ignore_index=True)Sparkteams.unionByName(anotherTeams)

…或加入:

val players: Dataset[Player] = spark        .createDataset(Seq(neilMaupey, sergioAguero))teams.join(players,        teams.col("top_goal_scorer") === players.col("player_name"),        "left"      ).drop("player_name")

在此示例中,我们还使用名为Player的案例类创建了一个新的数据集。 请注意,此案例类有现场伤害,可以为null。

case class Player(player_name: String, goals_scored: Int, injury: Option[String])

请注意,我们删除了player_name列,因为它与top_goal_scorer相同。

我们可能还希望代码的某些部分仅使用Scala本机数据结构,例如Arrays,Lists等。要获得列之一作为Array,我们需要映射到我们的值并调用.collect()。

val teamNames: Array[String] = teams.map(team => team.name) .collect()

请注意,我们可以使用案例类的内置getter返回name字段,如果name不是我们的FootballTeam类中的字段,则不会编译该字段。

顺便说一句,我们也可以将函数添加到case类中,并且在使用诸如IntelliJ或带有Metals插件的vs代码的IDE时,值和函数都将作为自动完成的选项出现。

为了根据数据集是否存在于此数组中来过滤数据集,我们需要通过调用_ *将其视为args序列。

val filteredPlayers: Dataset[Player] = players .filter(col("team").isin(teamNames: _*))
运行一些代码

希望在这一点上,您愿意尝试编写一些Spark代码,即使只是为了看看我是否认为它与Pandas差别不大也是如此。

首先,我们有两种选择。 我们可以使用笔记本,这是获取一些数据并开始玩耍的快速方法。 另外,我们可以建立一个简单的项目。 无论哪种方式,您都需要安装Java 8。

Notebooks

在此示例中,我们将在Jupyter笔记本中使用spylon内核。 。 首先运行以下命令来设置您的笔记本,这应该在浏览器中打开您的笔记本。 然后从可用的内核中选择spylon内核。

 pip install spylon-kernel python -m spylon_kernel install jupyter notebook

通过将以下内容添加到单元格中,检查我们是否具有正确的Java版本:

!java -version

输出应为:

java version "1.8.0_211"Java(TM) SE Runtime Environment (build 1.8.0_211-b12)Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

如果不是,请在您的bash配置文件中检查JAVA_HOME,并确保它指向Java 8。

下一步是安装一些依赖项。 为此,我们可以将以下代码片段添加到新的单元格中。 这设置了一些spark配置,还允许您添加依赖项。 在这里,我添加了一个名为vegas的可视化库。

%%init_sparklauncher.num_executors = 4launcher.executor_cores = 2launcher.driver_memory = '4g'launcher.conf.set("spark.sql.catalogImplementation", "hive")launcher.packages = ["org.vegas-viz:vegas_2.11:0.3.11",                    "org.vegas-viz:vegas-spark_2.11:0.3.11"]

要连接到我们的数据源,我们可以定义一个函数,也许像这样:

def getData(file: String): DataFrame =         spark.read        .format("csv")        .option("header", "true")        .load(file)

这是与csv文件的连接,但是我们可以连接许多其他数据源。 此函数返回一个DataFrame,我们可能要将其转换为Dataset:

val footballTeams: Dataset[FootballTeam] =   getData("footballs_teams.csv").as[FootballTeam]

然后,我们可以开始使用这些数据,并进行我们讨论过的一些数据转换,还有更多。

建立一个项目

现在您已经可以处理一些数据了,您可能想建立一个项目。

要包括的两个主要内容:

· build.sbt-以前我们在一个笔记本单元中添加了依赖项,现在我们需要将它们添加到build.sbt文件中

· SparkSession-在笔记本中我们已经有一个spark会话,这意味着我们能够执行spark.createDataFrame之类的事情。 在我们的项目中,我们需要创建此Spark会话

示例build.sbt:

name := "spark-template"version := "0.1" scalaVersion := "2.12.11"val sparkVersion = "2.4.3"libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersionlibraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

示例SparkSession:

import org.apache.spark.sql.SparkSession trait SparkSessionWrapper {       val spark: SparkSession = SparkSession           .builder()           .master("local")           .appName("spark-example")           .getOrCreate()}

然后,我们可以使用此包装器扩展对象,这给了我们一个Spark会话。

object RunMyCode extends SparkSessionWrapper { //your code here}

然后,您可以开始编写您的Spark代码!

总而言之,Spark是用于快速数据处理的出色工具,并且在数据世界中越来越流行。 因此,Scala也正变得越来越流行,并且由于其类型安全性,对于可能更熟悉Python和Pandas的数据工程师和数据科学家来说,它是一个不错的选择。 Spark是该语言的出色介绍,因为我们可以使用熟悉的概念(例如DataFrames),因此它并不像一个巨大的学习曲线。

希望这可以给您一个快速的概述,也许使您能够在笔记本或新项目中开始探索Spark。 祝好运!

(本文翻译自Chloe Connor的文章《Stop using Pandas and start using Spark with Scala》,参考:)

标签: #scala连接sqlserver