龙空技术网

spark ML 机器学习库:评估器,转换器和管道概述

二十岁背包 327

前言:

现时大家对“spark ml 算法”大概比较关切,姐妹们都想要学习一些“spark ml 算法”的相关资讯。那么小编同时在网摘上网罗了一些有关“spark ml 算法””的相关资讯,希望你们能喜欢,姐妹们快快来了解一下吧!

spark.ml包目标是提供统一的高级别的API,这些高级API建立在DataFrame上,DataFrame帮助用户创建和调整实用的机器学习管道。在下面spark.ml子包指导中查看的算法指导部分,包含管道API独有的特征转换器,集合等。

内容表:

Main concepts in Pipelines (管道中的主要概念)DataFramePipeline components (管道组件)Transformers (转换器)Estimators (评估器)Properties of pipeline components (管道组件的属性)Pipeline (管道)How it works (怎样工作)Details (明细)Parameters (参数)Saving and Loading Pipelines (保存和加载管道)Code examples (实例代码)Example: Estimator, Transformer, and Param (例:Estimator, Transformer, and Param )Example: Pipeline(例:管道)Example: model selection via cross-validation (例:通过交叉校验选择模型)Example: model selection via train validation split(例:通过训练检验分离选择模型)

Main concepts in Pipelines (管道中的主要概念)

对于机器学习算法 ,spark ML标准化API 很容易联合多个算法到同一个管道中,或者工作流中。这部分介绍spark ML API引入的主要的概念。管道概念是从scikit-learn中得到的很多启发。

DataFrame:spark ML从spark sql中获取DataFrame作为学习的数据集。可以持有多种数据类型 ,例如:有个df可以包含文本,特征向量,true标签和预测结果的多个不同的列。转换器: 转换器是把一个DF转成另外一个DF的算法,例:一个ML模型是一个转换器 ,转换带有特征的DataFrame成为带有预测结果的DF.评估器: 评估器是应用(fit)在一个DF上生成一个转换器的算法。 例:学习算法是一个评估器,在DF上训练并且生成一个模型。管道: 一个管道连接多个转换器和评估器在一起,作为一个工作流。参数: 对于指定的参数来说,所有的转换器和评估器 共享一个公共的api.

DataFrame

机器学习可以应用于很多的数据类型上,如:向量,文本,图片,结构数据。ML采用DF就是为了支持多种数据类型。

df支持很多基本和结构化的类型;支持spark sql里的类型,还有向量。

DF可以显性或隐性的从常规的RDD创建。 详见代码

df中的列已经命名了。下面的实例代码中使用的名称 如 “text,” “features,” 和“label.”

管道组件

转换器

转换器是包含特征转换器和学习模型的抽象。技术上,一个转换器执行一个transform()方法转换DF.一般是增加一个或多个列。例:

一个特征转换器拿到一个DF,读取一列映射到一个新列上。 然后输出一个包含映射列的新的DF.一个学习模型拿到一个DF.读取包含特征向量的列,为每个特征向量预测标签, 然后把预测标签作为一个新列放到DF中输出。

评估器

评估器抽象一个应用或训练在数据上的算法的概念。技术上, 评估器调用fit(df)方法。 方法接收一个DF然后产生一个模型,这个模型就是转换器。例: 一个学习算法如逻辑回归是一个Estimators,调用fit()方法训练LogisticRegressionModel,它是一个模型,因此是一个transformer。

管道组件的属性

Transformer.transform()s and Estimator.fit()s 都是无状态的, 未来,有状态算法可以通过替代概念来支持。

每一个Transformer or Estimator的实例都有一个唯一id, 在指定参数时很有用(下面讨论)。

管道

在机器学习中, 通常会运行一系列的算法去处理和学习数据。例: 一个简单的文本文档处理工作流可以包含一些stage:

把文档中的文本分成单词。转换文档中的每个单词成为数字化的特征向量使用特征向量和标签学习预测模型。

spark ML 描述一个工作流作为一个管道。管道由一系列的PipelineStage(Transformers 和Estimators)组成,以指定的顺序运行。在这部分中我们将使用这个简单的工作流作为运行的例子

