龙空技术网

在PySpark中使用自定义UDF来计算Haversine距离

林小婵的店 228

前言:

目前各位老铁们对“sparkpythonudf函数”可能比较关怀,兄弟们都需要学习一些“sparkpythonudf函数”的相关文章。那么小编同时在网络上汇集了一些有关“sparkpythonudf函数””的相关资讯,希望咱们能喜欢,看官们一起来学习一下吧!

在处理全局位置元数据时,计算坐标点之间的地理距离通常很有用。困难在于在计算距离时必须考虑到地球的曲率。

在本文中,您将了解如何使用Spark会话中的Haversine公式作为自定义用户定义函数计算地理距离。

数据集

在本例中,我们将使用全球历史气候网络(GHCN)每日观测数据集 。它是历史天气数据的数据集,涵盖了180个不同地区的100,000多个不同气象站的263年观测资料。有26.2亿观测值,构成了13.4GB的压缩数据。你可以从这里得到这个()。

提供的脚本用于Spark会话,具有8个内核,4个执行程序,4Gb主内存和4Gb工作内存。

用Haversine计算地理距离

Haversine是一个公式,它采用两个坐标点(纬度和经度)并在一个对象上生成第三个坐标点,以计算两个原始点之间的表面距离,同时考虑对象的曲率。它假定物体的形状(在我们的例子中是地球)是一个球体。我们知道地球实际上并不是一个完美的球体,但半径是一个常数可以给出相当准确的结果。

在本例中,我们将生成包含新西兰所有电台的haversine距离的元数据。我们将选择新西兰。

首先,我们需要导入必须的Python库。

from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql import functions as F from pyspark.sql import DataFrameWriter as W from math import radians, cos, sin, asin, sqrt spark = (SparkSession.builder .appName(‘’HDFS_Haversine_Fun”) .getOrCreate())

接下来,我们需要加载所需的数据。对于此示例,您只需要站点元数据。但是稍后当您想要进行自己的分析时,您可能希望使用所有GHCN文件。Python代码如下:

input_stations = (spark.read.format(“text”) .load(“hdfs:///data/ghcnd/stations”)) stations_df = input_stations.select( F.trim(F.substring(F.col(“value”), 1 , 11–1 + 1 )).alias(“STATION_ID”).cast(StringType()), F.trim(F.substring(F.col(“value”), 13, 20–13 + 1)).alias(“LATITUDE”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 22, 30–22 + 1)).alias(“LONGITUDE”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 32, 37–32 + 1)).alias(“ELEVATION”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 39, 40–39 + 1)).alias(“STATE_CODE”).cast(StringType()), F.trim(F.substring(F.col(“value”), 42, 71–42 + 1)).alias(“STATION_NAME”).cast(StringType()), F.trim(F.substring(F.col(“value”), 73, 75–73 + 1)).alias(“GSNFLAG”).cast(StringType()), F.trim(F.substring(F.col(“value”), 77, 79–77 + 1)).alias(“HCNFLAG_CRNFLAG”).cast(StringType()), F.trim(F.substring(F.col(“value”), 81, 85–81 + 1)).alias(“WMOID”).cast(StringType()) )

请注意,您不需要定义Schema,然后将其传递到单独的load语句中,因为您可以使用pyspark.sql.functions将数据解析为具有所需类型的新列。实际上,这比在文本文件中首先定义StructFields要快得多。

让我们来看看stations_df的前五个观察结果。

stations_df.show(5)

接下来,我们将添加一个名为COUNTRY_CODE的列,以便我们稍后可以过滤以仅访问我们最喜欢的国家/地区中的电台。我们应该总是将udf传递给实现结果所需的最少量信息,因为udfs在大型数据集上的计算成本往往很高。

我们可以看到STATION_ID的前两个字符是国家代码。所以我们只需要取两个前两个字符并将其添加为新列COUNTRY_CODE。

stations_df = stations_df.withColumn(‘COUNTRY_CODE’, stations_df.STATION_ID.substr(1, 2))

完成后,我们可以过滤并选择所需的列。

nz_stations = (stations_df.filter(stations_df.COUNTRY_CODE==”NZ”).select(“STATION_ID”,“STATION_NAME”,“LATITUDE”, “LONGITUDE”))

现在我们可以开始Haversine函数了!为此,我们创建了一个标准的python函数,其中我们使用地球的半径为6371km,并返回distance rounded的绝对值为2dp。Python代码如下:

def get_distance(longit_a, latit_b, longit_b, latit_b): # Transform to radians longit_a, latit_b, longit_b, latit_b = map(radians, [longit_a, latit_b, longit_b, latit_b]) dist_longit = longit_b — longit_a dist_latit = latit_b — latit_a # Calculate area area = sin(dist_latit/2)**2 + cos(latit_a) * sin(dist_longit/2)**2 # Calculate the central angle central_angle = 2 * asin(sqrt(area)) radius = 6371 # Calculate Distance distance = central_angle * radius return abs(round(distance, 2))

现在我们已经将它定义为python函数,我们可以创建一个用户定义的函数来在Spark DataFrame上使用它。用户定义函数允许我们在python或SQL中创建自定义函数,然后使用它们来操作Spark DataFrame中的列。

转换为UDF:

udf_get_distance = F.udf(get_distance)

现在,我们需要获取原始的nz_stations元数据,并将其与自身交叉连接,以支持列操作,并在过程中重命名列。

nz_station_pairs = (nz_stations.crossJoin(nz_stations).toDF(“STATION_ID_A”, “STATION_NAME_A”, “LATITUDE_A”, “LONGITUDE_A”, “STATION_ID_B”, “STATION_NAME_A”, “LATITUDE_B”, “LONGITUDE_B”))

然后通过删除重复的行来清理它。

nz_station_pairs = (nz_station_pairs.filter(nz_station_pairs.STATION_ID_A != nz_station_pairs.STATION_ID_B))

现在我们可以将我们的udf_get_distance函数应用于nz_station_pairs以添加新列ABS_DISTANCE。

nz_pairs_distance = (nz_station_pairs.withColumn(“ABS_DISTANCE”, udf_get_distance( nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A, nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B)))

注意:在这种情况下,udf不会将输出作为float返回,而是创建一个新的列,distances 为字符串。我想这是因为withColumn恢复了Spark的默认值,这是一个字符串。

如果您遇到此问题,则可以通过添加新列来修改此值,该列是前一个的复制,但将新列强制转换为Double Type。您无法更改上一列,因为Spark DataFrames是不可变的。

nz_pairs_distance = nz_pairs_distance.withColumn(“DISTANCE_FLOAT”, nz_pairs_distance.ABS_DISTANCE.cast(DoubleType()))

现在您的分析中包含了haversine距离元数据!

nz_pairs_distance.show(7)

您现在可以将其写入本地hfs目录,以用于R或Python中的可视化。

W(nz_pairs_distance).csv(path=”hdfs:///YOUR_DIRECTORY”, mode=’ignore’, header=’true’)
建议确保在使用UDF时始终使用实现结果所需的最小数据。这是因为如果UDF包含多个列操作,则作业将包含许多任务。始终检查原始函数返回的所需对象数据类型是否与UDF的对象数据类型一致。您可以通过将新变量插入终端来轻松地在Spark中进行检查。您可以使用Spark的DataFrameWriter将特定元数据保存到HDFS目录,然后用于使用其他工具(例如R,Python等)进行分析。这可以通过以下方式复制到local director :
hdfs dfs -copyToLocal hdfs:///user/YOUR_DIRECTORY/YOUR_FILE.csv

标签: #sparkpythonudf函数