Spark-mllib特征转换算法

Spark-mllib特征转换算法

Tokenization(分词器)

算法介绍:

Tokenization将文本划分为独立个体(通常为单词)。

RegexTokenizer基于正则表达式提供更多的划分选项。默认情况下,参数“pattern”为划分文本的分隔符。或者可以指定参数“gaps”来指明正则“patten”表示“tokens”而不是分隔符,这样来为分词结果找到所有可能匹配的情况。

调用:

Scala:

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)


StopWordsRemover(去除停用词)

算法介绍:

停用词为在文档中频繁出现,但未承载太多意义的词语,他们不应该被包含在算法输入中。

StopWordsRemover的输入为一系列字符串(如分词器输出),输出中删除了所有停用词。停用词表由stopWords参数提供。一些语言的默认停用词表可以通过StopWordsRemover.loadDefaultStopWords(language)调用。布尔参数caseSensitive指明是否区分大小写(默认为否)。

示例:

假设我们有如下DataFrame,有id和raw两列:

id | raw

----|----------

0 | [I, saw, the, red, baloon]

1 | [Mary, had, a, little, lamb]

通过对raw列调用StopWordsRemover,我们可以得到筛选出的结果列如下:

id | raw |filtered

----|-----------------------------|--------------------

0 | [I, saw, the, red, baloon] | [saw, red, baloon]

1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]

其中,“I”,“the”, “had”以及“a”被移除。

调用:

Scala:

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()


n-gram

算法介绍:

一个n-gram是一个长度为整数n的字序列。NGram可以用来将输入转换为n-gram。

NGram的输入为一系列字符串(如分词器输出)。参数n决定每个n-gram包含的对象个数。结果包含一系列n-gram,其中每个n-gram代表一个空格分割的n个连续字符。如果输入少于n个字符串,将没有输出结果。

调用:

Scala:

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")

val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)

Binarizer

算法介绍:

二值化是根据阀值将连续数值特征转换为0-1特征的过程。

Binarizer参数有输入、输出以及阀值。特征值大于阀值将映射为1.0,特征值小于等于阀值将映射为0.0。

调用:

Scala:

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)


PCA(主成分分析)

算法介绍:

主成分分析是一种统计学方法,它使用正交转换从一系列可能相关的变量中提取线性无关变量集,提取出的变量集中的元素称为主成分。使用PCA
方法可以对变量集合进行降维。下面的示例介绍如何将5维特征向量转换为3维主成分向量。

调用:

Scala:

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()


PolynomialExpansion

算法介绍:

多项式扩展通过产生n维组合将原始特征将特征扩展到多项式空间。下面的示例会介绍如何将你的特征集拓展到3维多项式空间。

调用:

Scala:

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(-2.0, 2.3),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)

DiscreteCosine Transform (DCT)

算法介绍:

离散余弦变换是与傅里叶变换相关的一种变换,它类似于离散傅立叶变换但是只使用实数。离散余弦变换相当于一个长度大概是它两倍的离散傅里叶变换,这个离散傅里叶变换是对一个实偶函数进行的(因为一个实偶函数的傅里叶变换仍然是一个实偶函数)。离散余弦变换,经常被信号处理和图像处理使用,用于对信号和图像(包括静止图像和运动图像)进行有损数据压缩。

调用:

Scala:

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)


StringIndexer

算法介绍:

StringIndexer将字符串标签编码为标签指标。指标取值范围为[0,numLabels],按照标签出现频率排序,所以出现最频繁的标签其指标为0。如果输入列为数值型,我们先将之映射到字符串然后再对字符串的值进行指标。如果下游的管道节点需要使用字符串-指标标签,则必须将输入和钻还为字符串-指标列名。

示例:

假设我们有如下的DataFrame包含id和category两列:

id | category

----|----------

0 | a

1 | b

2 | c

3 | a

4 | a

5 | c

category是有3种取值的字符串列,使用StringIndexer进行转换后我们可以得到如下输出:

id | category | categoryIndex

----|----------|---------------

0 | a | 0.0

1 | b | 2.0

2 | c | 1.0

3 | a | 0.0

4 | a | 0.0

5 | c | 1.0

另外,如果在转换新数据时出现了在训练中未出现的标签,StringIndexer将会报错(默认值)或者跳过未出现的标签实例。