How it works

一个管道被指定为一系列的阶段,每一阶段要么是Transformer 或是Estimator。这些stage有序运行,每个stage转换输入的。对于Transformer 阶段, transform() 方法在df调用,对于Estimator 阶段,fit()方法被调用产生一个新的Transformer( 转换器变成PipelineModel的一部分,或合适的管道),然后转换器调用transform() 方法应用在df上。

我们通过一个简单文本文档工作流来阐明, 下面的图是关于管道的训练time(时代)的用法。

上面,顶行代表包含三个阶段的管道。前两个蓝色的 (Tokenizer and HashingTF) 是转换器,第三个LogisticRegression是评估器。底部的行代表通过管道的数据流,圆筒是DF。Pipeline.fit() 方法在初始的df上调用,df是行文本和标签。Tokenizer.transform() 方法把行文本分成单词,增加一个包含单词的新列到DF上。HashingTF.transform()方法转换单词列成特征向量,增加一个包含向量的新列到DF上。现在,LogisticRegression是评估器,管道首先调用 LogisticRegression.fit()生成LogisticRegressionModel。如果管道有多个阶段,在传递df给下一阶段之前它调用LogisticRegressionModel的 transform()方法。

一个管道是一个评估器,因此,管道的fit()方法运行之后,产生一个PipelineModel ,它是一个转换器。这个PipelineModel使用在校验阶段(test time); 下图说明了PipelineModel的用法。

在上图里, PipelineModel 与原管道的stage的数量相同。但是在原始管道中所有的评估器都变成转换器。当在测试数据集上调用 PipelineModel’s transform() 时,数据有序的通过合适的管道。每一个stage的transform()更新数据集,然后传给下一个stage。

Pipeline 和 PipelineModel 有助于确保训练集和测试集得到相同的特征处理步骤。

明细

DAG管道: 一个管道里的多个stage被指定为一个有序的数组,上面的例子给的是线性管道的例子,管道里的stage使用的数据是它上一个stage产生的。也可以创建非线性的管道,只要数据流图是一个DAG图。DAG图可以基于每个stage 输入和输出的列名隐式的指定。如果管道是DAG的形式,然后stage的拓扑图顺序必须被指定。

运行时检测:由于管道可以操作多类型的DataFrame,所以不能执行编译时类型检测。Pipelines 和PipelineModel在运行时检测。使用DataFrame的schema来做检测。

独一无二的管道 stage:管道的stage应该是独一无二实例 ,举例:同一个myHashing实例不能插入管道两次,因为管道必须有唯一id。然而,实例两个实例(myHashingTF1 and myHashingTF2)可以放到同一个管道中, 因为不同的实例产生不同的id

参数

ML的 Estimator和Transformer 使用相同的api来指定参数。

参数名Param是一个参数, ParamMap是一个参数的集合 (parameter, value)

给算法传参的有两种主要的方法:

1.为实例设置参数,例:如果lr是LogisticRegression的实例。设置 lr.setMaxIter(10),可以让 lr.fit()最多十次迭代。这个API整合了spark.mllib包里的api。

2.传递ParamMap给fit() or transform()方法, ParamMap里的参数将覆盖前面通过setter方法设定的参数。

参数属于指定的Estimators and Transformer实例 ,例:如果有两个逻辑回归lr1 and lr2,然后可以创建包含两个maxIter参数的ParamMap:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。如果在一个管道中两个算法都有maxIter参数时,这很有用。

保存和加载管道

保存一个模型和一个管道到磁盘,为了下次使用是很有价值的。 在spark 1.6里。一个模型导入、导出的功能被加到管道的API里。大多数基本的transformer被支持,同样,一些基本的学习模型也被支持。请参考算法api文档看看是否支持保存和加载。

事例代码:

这部分给一些代码来展示上面讨论的功能。更多信息请看API. 一些spark ML算法是对spark.mllib算法的包装。具体明细请看MLlib programming guide。

