龙空技术网

关于Spark NLP学习,你需要掌握的 LightPipeline(附代码&链接)

数据派THU 552

前言:

现时同学们对“spark ml 算法”大约比较讲究,咱们都需要知道一些“spark ml 算法”的相关知识。那么小编同时在网摘上搜集了一些关于“spark ml 算法””的相关文章,希望小伙伴们能喜欢,朋友们快快来学习一下吧!

作者:Veysel Kocaman, Data Scientist & ML Researcher ANKIT CHOUDHARY

翻译:赵春光

校对:申利彬

本文约2800字,建议阅读9分钟

本文介绍Spark NLP中是如何使用Annotator和Transformer的。

[ 导读 ]Pipeline具体来说是一个多阶段的序列,每个阶段由一个Transformer或者Estimator组成。各个阶段按顺序执行,并将输入的DataFrame转换和传递给下一个阶段。数据如此按序的在pipeline中传递。我们现在再来看看Spark NLP是如何使用Annotator和Transformer的。

内容

本文是Spark NLP Library中各annotator系列中的第2篇文章。如果你想更多的学习Spark NLP及对应的概念,请先阅读下述文章:

Introduction to Spark NLP: Foundations and Basic Components (Part-I)

本文主要是作为上篇文章的延续。

在机器学习中,常见的一种做法是运行一系列的算法来处理和学习数据。这种算法序列常被称作做Pipeline。

Pipeline具体来说是一个多阶段的序列,每个阶段由一个Transformer或者Estimator组成。各个阶段按顺序执行,并将输入的DataFrame转换和传递给下一个阶段,数据如此按序的在pipeline中传递。每个阶段的transform()方法函数更新这组数据集并传递到下一阶段。因为有了pipeline,训练数据和测试数据会通过确保一致的特征处理环节。

每个使用的annotator 会在pipeline中的这个data frame新添一列

我们现在来看一下Spark NLP是如果使用Annotator和Transformer完成上述过程。假如我们需要将如下几个环节逐一施加在data frame上:

文本分离成语句分词正交化得到词嵌入向量

下面是通过Spark NLP实现这个pipeline的代码:

from pyspark.ml import Pipelinedocument_assembler = DocumentAssembler()\ .setInputCol(“text”)\ .setOutputCol(“document”)sentenceDetector = SentenceDetector()\ .setInputCols([“document”])\ .setOutputCol(“sentences”)tokenizer = Tokenizer() \ .setInputCols([“sentences”]) \ .setOutputCol(“token”)normalizer = Normalizer()\ .setInputCols([“token”])\ .setOutputCol(“normal”)word_embeddings=WordEmbeddingsModel.pretrained()\ .setInputCols([“document”,”normal”])\ .setOutputCol(“embeddings”)nlpPipeline = Pipeline(stages=[ document_assembler,  sentenceDetector, tokenizer, normalizer, word_embeddings, ])pipelineModel = nlpPipeline.fit(df)

接下来我们加载了一组数据到这个pipeline中,看一下模型如何工作。

Dataframe样本(5452行)

然后运行上述pipeline,我们会得到一个训练好的模型。之后我们用它转换整个DataFrame。

result = pipelineModel.transform(df)result.show()

转换前20行数据用了501毫秒;转换整个data frame共用了11秒。

%%timeresult = pipelineModel.transform(df).collect()>>>CPU times: user 2.01 s, sys: 425 ms, total: 2.43 sWall time: 11 s

如果我们想把这个pipeline保存到硬盘,然后调用它转换一行文字,在线转换时间会多长呢?

from pyspark.sql import Rowtext = "How did serfdom develop in and then leave Russia ?"line_df = spark.createDataFrame(list(map(lambda x: Row(text=x), [text])), ["text"])%time result = pipelineModel.transform(line_df).collect()>>>CPU times: user 31.1 ms, sys: 7.73 ms, total: 38.9 msWall time: 515 ms