调用:

Scala:

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()


IndexToString

算法介绍:

与StringIndexer对应,StringIndexer将指标标签映射回原始字符串标签。一个常用的场景是先通过StringIndexer产生指标标签,然后使用指标标签进行训练,最后再对预测结果使用StringIndexer来获取其原始的标签字符串。

示例:

假设我们有如下的DataFrame包含id和categoryIndex两列:

id | categoryIndex

----|---------------

0 | 0.0

1 | 2.0

2 | 1.0

3 | 0.0

4 | 0.0

5 | 1.0

使用originalCategory我们可以获取其原始的标签字符串如下:

id | categoryIndex | originalCategory

----|---------------|-----------------

0 | 0.0 | a

1 | 2.0 | b

2 | 1.0 | c

3 | 0.0 | a

4 | 0.0 | a

5 | 1.0 | c

调用:

Scala:

import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()


OneHotEncoder

算法介绍:

独热编码将标签指标映射为二值向量,其中最多一个单值。这种编码被用于将种类特征使用到需要连续特征的算法,如逻辑回归等。

调用:

Scala:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()

VectorIndexer

算法介绍:

VectorIndexer解决数据集中的类别特征Vector。它可以自动识别哪些特征是类别型的,并且将原始值转换为类别指标。它的处理流程如下:

1.获得一个向量类型的输入以及maxCategories参数。

2.基于原始数值识别哪些特征需要被类别化,其中最多maxCategories需要被类别化。

3.对于每一个类别特征计算0-based类别指标。

4.对类别特征进行索引然后将原始值转换为指标。

索引后的类别特征可以帮助决策树等算法处理类别型特征,并得到较好结果。

在下面的例子中,我们读入一个数据集,然后使用VectorIndexer来决定哪些特征需要被作为非数值类型处理,将非数值型特征转换为他们的索引。

调用示例:

Scala:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()