Example: Estimator, Transformer, and Param

这部分涵盖 Estimator,Transformer,和Param

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.param.ParamMap

import org.apache.spark.mllib.linalg.{Vector, Vectors}

import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.

//准备带标签和特征的数据

val training = sqlContext.createDataFrame(Seq(

(1.0, Vectors.dense(0.0, 1.1, 0.1)),

(0.0, Vectors.dense(2.0, 1.0, -1.0)),

(0.0, Vectors.dense(2.0, 1.3, 1.0)),

(1.0, Vectors.dense(0.0, 1.2, -0.5))

)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.

//创建一个逻辑回归事例,这个实例是评估器

val lr = new LogisticRegression()

// Print out the parameters, documentation, and any default values.

//输出参数等默认值

println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.

//使用setter方法设置参数

lr.setMaxIter(10)

.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.

//使用存储在lr中的参数来,学习一个模型,

val model1 = lr.fit(training)

// Since model1 is a Model (i.e., a Transformer produced by an Estimator),

// we can view the parameters it used during fit().

// This prints the parameter (name: value) pairs, where names are unique IDs for this

// LogisticRegression instance.

//由于model1是一个模型,(也就是,一个评估器产生一个转换器),

// 我们可以看lr在fit()上使用的参数。

//输出这些参数对,参数里的names是逻辑回归实例的唯一id

println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,

// which supports several methods for specifying parameters.

//我们可以使用paramMap选择指定的参数,并且提供了很多方法来设置参数

val paramMap = ParamMap(lr.maxIter -> 20)

.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. 指定一个参数。

.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. 指定多个参数

// One can also combine ParamMaps.

val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name 改变输出列的名称

val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.

// paramMapCombined overrides all parameters set earlier via lr.set* methods.

//使用新的参数学习模型。

val model2 = lr.fit(training, paramMapCombined)

println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.

//准备测试数据

val test = sqlContext.createDataFrame(Seq(

(1.0, Vectors.dense(-1.0, 1.5, 1.3)),

(0.0, Vectors.dense(3.0, 2.0, -0.1)),

(1.0, Vectors.dense(0.0, 2.2, -1.5))

)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.

// LogisticRegression.transform will only use the 'features' column.

// Note that model2.transform() outputs a 'myProbability' column instead of the usual

// 'probability' column since we renamed the lr.probabilityCol parameter previously.

//使用转换器的transform()方法在测试数据上作出预测.

// 逻辑回归的transform方法只使用“特征”列.

// 注意model2.transform()方法输出的是myProbability列而不是probability列,因为在上面重命名了lr.probabilityCol 参数。

model2.transform(test)

.select("features", "label", "myProbability", "prediction")

.collect()

.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>

println(s"($features, $label) -> prob=$prob, prediction=$prediction")

}

个人认为: 在这个训练过程中,最终要的步骤就是设置参数,来让算法工作的更好。yes

Example: Pipeline

这个例子是上面图片中展示的文本文档管道

import org.apache.spark.ml.{Pipeline, PipelineModel}

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

import org.apache.spark.mllib.linalg.Vector

import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.

//准备训练文档,(id,内容,标签)

val training = sqlContext.createDataFrame(Seq(

(0L, "a b c d e spark", 1.0),

(1L, "b d", 0.0),

(2L, "spark f g h", 1.0),

(3L, "hadoop mapreduce", 0.0)

)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

//配置ML管道,由三个stage组成,tokenizer, hashingTF, and lr ,

val tokenizer = new Tokenizer()

.setInputCol("text")

.setOutputCol("words")

val hashingTF = new HashingTF()

.setNumFeatures(1000)

.setInputCol(tokenizer.getOutputCol)

.setOutputCol("features")

val lr = new LogisticRegression()

.setMaxIter(10)

.setRegParam(0.01)

val pipeline = new Pipeline()

.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.

//安装管道到数据上

val model = pipeline.fit(training)

// now we can optionally save the fitted pipeline to disk

//现在可以保存安装好的管道到磁盘上

model.save("/tmp/spark-logistic-regression-model")

// we can also save this unfit pipeline to disk

//也可以保存未安装的管道到磁盘上

pipeline.save("/tmp/unfit-lr-model")

// and load it back in during production

//加载管道

val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.

//准备测试文档,不包含标签

val test = sqlContext.createDataFrame(Seq(

(4L, "spark i j k"),

(5L, "l m n"),

(6L, "mapreduce spark"),

(7L, "apache hadoop")

)).toDF("id", "text")

// Make predictions on test documents.

//在测试文档上做出预测

model.transform(test)

.select("id", "text", "probability", "prediction")

.collect()

.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

println(s"($id, $text) --> prob=$prob, prediction=$prediction")

}

Example: model selection via cross-validation 通过交叉验证选择模型

在机器学习中一个重要的任务是模型选择,或使用数据发现最好的模型或给任务设置参数,这叫做调优。通过调优整个管道去促进管道选择模型会变的容易, 而不是分开的调优管道内的每一个元素。

当前,spark.ml支持使用交叉验证器CrossValidator类选择模型,这个类接收一个Estimator,一个参数集,一个Evaluator,CrossValidator 开始拆分数据集到一个fold集中,这个fold集被用来作为分开测试和训练的数据集; 例:带有3个fold的CrossValidator 将产生3组(训练,测试)数据集,每一个数据集中2/3作为训练数据,1/3作为测试数据. CrossValidator 通过参数集进行迭代计算。为每一个ParamMap,训练给定的Estimator 并且使用给予的Evaluator来评估。

RegressionEvaluator评估器Evaluator来评估回归问题,BinaryClassificationEvaluator 来评估二元数据,MultiClassClassificationEvaluator 评估多元分类问题。

用于选择最佳paraMap参数的默认度量可以被Evaluator 的setMetric方法覆盖。

产生最好评估度量的paramMap被选择作为最好的模型。CrossValidator 最终使用最好的paramMap和整个数据集fit 评估器,(意思就是执行评估器的fit方法)

下面的例子就是CrossValidator 从一个网格参数做选择。只用ParamGridBuilder 工具构造参数网格。

注意在一个网格参数上做交叉校验是非常昂贵的。例,下面的例子中,hashingTF.numFeatures有3个值和lr.regParam有2个值的参数网络,并且CrossValidator 的fold是2个。这个相乘的输出是 (3×2)×2=12 不同的明细需要训练,在真实的设置中,参数会被设置的更大并且有更多的fold(一般是 3或者10)。换句话说。使用CorssValidator是非常昂贵的。

然而,用来选择参数它也是一个行之有效的方法。

import org.apache.spark.ml.Pipeline

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

import org.apache.spark.mllib.linalg.Vector

import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples.

//准备训练数据,id 内容,标签

val training = sqlContext.createDataFrame(Seq(

(0L, "a b c d e spark", 1.0),

(1L, "b d", 0.0),

(2L, "spark f g h", 1.0),

(3L, "hadoop mapreduce", 0.0),

(4L, "b spark who", 1.0),

(5L, "g d a y", 0.0),

(6L, "spark fly", 1.0),

(7L, "was mapreduce", 0.0),

(8L, "e spark program", 1.0),

(9L, "a e c l", 0.0),

(10L, "spark compile", 1.0),

(11L, "hadoop software", 0.0)

)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

// 配置机器学习管道,由tokenizer, hashingTF, lr评估器 组成

val tokenizer = new Tokenizer()

.setInputCol("text")

.setOutputCol("words")

val hashingTF = new HashingTF()

.setInputCol(tokenizer.getOutputCol)

.setOutputCol("features")

val lr = new LogisticRegression()

.setMaxIter(10)

val pipeline = new Pipeline()

.setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over.

// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,

// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

//使用ParamGridBuilder 构造一个参数网格,

//hashingTF.numFeatures有3个值,lr.regParam有2个值,

// 这个网格有6个参数给CrossValidator来选择

val paramGrid = new ParamGridBuilder()

.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))