转换一行短文字的时间也是515毫秒!几乎是和之前转换20行的时间一致。所以说,效果太好。实际上,类似的情况也发生在使用分布式处理小数据的时候。分布式处理和云计算主要是用来处理大数据,而使用Spark来处理小型数据其实是杀鸡用牛刀。

实际上,由于它内部的机制和优化后的构架,Spark仍适用于中等大小单机可处理的数据。但不建议使用Spark来处理仅仅是几行的数据, 除非使用Spark NLP。

打个比方,Spark 好像一个火车和一个自行车赛跑。自行车会在轻载的时候占上风,因为它更敏捷、提速更快,而重载的火车可能需要一段时间提速,但最终还是会速度更快。

所以,如果我们想要预测的时间更快该怎么办呢?使用LightPipeline。

LightPipeline

LightPipelines 是Spark NLP对应的Pipeline, 等同于Spark ML Pipeline, 但是用于处理更小的数据。它们适用于小数据集、调试结果,或者是对一次性服务API请求的训练或预测。

Spark NLP LightPipelines 是将Spark ML Pipelines 转换成了一个单机但多线程的任务,对于小型数据(不大于5万个句子)速度会提升10倍。

这些Pipeline的使用方法是插入已训练(已拟合)的模型,然后会标注纯文本。我们都不需要把输入文字转换成Dataframe就可以输入pipeline,虽然pipeline当初是使用Dataframe作为输入。这个便捷的功能适用于使用已训练的模型对少数几行文字进行预测。

from sparknlp.base import LightPipelineLightPipeline(someTrainedPipeline).annotate(someStringOrArray)

下面是一些LightPipelines可用的方法函数。我们还可以用字符列表作为输入文字。

我们可以很方便的创建LightPipelines,也不需要处理Spark Datasets。LightPipelines运行的也很快,而且在驱动节点工作时可执行并行运算。下面是一个应用的例子:

