前言:
此时看官们对“scala连接sqlserver”大致比较着重,我们都需要学习一些“scala连接sqlserver”的相关知识。那么小编在网络上网罗了一些关于“scala连接sqlserver””的相关文章,希望朋友们能喜欢,朋友们快快来学习一下吧!为什么数据科学家和工程师应该考虑将Spark与Scala结合使用以替代Pandas,以及如何入门
使用Scala从Pandas迁移到Spark并不像您想象的那么困难,因此,您的代码将运行得更快,并且最终可能会编写出更好的代码。
以数据工程师的经验,我发现在Pandas中建立数据管道经常需要我们定期增加资源,以跟上不断增加的内存使用量。 此外,由于意外的数据类型或空值,我们经常会看到许多运行时错误。 通过将Spark与Scala结合使用,解决方案感觉更强大,重构和扩展更容易。
在本文中,我们将介绍以下内容:
· 为什么要在Spark上使用Scala和Pandas
· Scala Spark API与Pandas API的实际区别不大
· 如何开始使用Jupyter笔记本电脑或您喜欢的IDE
什么是Spark?
· Spark是Apache开源框架
· 它可用作库并在"本地"集群上运行,或在Spark集群上运行
· 在Spark集群上,可以以分布式方式执行代码,其中一个主节点和多个工作节点共享负载
· 即使在本地群集上,您仍然可以看到与Pandas相比的性能提升,我们将在下面介绍原因
为什么要使用Spark?
Spark由于能够快速处理大型数据集而变得流行
· 默认情况下,Spark是多线程的,而Pandas是单线程的
· 可以在Spark集群上以分布式方式执行Spark代码,而Pandas可以在一台机器上运行
· Spark是懒惰的,这意味着它只会在您收集时(即,当您实际上需要退还东西时)才执行,同时它会建立执行计划并找到执行代码的最佳方式
· 这与立即执行的Pandas不同,它会在到达每个步骤时执行每个步骤
· Spark也不太可能用完内存,因为当达到内存限制时它将开始使用磁盘
对于运行时的直观比较,请参见Databricks的以下图表,我们可以看到Spark的运行速度明显快于Pandas,并且Pandas的内存不足以较低的阈值运行。
Spark拥有丰富的生态系统
· 数据科学库,例如内置的Spark ML或用于图形算法的Graph X
· Spark Streaming用于实时数据处理
· 与其他系统和文件类型(orc,镶木地板等)的互操作性
为什么要使用Scala而不是PySpark?
Spark提供了熟悉的API,因此使用Scala而不是Python不会感觉像是一个巨大的学习曲线。 以下是一些您可能要使用Scala的原因:
· Scala是一种静态类型的语言,这意味着您会发现代码运行时错误可能比Python少
· Scala还允许您创建不可变的对象,这意味着在引用对象时,您可以确信在创建和调用对象之间其状态没有发生变化。
· Spark是用Scala编写的,因此在Python之前的Scala中可以使用新功能
· 对于数据科学家和数据工程师一起工作,由于Scala代码的类型安全性和不变性,使用Scala可以帮助进行协作。
Spark核心概念
· DataFrame:spark DataFrame是一种与Pandas DataFrame非常相似的数据结构
· Dataset:数据集是类型化的数据框,对于确保数据符合预期的架构非常有用
· RDD:这是Spark中的核心数据结构,在其上构建了DataFrame和Dataset
通常,我们会尽可能使用数据集,因为它们很安全,更有效并且可以提高可读性,因为很明显,我们可以从数据集中获得期望的数据。
数据集 Dataset
要创建数据集,我们首先需要创建一个case类,该类与Python中的数据类相似,并且实际上只是一种指定数据结构的方式。
例如,我们创建一个名为FootballTeam的案例类,其中包含几个字段:
case class FootballTeam( name: String, league: String, matches_played: Int, goals_this_season: Int, top_goal_scorer: String, wins: Int)
现在,让我们创建这个case类的实例:
val brighton: FootballTeam = FootballTeam( "Brighton and Hove Albion", "Premier League", matches_played = 29, goals_this_season = 32, top_goal_scorer = "Neil Maupay", wins = 6 )
让我们创建另一个名为man City的实例,现在我们将创建具有这两个足球队的数据集:
val teams: Dataset[FootballTeam] = spark.createDataset(Seq(brighton, manCity))
另一种方法是:
val teams: Dataset[FootballTeam] = spark.createDataFrame(Seq(brighton, manCity)).as[FootballTeam]
从外部数据源读取数据并返回DataFrame时,第二种方法很有用,因为您可以将其强制转换为Dataset,这样我们现在有了一个类型化的集合。
数据转换
您可以在Pandas DataFrame中应用的大多数(如果不是全部)数据转换在Spark中可用。 当然,语法上会有差异,有时还有其他需要注意的地方,其中一些我们现在将介绍。
总的来说,我发现Spark与Pandas相比在符号上更一致,并且由于Scala是静态类型的,因此您通常可以执行myDataset。 等待编译器告诉您可用的方法!
让我们从一个简单的转换开始,我们只想向我们的数据集添加一个新列,并为其分配常量值。 在Pandas中,这看起来像:
Pandas
df_teams['sport'] = 'football'
除了语法外,Spark还有一个小的区别,那就是在此新字段中添加一个常量值需要我们导入一个名为lit的spark函数。
Spark
import org.apache.spark.sql.functions.litval newTeams = teams.withColumn("sport", lit("football"))
请注意,我们已经创建了一个新对象,因为我们原始的团队数据集是一个val,这意味着它是不可变的。 众所周知,这是一件好事,每当我们使用团队数据集时,我们总是会得到相同的对象。
现在,我们基于函数添加一列。 在Pandas中,它看起来像:
Pandas
def is_prem(league): if league == 'Premier League': return True else: return Falsedf_teams['premier_league'] = df_teams['league'].apply(lambda x: is_prem(x))
为了在Spark中执行相同的操作,我们需要序列化该函数,以便Spark可以应用它。 这可以通过使用UserDefinedFunctions来完成。 我们还使用了大小写匹配,因为这在Scala中比if-else更好,但是两者都可以。
我们还需要导入另一个有用的spark函数col,该函数用于引用列。
Spark
import org.apache.spark.sql.functions.coldef isPrem(league: String): Boolean = league match { case "Premier League" => true case _ => false }val isPremUDF: UserDefinedFunction = udf[Boolean, String](isPrem)val teamsWithLeague: DataFrame = teams.withColumn("premier_league", isPremUDF(col("league")))
现在,我们添加了case类中没有的新列,这会将其转换回DataFrame。 因此,我们需要在原始案例类中添加另一个字段(并使用Options使其可以为空),或创建一个新的案例类。
Scala中的Option仅表示该字段可为空。 如果值是null,则使用None;如果填充,则使用Some(" value")。 可选字符串的示例:
val optionalString : Option[String] = Some("something")
要从中获取字符串,我们可以调用optionalString.get(),这将仅返回" something"。 请注意,如果不确定是否为null,则可以使用optionalString.getOrElse(" nothing"),如果为null,它将返回字符串" nothing"。
过滤数据集是另一个常见要求,这是一个示例,说明Spark比Pandas更一致,因为它遵循与其他转换相同的模式,在该转换中我们执行数据集"点"转换(即,dataset.filter(…) )。
Pandasdf_teams = df_teams[df_teams['goals_this_season'] > 50]Sparkval filteredTeams = teams.filter(col("goals_this_season") > 50)
我们可能需要对数据集执行一些聚合,这在Pandas和Spark中非常相似。
Pandasdf_teams.groupby(['league']).count()Sparkteams.groupBy("league").count()
对于多个聚合,我们可以再次执行类似于Pandas的操作,并具有映射到聚合的字段。 如果我们要进行自己的聚合,则可以使用UserDefinedAggregations。
teams.agg(Map( "matches_played" -> "avg", "goals_this_season" -> "count"))
通常,我们还想结合多个数据集,这可能与并集:
Pandaspd.concat([teams, another_teams], ignore_index=True)Sparkteams.unionByName(anotherTeams)
…或加入:
val players: Dataset[Player] = spark .createDataset(Seq(neilMaupey, sergioAguero))teams.join(players, teams.col("top_goal_scorer") === players.col("player_name"), "left" ).drop("player_name")
在此示例中,我们还使用名为Player的案例类创建了一个新的数据集。 请注意,此案例类有现场伤害,可以为null。
case class Player(player_name: String, goals_scored: Int, injury: Option[String])
请注意,我们删除了player_name列,因为它与top_goal_scorer相同。
我们可能还希望代码的某些部分仅使用Scala本机数据结构,例如Arrays,Lists等。要获得列之一作为Array,我们需要映射到我们的值并调用.collect()。
val teamNames: Array[String] = teams.map(team => team.name) .collect()
请注意,我们可以使用案例类的内置getter返回name字段,如果name不是我们的FootballTeam类中的字段,则不会编译该字段。
顺便说一句,我们也可以将函数添加到case类中,并且在使用诸如IntelliJ或带有Metals插件的vs代码的IDE时,值和函数都将作为自动完成的选项出现。
为了根据数据集是否存在于此数组中来过滤数据集,我们需要通过调用_ *将其视为args序列。
val filteredPlayers: Dataset[Player] = players .filter(col("team").isin(teamNames: _*))运行一些代码
希望在这一点上,您愿意尝试编写一些Spark代码,即使只是为了看看我是否认为它与Pandas差别不大也是如此。
首先,我们有两种选择。 我们可以使用笔记本,这是获取一些数据并开始玩耍的快速方法。 另外,我们可以建立一个简单的项目。 无论哪种方式,您都需要安装Java 8。
Notebooks
在此示例中,我们将在Jupyter笔记本中使用spylon内核。 。 首先运行以下命令来设置您的笔记本,这应该在浏览器中打开您的笔记本。 然后从可用的内核中选择spylon内核。
pip install spylon-kernel python -m spylon_kernel install jupyter notebook
通过将以下内容添加到单元格中,检查我们是否具有正确的Java版本:
!java -version
输出应为:
java version "1.8.0_211"Java(TM) SE Runtime Environment (build 1.8.0_211-b12)Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
如果不是,请在您的bash配置文件中检查JAVA_HOME,并确保它指向Java 8。
下一步是安装一些依赖项。 为此,我们可以将以下代码片段添加到新的单元格中。 这设置了一些spark配置,还允许您添加依赖项。 在这里,我添加了一个名为vegas的可视化库。
%%init_sparklauncher.num_executors = 4launcher.executor_cores = 2launcher.driver_memory = '4g'launcher.conf.set("spark.sql.catalogImplementation", "hive")launcher.packages = ["org.vegas-viz:vegas_2.11:0.3.11", "org.vegas-viz:vegas-spark_2.11:0.3.11"]
要连接到我们的数据源,我们可以定义一个函数,也许像这样:
def getData(file: String): DataFrame = spark.read .format("csv") .option("header", "true") .load(file)
这是与csv文件的连接,但是我们可以连接许多其他数据源。 此函数返回一个DataFrame,我们可能要将其转换为Dataset:
val footballTeams: Dataset[FootballTeam] = getData("footballs_teams.csv").as[FootballTeam]
然后,我们可以开始使用这些数据,并进行我们讨论过的一些数据转换,还有更多。
建立一个项目
现在您已经可以处理一些数据了,您可能想建立一个项目。
要包括的两个主要内容:
· build.sbt-以前我们在一个笔记本单元中添加了依赖项,现在我们需要将它们添加到build.sbt文件中
· SparkSession-在笔记本中我们已经有一个spark会话,这意味着我们能够执行spark.createDataFrame之类的事情。 在我们的项目中,我们需要创建此Spark会话
示例build.sbt:
name := "spark-template"version := "0.1" scalaVersion := "2.12.11"val sparkVersion = "2.4.3"libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersionlibraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
示例SparkSession:
import org.apache.spark.sql.SparkSession trait SparkSessionWrapper { val spark: SparkSession = SparkSession .builder() .master("local") .appName("spark-example") .getOrCreate()}
然后,我们可以使用此包装器扩展对象,这给了我们一个Spark会话。
object RunMyCode extends SparkSessionWrapper { //your code here}
然后,您可以开始编写您的Spark代码!
总而言之,Spark是用于快速数据处理的出色工具,并且在数据世界中越来越流行。 因此,Scala也正变得越来越流行,并且由于其类型安全性,对于可能更熟悉Python和Pandas的数据工程师和数据科学家来说,它是一个不错的选择。 Spark是该语言的出色介绍,因为我们可以使用熟悉的概念(例如DataFrames),因此它并不像一个巨大的学习曲线。
希望这可以给您一个快速的概述,也许使您能够在笔记本或新项目中开始探索Spark。 祝好运!
(本文翻译自Chloe Connor的文章《Stop using Pandas and start using Spark with Scala》,参考:)
标签: #scala连接sqlserver