.addGrid(lr.regParam, Array(0.1, 0.01))

.build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.

// This will allow us to jointly choose parameters for all Pipeline stages.

// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric

// is areaUnderROC.

//现在我们把管道看做成一个Estimator,把它包装到CrossValidator实例中。

//这可以让我们连带的为管道的所有stage选择参数。

//CrossValidator需要一个Estimator,一个评估器参数集合,和一个Evaluator。

//注意这里的evaluator 是二元分类的BinaryClassificationEvaluator,它默认的度量是areaUnderROC.

val cv = new CrossValidator()

.setEstimator(pipeline)

.setEvaluator(new BinaryClassificationEvaluator)

.setEstimatorParamMaps(paramGrid)

.setNumFolds(2) // Use 3+ in practice // 在实战中使用3+

// Run cross-validation, and choose the best set of parameters.

//运行交叉校验,选择最好的参数集

val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.

//准备测试数据

val test = sqlContext.createDataFrame(Seq(

(4L, "spark i j k"),

(5L, "l m n"),

(6L, "mapreduce spark"),

(7L, "apache hadoop")

)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).

//在测试文档上做预测,cvModel是选择出来的最好的模型

cvModel.transform(test)

.select("id", "text", "probability", "prediction")

.collect()

.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

println(s"($id, $text) --> prob=$prob, prediction=$prediction")

}