from sparknlp.base import LightPipelinelightModel = LightPipeline(pipelineModel, parse_embeddings=True)%time lightModel.annotate("How did serfdom develop in and then leave Russia ?")>>>CPU times: user 12.4 ms, sys: 3.81 ms, total: 16.3 msWall time: 28.3 ms{'sentences': ['How did serfdom develop in and then leave Russia ?'], 'document': ['How did serfdom develop in and then leave Russia ?'], 'normal': ['How',  'did',  'serfdom',  'develop',  'in',  'and',  'then',  'leave',  'Russia'], 'token': ['How',  'did',  'serfdom',  'develop',  'in',  'and',  'then',  'leave',  'Russia',  '?'], 'embeddings': ['-0.23769 0.59392 0.58697 -0.041788 -0.86803 -0.0051122 -0.4493 -0.027985, ...]}

这个代码用了28毫秒!几乎是使用Spark ML Pipeline时的20倍速度。

上面可以看出,annotate只返回了result的属性。既然这个嵌入向量数组储存在embedding属性的WordEmbeddingModel标注器下,我们可以设置parse_embedding = True 来分析嵌入向量数据。否则,我们可能在输出中只能获得嵌入向量的分词属性。关于上述属性的更多信息见以下连接:

如果我们想获取标注的全部信息,我们还可以使用fullAnnotate()来返回整个标注内容的字典列表。

result = lightModel.fullAnnotate("How did serfdom develop in and then leave Russia ?")>>>[{'sentences': [<sparknlp.base.Annotation at 0x139d685c0>],  'document': [<sparknlp.base.Annotation at 0x149b5a320>],  'normal': [<sparknlp.base.Annotation at 0x139d9e940>,   <sparknlp.base.Annotation at 0x139d64860>,   <sparknlp.base.Annotation at 0x139d689b0>,   <sparknlp.base.Annotation at 0x139dd16d8>,   <sparknlp.base.Annotation at 0x139dd1c88>,   <sparknlp.base.Annotation at 0x139d681d0>,   <sparknlp.base.Annotation at 0x139d89128>,   <sparknlp.base.Annotation at 0x139da44a8>,   <sparknlp.base.Annotation at 0x139da4f98>],  'token': [<sparknlp.base.Annotation at 0x149b55400>,   <sparknlp.base.Annotation at 0x139dd1668>,   <sparknlp.base.Annotation at 0x139dad358>,   <sparknlp.base.Annotation at 0x139d8dba8>,   <sparknlp.base.Annotation at 0x139d89710>,   <sparknlp.base.Annotation at 0x139da4208>,   <sparknlp.base.Annotation at 0x139db2f98>,   <sparknlp.base.Annotation at 0x139da4240>,   <sparknlp.base.Annotation at 0x149b55470>,   <sparknlp.base.Annotation at 0x139dad198>],  'embeddings': [<sparknlp.base.Annotation at 0x139dad208>,   <sparknlp.base.Annotation at 0x139d89898>,   <sparknlp.base.Annotation at 0x139db2860>,   <sparknlp.base.Annotation at 0x139dbbf28>,   <sparknlp.base.Annotation at 0x139dbb3c8>,   <sparknlp.base.Annotation at 0x139db2208>,   <sparknlp.base.Annotation at 0x139da4668>,   <sparknlp.base.Annotation at 0x139dd1ba8>,   <sparknlp.base.Annotation at 0x139d9e400>]}]

fullAnnotate()返回标注类型中的内容和元数据。根据参考文档,这个标定类型有如下属性:

参考文档:

annotatorType: String,begin: Int,end: Int,result: String, (this is what annotate returns)metadata: Map[String, String],embeddings: Array[Float]

所以,下面的代码可以返回一个句子的起始或者结束:

result[0]['sentences'][0].begin>> 0result[0]['sentences'][0].end>> 49result[0]['sentences'][0].result>> 'How did serfdom develop in and then leave Russia ?'

嵌入向量每个分词的的元数据也可以得到:

result[0]['embeddings'][2].metadata>> {'isOOV': 'false', 'pieceId': '-1', 'isWordStart': 'true', 'token': 'serfdom', 'sentence': '0'}

不过我们还没能从LightPipeline得到non-Spark NLP标注器的信息。例如当需要在pipeline中同时使用Spark ML 的功能(如work2vec)和Spark NLP时, LightPipeline只返回Spark NLP annotations 的结果,但不会有没有任何Spark ML models的域输出。所以可以说LightPipeline不会返回Spark NLP标注器以外的任何结果,至少当前如此。

我们计划近期给Spark NLP写一个wrapper,用于兼容的 Spark ML 的所有ML模型。此后大家就可以使用LightPipeline来完成机器学习的案例,来在Spark NLP中训练模型,然后部署实现更快的在线预测。

结语

Spark NLP LightPipelines 是把 Spark ML pipelines转换成了一个单机但多线程的任务,在少量的数据上速度提升可达到10倍。本文讨论了如何将Spark Pipelines转换成Spark NLP Light Pipelines,以便在小数据上获得更快的响应。这也是Spark NLP的最酷的特征之一。我们可以享受Spark强大的数据处理和训练功能,然而在单机运行时使用Light Pipelines来获得更快的预测速度。

希望大家已经渡过上一篇关于official Medium page的文章了,并开始用到Spark NLP。下面是一些相关文章的连接,不要忘记关注我们的主页!

Introduction to Spark NLP: Foundations and Basic Components (Part-I)

Introduction to: Spark NLP: Installation and Getting Started (Part-II)

-----------------------

Spark NLP 101 : Document Assembler

原文标题:

Spark NLP 101: LightPipeline

原文链接:

编辑:黄继彦

校对:林亦霖

译者简介

赵春光,硕士毕业于宾夕法尼亚州立大学,现从事汽车控制系统及软件的开发。致力于将机器学习的学术研究应该用到产业落地项目上。

— 完 —

关注清华-青岛数据科学研究院官方微信公众平台“THU数据派”及姊妹号“数据派THU”获取更多讲座福利及优质内容。

标签: #spark ml 算法