文章目录
本章数据集下载
第6章Spark MLlib基础
传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用。一旦数据量过大,往往需要采用数据抽样的方法。但这种抽样很难保证不走样。近些年随着 HDFS 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习变得可能或必要,但由于MapReduce计算框架,虽然实现分布式计算,但中间结果需要存在到磁盘,这对这计算过程中需要多次迭代的机器学习,因为通常情况下机器学习算法参数学习的过程都是迭代计算的,不很理想。
Spark的出现,正好弥补了MapReduce的不足,它立足于内存计算,所以特别适合机器学习的迭代式计算。同时Spark提供了一个基于海量数据的分布式运算的机器学习库,同时提供了很多特征选取、特征转换等内嵌函数,大大降低了大家学习和使用Spark的门槛,对很多开发者只需对 Spark 有一定基础、了解机器学习算法的基本原理、以及相关参数的含义和作用,一般都可以通过都可以比较顺利地使用Spark进行基于大数据的机器学习。
Spark在机器学习方面有很多优势,本章主要Spark与机器学习相关的内容。
Spark MLlib简介
Spark MLlib架构
常用的几种数据类型
基础统计
RDD、DataFrame及Dataset间的异同
Spark MLlib常用算法
6.1Spark MLlib简介
MLlib是MLBase一部分,其中MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。它们的结构如下图:
图6-2 MLBase四部分关系
6.2Spark MLlib架构
6.3数据类型
Spark MLlib的数据类型主要分为四种,下面将分别介绍。
1. 本地向量(Local vector)
其创建方式主要有以下几种:(以下使用Scala语言)
1 2 3 4 5 6 7 8 9 10 11 |
spark-shell --master spark://master:7077--total-executor-cores 2 //创建一个稠密向量 scala>import org.apache.spark.mllib.linalg.{Vector, Vectors} scala> val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0] //创建一个稀疏向量,说明非零项的索引和值。 scala> val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) sv1: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0]) //创建一个稀疏向量,采用序列方式说明 scala> val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0]) |
2. 标记点(Labeled point)
标记点是由一个本地向量(密集或稀疏)和一个标签(整数或浮点)组成,这个值的具体内容可以由用户指定。
1 2 3 4 5 6 7 8 |
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // 用稠密向量创建标记点 scala> val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[1.0,0.0,3.0]) // 用稀疏向量创建标记点 scala> val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) neg: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(3,[0,2],[1.0,3.0])) |
从文件中直接获取标记点:
1 2 3 4 5 |
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "file:///home/hadoop/bigdata/spark/data/mllib/sample_libsvm_data.txt") |
3. 本地矩阵(Local matrix)
由行索引、列索引、类型值组成,存放在单机中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices} // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) scala> val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) dm: org.apache.spark.mllib.linalg.Matrix = 1.0 2.0 3.0 4.0 5.0 6.0 // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) scala> val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8)) sm: org.apache.spark.mllib.linalg.Matrix = 3 x 2 CSCMatrix (0,0) 9.0 (2,1) 6.0 (1,1) 8.0 |
4. 分布式矩阵(Distributed matrix)
1 2 3 4 5 6 7 8 9 |
scala>import org.apache.spark.mllib.linalg.Vector scala>import org.apache.spark.mllib.linalg.distributed.RowMatrix scala>val rows: RDD[Vector] = ... // 确定参数* //从RDD[Vector]创建分布式矩阵 scala>val mat: RowMatrix = new RowMatrix(rows) // 获取矩阵行列数 scala>val m = mat.numRows() scala>val n = mat.numCols() |
6.4 基础统计
6.4.1摘要统计
示例代码如下:
1 2 3 4 5 6 7 8 9 10 |
import org.apache.spark.mllib.linalg.{Vector,Vectors} import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} val data=sc.parallelize(1 to 100,2) val data1=data.map(x=>Vectors.dense(x)) val stat1=Statistics.colStats(data1) //可求stat1的如下统计值: //count max mean min normL1 normL2 numNonzeros variance //如count stat1.count //显示为100 stat1.normL1 //显示为5050.0 |
6.4.2相关性
目前Spark支持两种相关性(correlations)系数:皮尔森相关系数(pearson)和斯皮尔曼等级相关系数(spearman)。下面通过示例说明相关系统的如何计算。
1 2 3 4 5 6 7 8 9 |
import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.Statistics val x=sc.parallelize(Array(1.0,2.0,3.0,3.2,2.6)) val y=sc.parallelize(Array(4.0,5.0,6.0,4.2,5.6)) val corr_r=Statistics.corr(x, y, "pearson") val corr_p=Statistics.corr(x, y, "spearman") |
6.4.3假设检验
假设检验(Hypothesis testing),Spark MLlib目前支持皮尔森卡方检测(Pearson’s chi-squared tests),包括适配度检测和独立性检测。适配度检测要求输入为Vector, 独立性检验要求输入是Matrix。
代码示例:
1 2 3 4 5 6 7 8 9 |
import org.apache.spark.mllib.linalg.{Matrix, Matrices, Vectors} import org.apache.spark.mllib.stat.Statistics import org.apache.spark.{SparkConf, SparkContext} val vd = Vectors.dense(1, 2, 3, 4, 5) val vdResult = Statistics.chiSqTest(vd) val mtx = Matrices.dense(3, 2, Array(1, 3, 5, 2, 4, 6)) val mtxResult = Statistics.chiSqTest(mtx) |
6.4.4随机数据生成
代码示例:
1 2 3 4 5 6 7 8 |
import org.apache.spark.SparkContext import org.apache.spark.mllib.random.RandomRDDs._ // 生成100个服从正态分布N(0,1)的RDD〔Double〕,并且分布在 10 个分区中 val u = normalRDD(sc, 100L, 10) // 把生成的随机数转换为 N(1,4) 正态分布。 val v = u.map(x => 1.0 + 2.0 * x) |
6.5 RDD、Dataframe和Dataset
目前,spark.mllib包中基于RDD的APIs已进入维护模式,以后将以spark.ml包中的基于DataFrame的API为主。
6.5.1RDD
RDD是Spark建立之初的核心API。它是一种有容错机制的特殊集合, RDD是不可变分布式弹性数据集,在Spark集群中可跨节点分区,以函数式编程操作集合的方式,进行各种并行操作,提供分布式low-level API来操作,包括transformation和action等。
6.5.2Dataset/DataFrame
DataFrame与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame多了数据的结构信息,即schema,类似于传统数据库中的表。
6.5.3相互转换
RDD、DataFrame和Dataset间可以互相转换。
6.6小结
本章主要介绍了Spark MLlib的一些内容,包括MLlib的生态、架构等内容,同时介绍了Spark MLlib算法底层依赖的基础内容,如数据类型、基础统计等,最后简单介绍了RDD、DataFrame与Dataset间的异同等。后续章节我们将通过一些实例,说明如何把前几章介绍的一些方法应用到具体实例中。
Pingback引用通告: 深度实践Spark机器学习 – 飞谷云人工智能