
SparkML Study Notes

Spark ships two machine learning packages: the RDD-based spark.mllib and the DataFrame-based spark.ml. As of Spark 2.0, spark.mllib is in maintenance mode: the RDD-based API is still supported but receives no new features, and it is slated for removal in Spark 3.0. The DataFrame-based API, by contrast, keeps gaining features throughout the 2.x line in order to reach feature parity with the RDD-based API, a goal largely achieved in Spark 2.3.
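As a rough illustration of the difference (a minimal sketch, not part of the original notes; the object name and data are made up), the RDD-based API operates on an RDD of vectors, while the DataFrame-based API operates on a vector column of a DataFrame:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.SparkSession

object ApiComparison {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ApiComparison").master("local").getOrCreate()
    import spark.implicits._

    // RDD-based spark.mllib: column statistics over an RDD of mllib vectors
    val rdd = spark.sparkContext.parallelize(Seq(
      OldVectors.dense(1.0, 2.0),
      OldVectors.dense(3.0, 4.0)
    ))
    println(Statistics.colStats(rdd).mean) // column means as an mllib Vector

    // DataFrame-based spark.ml: the same data as a "features" column
    val df = Seq(NewVectors.dense(1.0, 2.0), NewVectors.dense(3.0, 4.0))
      .map(Tuple1.apply).toDF("features")
    println(Correlation.corr(df, "features").head) // correlation Matrix wrapped in a Row
  }
}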

Basic Statistics

/**
 * 1. A dense vector is just an array of doubles, while a sparse vector is stored as two parallel arrays: indices and values.
 * 2. Pearson correlation assumes the variables are continuous, roughly unimodal, and that the paired observations are independent of each other.
 * 3. The Chi-squared test requires both the feature and the label to be categorical variables.
 */

import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.ml.stat.{ChiSquareTest, Correlation}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object BasicStatistics {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().appName("BasicStatistics").master("local").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val data: Seq[Vector] = Seq(
      Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
    )
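    // (Added note) The first sparse vector above stores indices = [0, 3] and values = [1.0, -2.0],
    // i.e. the same data as the dense vector [1.0, 0.0, 0.0, -2.0].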

    // Two ways to compute the correlation matrix
    val df: DataFrame = data.map(Tuple1.apply).toDF("features")
    val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
    println(s"Pearson correlation matrix:\n $coeff1, ${coeff1.getClass}")

    val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")

    // Pearson's chi-squared (χ²) test for independence
    val data2: Seq[(Double, Vector)] = Seq(
      (0.0, Vectors.dense(0.5, 10.0)),
      (0.0, Vectors.dense(1.5, 20.0)),
      (1.0, Vectors.dense(1.5, 30.0)),
      (0.0, Vectors.dense(3.5, 30.0)),
      (0.0, Vectors.dense(3.5, 40.0)),
      (1.0, Vectors.dense(3.5, 40.0))
    )

    val df2: DataFrame = data2.toDF("label", "features")
    val chi: Row = ChiSquareTest.test(df2, "features", "label").head

    println(s"pValues = ${chi.getAs[Vector](0)}")
    println(s"degreesOfFreedom ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
    println(s"statistics ${chi.getAs[Vector](2)}")

  }
}

Basic Components

/**
 * 1. Transformer
 *    Transformer is an abstract class covering both feature transformers and learned models.
 *    1) A feature transformer maps the input column(s) to a new column of feature vectors and appends it to the DataFrame.
 *    2) A learned model reads the feature-vector column, predicts the label, and appends the predictions to the DataFrame.
 * 2. Estimator
 *    Estimator is the abstraction for any algorithm that can be fit on data. It implements fit(), which takes a DataFrame and returns a Model; the Model is itself a Transformer.
 * 3. Parameters
 *    1) There are two ways to pass parameters to an algorithm:
 *       Method 1: set them on the algorithm instance, e.g. calling lr.setMaxIter(10) on a LogisticRegression instance lr makes lr.fit() run at most 10 iterations.
 *       Method 2: pass a ParamMap to fit() or transform(); any parameter in the ParamMap overrides the value previously set on the instance.
 *    2) Parameters belong to a specific instance of an Estimator or Transformer: with two LogisticRegression instances lr1 and lr2, a single ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20) can hold both settings (see the sketch after this object).
 */

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object TransEstAndParam {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().appName("TransFormerAndEstimator").master("local").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Training set built from a Seq of (label, features) tuples
    val training: DataFrame = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Create a LogisticRegression instance (an Estimator)
    val lr: LogisticRegression = new LogisticRegression()

    // Print the parameter documentation with explainParams
    println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

    // Method 1 of passing parameters: setters on the instance
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // Fit the model using the parameters set above; the fitted model is a Transformer
    val lr_clf: LogisticRegressionModel = lr.fit(training)

    // parent.extractParamMap shows the parameters used during fit, printed as name -> value pairs,
    // where each name is prefixed with the ID of the LogisticRegression instance
    println(s"Model 1 was fit using parameters: ${lr_clf.parent.extractParamMap}")

    // Method 2 of passing parameters: a ParamMap. One or several parameters can be set at once;
    // a parameter specified more than once keeps the value set last
    val paramMap: ParamMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30)
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)

    // Rename the output probability column (ParamMaps can be combined)
    val paramMap2: ParamMap = ParamMap(lr.probabilityCol -> "myProbability")
    val paramMapCombined: ParamMap = paramMap ++ paramMap2

    // Pass paramMapCombined to fit
    val model2: LogisticRegressionModel = lr.fit(training, paramMapCombined)
    println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

    // Build the test set
    val test: DataFrame = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Only the 'features' column is used for prediction
    val res: Array[Row] = model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()

    res.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
      println(s"($features, $label) -> prob=$prob, prediction=$prediction")
    }
  }
}
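Point 3.2 of the notes above, that parameters belong to a specific instance, can be shown without even starting a SparkSession. The following is a minimal sketch, not part of the original notes; the instance names lr1/lr2 and the object name are made up:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

object PerInstanceParams {
  def main(args: Array[String]): Unit = {
    val lr1 = new LogisticRegression()
    val lr2 = new LogisticRegression()

    // One ParamMap can hold different values of the "same" param for different instances,
    // because each param is keyed by the instance it belongs to.
    val paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
    println(paramMap(lr1.maxIter)) // 10
    println(paramMap(lr2.maxIter)) // 20
  }
}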

Pipeline

/**
 * 1. A Pipeline consists of a sequence of PipelineStages, each of which is either a Transformer or an Estimator.
 * 2. The input DataFrame is passed through the stages in order: at a Transformer stage, transform() is called on the DataFrame;
 *    at an Estimator stage, fit() is called to produce a Transformer, which becomes part of the PipelineModel.
 * 3. A Pipeline is itself an Estimator: calling fit() produces a PipelineModel, which is then used for prediction. The PipelineModel
 *    has the same number of stages as the original Pipeline, but every Estimator in the original Pipeline has been replaced by the Transformer it produced.
 */
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object PipelineExample {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("Pipeline").master("local").getOrCreate()

    // Training documents built from (id, text, label) tuples
    val training: DataFrame = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Build a pipeline consisting of three stages: tokenizer, hashingTF and lr
    val tokenizer: Tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")

    // setBinary:
    //   true  (Bernoulli-style counting): a term counts as 1 if it appears in a document at all, otherwise 0
    //   false (multinomial-style counting, the default): every occurrence of a term is counted
    // setNumFeatures: the size of the feature (hash) space
    // HashingTF turns each document row into a sparse vector of the form (numFeatures, hashed term indices, term counts),
    // where the counts are computed per row, i.e. per document
    val hashingTF: HashingTF = new HashingTF()
      .setBinary(true)
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
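    // (Added illustration, not in the original notes) Applying just these two stages shows the
    // sparse vectors that will feed the classifier: (numFeatures, hashed term indices, 0/1 counts
    // here, because binary = true).
    hashingTF.transform(tokenizer.transform(training)).select("text", "features").show(false)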

    val lr: LogisticRegression = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)

    val pipeline: Pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    val model: PipelineModel = pipeline.fit(training)
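    // (Added illustration, not in the original notes) After fit(), each Estimator stage has been
    // replaced by the Transformer it produced: the LogisticRegression stage is now a
    // LogisticRegressionModel, while tokenizer and hashingTF are unchanged.
    println(model.stages.map(_.getClass.getSimpleName).mkString("[", ", ", "]"))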

    // Save the fitted model (a Transformer)
    model.write.overwrite().save("/Users/bytedance/ByteCode/SparkLab/data/spark-logistic-regression-model")

    // Save the unfitted pipeline (an Estimator)
    pipeline.write.overwrite().save("/Users/bytedance/ByteCode/SparkLab/data/unfit-lr-model")

    // Load the saved model back
    val sameModel: PipelineModel = PipelineModel.load("/Users/bytedance/ByteCode/SparkLab/data/spark-logistic-regression-model")

    // Test documents built from (id, text) tuples, without labels
    val test: DataFrame = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    val res: Array[Row] = sameModel.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()

    res.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
      println(s"($id, $text) --> prob=$prob, prediction=$prediction")
    }
  }
}