Chapter 8: Building Spark ML Classification Models
In the previous chapter we used a worked example to introduce Spark's collaborative-filtering recommendation model: its principles and typical scenarios, assembling tasks with a pipeline, and optimizing the model with user-defined functions. In this chapter we take Spark classification models as the example and show how the feature selection, feature transformation, pipeline, and model selection/tuning facilities in Spark ML simplify, standardize, and streamline the whole machine learning workflow.
Classification, regression, and clustering are important branches of machine learning and the techniques most commonly used in day-to-day data processing and analysis. These algorithms are mature, relatively easy to understand, and deliver good results, which makes them favorites among data analysts. This chapter uses Spark ML classification models as an example and covers the following topics:
A brief introduction to several common classification algorithms
Loading the data
Exploring the loaded data
Preprocessing the data
Assembling the tasks into a pipeline
Model tuning
8.1 Overview of Classification Models
8.1.1 Linear Models
8.1.2 Decision Tree Models
A decision tree is a powerful non-probabilistic model that can represent complex non-linear patterns and interactions between features.
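For reference (a standard formulation, not taken from the original text): when a classification tree is grown with the entropy impurity, which we select later via setImpurity("entropy"), the impurity of a node whose class proportions are p_i is

H = -\sum_{i} p_i \log_2 p_i

and each split is chosen to maximize the information gain, i.e. the reduction in H from the parent node to its children.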
8.1.3 Naive Bayes Models
For a detailed mathematical explanation of naive Bayes, see the Wikipedia article: http://en.wikipedia.org/wiki/Naive_Bayes_classifier.
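For background (again a standard formulation rather than text from the original): given a feature vector x = (x_1, ..., x_n), a naive Bayes classifier assumes the features are conditionally independent given the class y and predicts

\hat{y} = \arg\max_{y} \; P(y) \prod_{i=1}^{n} P(x_i \mid y)

For the multinomial variant used later in this chapter, the conditional probabilities are estimated from feature counts, which is why all feature values must be non-negative (see the extra cleaning step in Section 8.4).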
8.2 Loading the Data
The data file is stored at /home/hadoop/data/train.tsv.
Let's first use shell commands to take an exploratory look at the data and do some simple preprocessing.
1) View the first two lines of the data
$ head -2 train.tsv
"url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"
"http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html" "4042" "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose Cali "8" .............. "0.152941176" "0.079129575" "0"
The first line of the dataset is the header row, listing the field names.
2) Check the total number of lines in the file
$ cat train.tsv | wc -l
7396
The file contains 7,396 lines in total, i.e. 7,395 data records plus the header row.
3) Since textFile has no convenient way to skip a header row, we first remove the header so that the data is easier to process in Spark (an alternative using the DataFrame reader is sketched after the command).
$ sed 1d train.tsv > train_noheader.tsv
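As an aside (a sketch only, assuming the original train.tsv had also been uploaded to HDFS; the rest of this chapter keeps using the RDD-based approach above), the Spark 2.x DataFrame reader can skip the header without editing the file:

// Hypothetical alternative: read the TSV directly and let Spark handle the header.
val rawDF = spark.read
  .option("sep", "\t")         // tab-separated fields
  .option("header", "true")    // treat the first line as column names
  .csv("hdfs://master:9000/data/train.tsv")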
4) Upload the data file to HDFS
$ hdfs dfs -put train_noheader.tsv /data
5) Verify that the upload succeeded
hadoop@master:~/data$ hdfs dfs -ls /data
17/05/24 00:46:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r--   1 hadoop supergroup   21972457 2017-05-24 00:46 /data/train_noheader.tsv
6) Start the Spark shell
spark-shell --master spark://master:7077 --driver-memory 2G --total-executor-cores 2 |
7) Create an RDD from the file on HDFS using the textFile method of the SparkContext object sc
scala> val rawData = sc.textFile("hdfs://master:9000/data/train_noheader.tsv")
rawData: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/data/train_noheader.tsv MapPartitionsRDD[1] at textFile at <console>:24
8.3 Exploring the Data
1) View the first two rows of the data
scala> rawData.take(2)
res0: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html" "4042" "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees in...
As the output shows, each element of the array is a single raw string. Inspecting the source file reveals that the fields are separated by tab characters (\t). The algorithms used later do not need the leading columns (the URL, page ID, and raw page content), so these will be filtered out during preprocessing. Next, we split each line into its individual fields.
2) Based on the analysis above, split each line and create a new RDD
scala> val records = rawData.map(line => line.split("\t"))
records: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26
3) Examine the structure of the data
scala> records.first
res4: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
4) Check the total number of rows
scala> records.count
res5: Long = 7395
5) Check the number of columns in a row
scala> records.first.size
res6: Int = 27
6) Retrieve the first few values of the first row
scala> records.first.take(2)
res22: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042")
8.4 Preprocessing the Data
1) Import LabeledPoint
scala> import org.apache.spark.ml.feature.LabeledPoint |
2) Import the Vectors factory
scala> import org.apache.spark.ml.linalg.Vectors |
3) Clean the data: strip the quotation marks, take the last column as the label, drop the first four columns, and replace "?" values with 0.0
val data = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}
The block above can be copied into the shell via the :paste command; after pasting, press Ctrl+D to run it.
4) The naive Bayes algorithm requires non-negative feature values, so an additional mapping is needed to clip negative values to 0.
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}
5) View the first two rows of the cleaned dataset
scala> data.take(2)
res0: Array[org.apache.spark.ml.feature.LabeledPoint] = Array((0.0,[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]), (1.0,[0.574147,3.677966102,0.50802139,0.288770053,0.213903743,0.144385027,0.468648998,0.0,0.0,0.098707403,0.0,0.203489628,0.088652482,1.0,1.0,40.0,0.0,4973.0,187.0,9.0,0.181818182,0.125448029]))
6) Create DataFrames from the RDDs
scala> val df = spark.createDataFrame(data)
df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> val nbDF = spark.createDataFrame(nbData)
nbDF: org.apache.spark.sql.DataFrame = [label: double, features: vector]
7) Inspect the data in df and nbDF
scala> df.show(10)   // show the first 10 rows of df
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.789131,2.05555...|
|  1.0|[0.574147,3.67796...|
|  1.0|[0.996526,2.38288...|
|  1.0|[0.801248,1.54310...|
|  0.0|[0.719157,2.67647...|
|  0.0|[0.0,119.0,0.7454...|
|  1.0|[0.22111,0.773809...|
|  0.0|[0.0,1.883333333,...|
|  1.0|[0.0,0.471502591,...|
|  1.0|[0.0,2.41011236,0...|
+-----+--------------------+
only showing top 10 rows

// Show the first row of nbDF; nbDF.first gives the same result
scala> nbDF.head
res21: org.apache.spark.sql.Row = [0.0,[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]]
8) Inspect the schema and total row count of df and nbDF
scala> df.printSchema
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

scala> df.count
res4: Long = 7395

scala> nbDF.printSchema
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

scala> nbDF.count
res24: Long = 7395
9) Randomly split the data: 80% for the training set and 20% for the test set
scala> val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2), seed = 1234L)
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]

scala> val Array(nbTrainingData, nbTestData) = nbDF.randomSplit(Array(0.8, 0.2), seed = 1234L)
nbTrainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
nbTestData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
10) Check the row counts of the training and test sets
scala> trainingData.count
res8: Long = 5912

scala> testData.count
res9: Long = 1483
11) The training and test sets will be used repeatedly during grid search and cross-validation later, so caching them in memory can greatly improve performance.
scala> trainingData.cache
res10: trainingData.type = [label: double, features: vector]

scala> testData.cache
res11: testData.type = [label: double, features: vector]

scala> nbTrainingData.cache
res27: nbTrainingData.type = [label: double, features: vector]

scala> nbTestData.cache
res28: nbTestData.type = [label: double, features: vector]
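If memory is tight, an explicit storage level can be used instead of the plain cache call and released once tuning is finished; a minimal sketch (not part of the original transcript):

// Sketch only: persist with an explicit storage level and release it after tuning.
import org.apache.spark.storage.StorageLevel

trainingData.persist(StorageLevel.MEMORY_AND_DISK)
// ... run the cross-validation in Section 8.6 ...
// trainingData.unpersist()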
12) Import the logistic regression, naive Bayes, and decision tree classifiers
scala> import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
scala> import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
scala> import org.apache.spark.ml.classification.{DecisionTreeClassifier, DecisionTreeClassificationModel}
13) Create the naive Bayes estimator and set its initial parameters
scala> val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")
nb: org.apache.spark.ml.classification.NaiveBayes = nb_050f7aa0718e
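NaiveBayes also exposes smoothing and model-type parameters; the defaults are used in this chapter, but they could be set explicitly, for example (a sketch, not part of the original transcript):

// Hypothetical variant that sets the (default) parameter values explicitly.
val nbTuned = new NaiveBayes()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setSmoothing(1.0)             // Laplace (additive) smoothing
  .setModelType("multinomial")   // the default model type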
14) Train the naive Bayes model and use it to predict on the test data
// Train the model
scala> val nbModel = nb.fit(nbTrainingData)
nbModel: org.apache.spark.ml.classification.NaiveBayesModel = NaiveBayesModel (uid=nb_63013179fe1f) with 2 classes

// Predict on the test data
scala> val nbPrediction = nbModel.transform(nbTestData)
nbPrediction: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]

scala> nbPrediction.show(10)
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[0.0,0.0,0.0,0.0,...|[-46.748642409367...|[0.83980023102260...|       0.0|
|  0.0|[0.0,0.0,0.0,0.0,...|[-28.455469606463...|[0.80560314910869...|       0.0|
|  0.0|[0.0,0.0,0.0,0.0,...|[-21.419660085849...|[0.74277974559336...|       0.0|
|  0.0|[0.0,0.253731343,...|[-566.66641956697...|[1.0,1.7019208090...|       0.0|
|  0.0|[0.0,0.5,0.0,0.0,...|[-85.270246662200...|[0.83409619579026...|       0.0|
|  0.0|[0.0,0.5,0.0,0.0,...|[-109.88609079237...|[0.94215720717655...|       0.0|
|  0.0|[0.0,0.563636364,...|[-645.84504343631...|[3.97514834896843...|       1.0|
|  0.0|[0.0,0.590163934,...|[-2040.0838024687...|[0.99999724227148...|       0.0|
|  0.0|[0.0,0.677966102,...|[-432.36145227604...|[0.99888109472905...|       0.0|
|  0.0|[0.0,0.7,0.111111...|[-222.48044531991...|[0.99999999305131...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 10 rows
15) Compute the accuracy of the naive Bayes predictions
// t1 holds the predicted values, t2 the test-set labels,
// t3 the total number of test rows
scala> val (t1, t2, t3) = (nbPrediction.select("prediction").collect, nbTestData.select("label").collect, nbTestData.count.toInt)

// t4 is a counter
scala> var t4 = 0
t4: Int = 0

// Loop over the rows and count correct predictions
scala> for(i <- 0 to t3-1){if(t1(i)==t2(i)) t4+=1}

// Number of correct predictions
scala> t4
res63: Int = 840

// Accuracy
scala> val nbAccuracy = 1.0*t4/t3
nbAccuracy: Double = 0.5664194200944033
As shown above, the accuracy of naive Bayes is about 56.64%.
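The same accuracy can be computed more idiomatically with Spark ML's MulticlassClassificationEvaluator instead of a hand-written loop; a minimal sketch (not part of the original transcript):

// The "accuracy" metric compares the prediction column against the label column.
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val accEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val nbAccuracy2 = accEvaluator.evaluate(nbPrediction)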
8.5 Assembling the Pipeline
1) Import the feature indexer classes
scala>import org.apache.spark.ml.feature.{ VectorIndexer, VectorIndexerModel} |
2) Build the feature indexer
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_b73ca1435eea
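By default VectorIndexer treats any feature with at most 20 distinct values as categorical and leaves the rest continuous; the threshold can be set explicitly if needed, for example (a sketch, not part of the original transcript):

// Hypothetical variant with an explicit categorical-feature threshold.
val featureIndexer10 = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(10)   // features with more than 10 distinct values stay continuous
  .fit(df)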
3) Create the logistic regression estimator
scala> val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.001)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_9bec21f2262f
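LogisticRegression also supports elastic-net regularization via setElasticNetParam (0.0 gives pure L2, 1.0 pure L1); it is left at its default here, but could be set explicitly, for example (a sketch, not part of the original transcript):

// Hypothetical variant mixing L1 and L2 regularization.
val lrElastic = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
  .setRegParam(0.001)
  .setElasticNetParam(0.5)   // 50/50 mix of L1 and L2 penalties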
4) Create the decision tree estimator
scala> val dt = new DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("indexedFeatures").setImpurity("entropy").setMaxBins(100).setMaxDepth(5).setMinInfoGain(0.01)
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_8a3a01185f6b
5) Import the parameter grid builder and the cross-validator
scala> import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator } |
6) Import the pipeline classes
import org.apache.spark.ml.{Pipeline,PipelineModel} |
7) Import the evaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator |
8) Configure two pipelines: a logistic regression pipeline with two stages (featureIndexer and lr), and a decision tree classification pipeline with two stages (featureIndexer and dt).
scala> val lrPipeline = new Pipeline().setStages(Array(featureIndexer, lr))
lrPipeline: org.apache.spark.ml.Pipeline = pipeline_64c542dff42e

scala> val dtPipeline = new Pipeline().setStages(Array(featureIndexer, dt))
dtPipeline: org.apache.spark.ml.Pipeline = pipeline_b9ed2ccc2108
8.6 Model Tuning
1) Configure a parameter grid for each model using ParamGridBuilder
scala> :paste
val lrParamGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.3, 0.5))
  .addGrid(lr.maxIter, Array(10, 20, 30))
  .build()

scala> :paste
val dtParamGrid = new ParamGridBuilder()
  .addGrid(dt.maxDepth, Array(3, 5, 7))
  .build()
2) Instantiate a cross-validator for each pipeline
val evaluator = new BinaryClassificationEvaluator()

scala> :paste
val lrCV = new CrossValidator()
  .setEstimator(lrPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(lrParamGrid)
  .setNumFolds(2)
lrCV: org.apache.spark.ml.tuning.CrossValidator = cv_b25c7e0f1be7

scala> :paste
val dtCV = new CrossValidator()
  .setEstimator(dtPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(dtParamGrid)
  .setNumFolds(2)
dtCV: org.apache.spark.ml.tuning.CrossValidator = cv_5176e642601d
3) Fit the cross-validators to find the best parameter sets, then apply the resulting models to the test data
scala> val lrCvModel = lrCV.fit(trainingData)
lrCvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_b25c7e0f1be7

scala> val dtCvModel = dtCV.fit(trainingData)
dtCvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_5176e642601d

scala> val lrPrediction = lrCvModel.transform(testData)
lrPrediction: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 4 more fields]

scala> val dtPrediction = dtCvModel.transform(testData)
dtPrediction: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 4 more fields]
4) Inspect the predictions
scala> lrPrediction.select("label","prediction").show(10)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows

scala> dtPrediction.select("label","prediction").show(10)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows
5) Inspect the parameters of the best logistic regression model
scala> val lrBestModel = lrCvModel.bestModel.asInstanceOf[PipelineModel]
lrBestModel: org.apache.spark.ml.PipelineModel = pipeline_64c542dff42e

scala> val lrModel = lrBestModel.stages(1).asInstanceOf[LogisticRegressionModel]
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_100994a23a48

scala> lrModel.getRegParam
res20: Double = 0.1

scala> lrModel.getMaxIter
res21: Int = 20
6) Inspect the parameters of the best decision tree model
scala> val dtBestModel = dtCvModel.bestModel.asInstanceOf[PipelineModel]
dtBestModel: org.apache.spark.ml.PipelineModel = pipeline_b9ed2ccc2108

scala> val dtModel = dtBestModel.stages(1).asInstanceOf[DecisionTreeClassificationModel]
dtModel: org.apache.spark.ml.classification.DecisionTreeClassificationModel = DecisionTreeClassificationModel (uid=dtc_b10fd9474309) of depth 4 with 17 nodes

scala> dtModel.getMaxDepth
res24: Int = 3

scala> dtModel.numFeatures
res25: Int = 22
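The fitted tree can also be inspected directly, e.g. its structure and the relative importance of each feature (a short sketch, not part of the original transcript):

// Print the learned tree structure and per-feature importances.
println(dtModel.toDebugString)
println(dtModel.featureImportances)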
7) Compute the prediction accuracy of the logistic regression and decision tree models
// t_lr holds the logistic regression predictions, t_dt the decision tree predictions,
// t_label the test-set labels, and t_count the number of test rows
scala> val (t_lr, t_dt, t_label, t_count) = (lrPrediction.select("prediction").collect, dtPrediction.select("prediction").collect, testData.select("label").collect, testData.count.toInt)

// c_lr counts correct logistic regression predictions,
// c_dt counts correct decision tree predictions
scala> var Array(c_lr, c_dt) = Array(0, 0)
c_lr: Int = 0
c_dt: Int = 0

// Loop over the rows and count correct logistic regression predictions
scala> for(i <- 0 to t_count-1){if(t_lr(i)==t_label(i)) c_lr+=1}

scala> c_lr
res5: Int = 899

// Logistic regression accuracy
scala> 1.0*c_lr/t_count
res6: Double = 0.6062036412677007

// Loop over the rows and count correct decision tree predictions
scala> for(i <- 0 to t_count-1){if(t_dt(i)==t_label(i)) c_dt+=1}

scala> c_dt
res8: Int = 927

// Decision tree accuracy
scala> 1.0*c_dt/t_count
res9: Double = 0.6250842886041807
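Since both prediction DataFrames contain a rawPrediction column, the BinaryClassificationEvaluator created earlier can score them directly as well, using its default metric, the area under the ROC curve; a short sketch (not part of the original transcript):

// Area under the ROC curve for each tuned model.
val lrAUC = evaluator.evaluate(lrPrediction)
val dtAUC = evaluator.evaluate(dtPrediction)
println(s"LR AUC = $lrAUC, DT AUC = $dtAUC")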
As we can see, cross-validation finds the optimal parameters and thus the best model, and chaining the whole process together with a pipeline makes the work much more convenient. There is still plenty of room for further model optimization; Chapter 11 offers additional ideas and methods.
8.7 Summary
This chapter gave a detailed walkthrough of classification models in Spark ML, covering the principles of logistic regression, decision trees, and naive Bayes, together with some of their typical use cases. Using pipelines, parameter grids, and cross-validation, the whole machine learning process was made standardized, systematic, and repeatable.