Normalizer(正则化

算法介绍:

Normalizer是一个转换器,它可以将多行向量输入转化为统一的形式。参数为p(默认值:2)来指定正则化中使用的p-norm。正则化操作可以使输入数据标准化并提高后期学习算法的效果。

下面的例子展示如何读入一个libsvm格式的数据,然后将每一行转换为

www.zeeklog.com  - Spark-mllib特征转换算法

以及

www.zeeklog.com  - Spark-mllib特征转换算法

形式。

调用示例:

Scala:

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()


StandardScaler

算法介绍:

StandardScaler处理Vector数据,标准化每个特征使得其有统一的标准差以及(或者)均值为零。它需要如下参数:

1. withStd:默认值为真,使用统一标准差方式。

2. withMean:默认为假。此种方法将产出一个稠密输出,所以不适用于稀疏输入。

StandardScaler是一个Estimator,它可以fit数据集产生一个StandardScalerModel,用来计算汇总统计。然后产生的模可以用来转换向量至统一的标准差以及(或者)零均值特征。

注意如果特征的标准差为零,则该特征在向量中返回的默认值为0.0。

下面的示例展示如果读入一个libsvm形式的数据以及返回有统一标准差的标准化特征。

调用示例:

Scala:

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()


MinMaxScaler

算法介绍:

MinMaxScaler通过重新调节大小将Vector形式的列转换到指定的范围内,通常为[0,1],它的参数有:

1. min:默认为0.0,为转换后所有特征的下边界。

2. max:默认为1.0,为转换后所有特征的下边界。

MinMaxScaler计算数据集的汇总统计量,并产生一个MinMaxScalerModel。该模型可以将独立的特征的值转换到指定的范围内。

对于特征E来说,调整后的特征值如下:

www.zeeklog.com  - Spark-mllib特征转换算法


如果

www.zeeklog.com  - Spark-mllib特征转换算法

,则

www.zeeklog.com  - Spark-mllib特征转换算法

注意因为零值转换后可能变为非零值,所以即便为稀疏输入,输出也可能为稠密向量。

下面的示例展示如果读入一个libsvm形式的数据以及调整其特征值到[0,1]之间。

调用示例:

Scala:

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()


MaxAbsScaler

算法介绍:

MaxAbsScaler使用每个特征的最大值的绝对值将输入向量的特征值转换到[-1,1]之间。因为它不会转移/集中数据,所以不会破坏数据的稀疏性。

下面的示例展示如果读入一个libsvm形式的数据以及调整其特征值到[-1,1]之间。

调用示例:

Scala:

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()


Bucketizer

算法介绍:

Bucketizer将一列连续的特征转换为特征区间,区间由用户指定。参数如下:

1. splits:分裂数为n+1时,将产生n个区间。除了最后一个区间外,每个区间范围[x,y]由分裂的x,y决定。分裂必须是严格递增的。在分裂指定外的值将被归为错误。两个分裂的例子为Array(Double.NegativeInfinity,0.0, 1.0, Double.PositiveInfinity)以及Array(0.0, 1.0, 2.0)。

注意,当不确定分裂的上下边界时,应当添加Double.NegativeInfinity和Double.PositiveInfinity以免越界。

下面将展示Bucketizer的使用方法。

调用示例:

Scala:

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()


ElementwiseProduct

算法介绍:

ElementwiseProduct按提供的“weight”向量,返回与输入向量元素级别的乘积。即是说,按提供的权重分别对输入数据进行缩放,得到输入向量v以及权重向量w的Hadamard积。

下面例子展示如何通过转换向量的值来调整向量。

调用示例:

Scala:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()


SQLTransformer

算法介绍:

SQLTransformer工具用来转换由SQL定义的陈述。目前仅支持SQL语法如"SELECT ...FROM __THIS__ ...",其中"__THIS__"代表输入数据的基础表。选择语句指定输出中展示的字段、元素和表达式,支持Spark SQL中的所有选择语句。用户可以基于选择结果使用Spark SQL建立方程或者用户自定义函数。SQLTransformer支持语法示例如下:

1. SELECTa, a + b AS a_b FROM __THIS__

2. SELECTa, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5

3. SELECTa, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

示例:

假设我们有如下DataFrame包含id,v1,v2列:
id | v1 | v2

----|-----|-----

0 | 1.0 | 3.0

2 | 2.0 | 5.0

使用SQLTransformer语句"SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"转换后得到输出如下:

id | v1 | v2 | v3 | v4

----|-----|-----|-----|-----

0 | 1.0| 3.0 | 4.0 | 3.0

2 | 2.0| 5.0 | 7.0 |10.0

调用示例:

Scala:

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
 sqlTrans.transform(df).show()


VectorAssembler

算法介绍:

VectorAssembler是一个转换器,它将给定的若干列合并为一列向量。它可以将原始特征和一系列通过其他转换器得到的特征合并为单一的特征向量,来训练如逻辑回归和决策树等机器学习算法。VectorAssembler可接受的输入列类型:数值型、布尔型、向量型。输入列的值将按指定顺序依次添加到一个新向量中。

示例:

假设我们有如下DataFrame包含id, hour, mobile, userFeatures以及clicked列:

id | hour | mobile| userFeatures | clicked

----|------|--------|------------------|---------

0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0

userFeatures列中含有3个用户特征。我们想将hour, mobile以及userFeatures合并为一个新列。将VectorAssembler的输入指定为hour, mobile以及userFeatures,输出指定为features,通过转换我们将得到以下结果:

id | hour | mobile| userFeatures | clicked | features

----|------|--------|------------------|---------|-----------------------------

0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]

调用示例:

Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())


QuantileDiscretizer

算法介绍:

QuantileDiscretizer讲连续型特征转换为分级类别特征。分级的数量由numBuckets参数决定。分级的范围有渐进算法决定。渐进的精度由relativeError参数决定。当relativeError设置为0时,将会计算精确的分位点(计算代价较高)。分级的上下边界为负无穷到正无穷,覆盖所有的实数值。

示例:
假设我们有如下DataFrame包含id, hour:

id | hour

----|------

0 |18.0

----|------

1 |19.0

----|------

2 | 8.0

----|------

3 | 5.0

----|------

4 | 2.2

hour是一个Double类型的连续特征,将参数numBuckets设置为3,我们可以将hour转换为如下分级特征。

id | hour | result

----|------|------

0 |18.0 | 2.0

----|------|------

1 |19.0 | 2.0

----|------|------

2 |8.0 | 1.0

----|------|------

3 |5.0 | 1.0

----|------|------

4 |2.2 | 0.0

调用示例:

Scala:

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
var df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()