个人理解: 使用交叉检验,自动的从一个参数集中选择出最好的一个, 来构建组好的模型。

Example: model selection via train validation split 例:通过训练校验分离来模型选择

除了CrossValidator 以外spark还提供TrainValidationSplit 来进行超-参数调优。 TrainValidationSplit 只评估每一种参数组合一次。而不是像CrossValidator评估k次,TrainValidationSplit 只有一次。因此不是很昂贵,但是如果训练数据集不够大就不能产生能信赖的结果。

TrainValidationSplit 需要传入一个Estimator,一个包含estimatorParamMaps 参数的paraMap的集和一个Evaluator。它一开始使用trainRatio 参数值把数据集分成训练数据和测试数据两个部分。例如: 使用trainRatio=0.75 (默认值),TrainValidationSplit 就产生75%数据用于训练,25%的数据用于测试。与CrossValidator相似的是,TrainValidationSplit 也是通过迭代参数集paramMap。对于每一种参数组合,使用给定的Estimator 训练,在给定 Evaluator上评估。产生最好的评估度量的paramMap作为最好的选择。TrainValidationSplit 最终会使用最好的参数和整个数据集条用Estimator的fit方法。

import org.apache.spark.ml.evaluation.RegressionEvaluator

import org.apache.spark.ml.regression.LinearRegression

import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data.

//准备训练数据和测试数据

val data = sqlContext.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()

// We use a ParamGridBuilder to construct a grid of parameters to search over.

// TrainValidationSplit will try all combinations of values and determine best model using

// the evaluator.

//ParamGridBuilder构建一组参数

//TrainValidationSplit将尝试从这些所有值的组合中使用evaluator选出最好的模型

val paramGrid = new ParamGridBuilder()

.addGrid(lr.regParam, Array(0.1, 0.01))

.addGrid(lr.fitIntercept)

.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))

.build()

// In this case the estimator is simply the linear regression.

// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

//在这里estimator是简单的线性回归

//TrainValidationSplit 需要一个Estimator , 一个Estimator ParamMaps集,一个Evaluator

val trainValidationSplit = new TrainValidationSplit()

.setEstimator(lr)

.setEvaluator(new RegressionEvaluator)

.setEstimatorParamMaps(paramGrid)

// 80% of the data will be used for training and the remaining 20% for validation.

//80%数据作为训练,剩下的20%作为验证

.setTrainRatio(0.8)

// Run train validation split, and choose the best set of parameters.

//运行训练校验分离,选择最好的参数。

val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters

// that performed best.

//在测试数据上做预测,模型是参数组合中执行最好的一个

model.transform(test)

.select("features", "label", "prediction")

.show()

标签: #spark ml 算法