Chapter 5 Model Selection and Optimization
This chapter introduces how to tune models both with the methods provided by Spark ML and with custom functions. We can tune models using Spark ML's built-in cross-validation, train-validation split, and parameter grid facilities, or we can write our own tuning functions.
The main topics of this chapter are:
Model selection
Cross-validation
Train-validation split
Tuning with custom functions
5.1 Model Selection
Tuning can be applied to a single Estimator, such as LogisticRegression, or to an entire Pipeline that includes multiple algorithms, featurization, and other steps. Users can tune a whole Pipeline at once instead of tuning each element of the Pipeline separately.
MLlib supports model selection with tools such as cross-validation (CrossValidator) and train-validation split (TrainValidationSplit). These tools require the following components:
Estimator: the algorithm or Pipeline to tune
A set of ParamMaps: the parameter choices, sometimes called the "parameter grid" to search over
Evaluator: a metric of how well a fitted model performs on held-out test data
At a high level, these model selection tools work as follows (a minimal sketch follows the list):
They split the input data into separate training and test datasets.
For each (training, test) pair, they iterate over the set of ParamMaps: for each ParamMap, they fit the Estimator with those parameters to obtain a fitted model, and then use the Evaluator to assess the model's performance.
They select the model produced by the best-performing set of parameters.
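To make the workflow concrete, here is a hand-rolled sketch of that selection loop written directly against the generic Estimator/Evaluator API. The names train, test, estimator, evaluator and paramMaps are placeholders for objects defined elsewhere, and the comparison assumes a larger-is-better metric such as AUC:

import org.apache.spark.ml.param.ParamMap

// Assumes train, test, estimator, evaluator and paramMaps are already defined.
var bestMetric = Double.NegativeInfinity  // assuming a larger-is-better metric (e.g. AUC)
var bestParams: Option[ParamMap] = None
for (paramMap <- paramMaps) {
  val model = estimator.fit(train, paramMap)              // fit with this parameter set
  val metric = evaluator.evaluate(model.transform(test))  // score on the held-out data
  if (metric > bestMetric) {
    bestMetric = metric
    bestParams = Some(paramMap)
  }
}

The built-in tools below implement essentially this loop, plus the data splitting, so we rarely need to write it by hand.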
5.2 Cross-Validation
The following example uses CrossValidator to choose suitable parameters from an entire grid of parameters and thereby select the best model automatically.
Cross-validating over an entire parameter grid is expensive. In the example below, the grid has 3 values for hashingTF.numFeatures and 2 values for lr.regParam, and the CrossValidator uses 2 folds, so (3 × 2) × 2 = 12 different models will be trained. In realistic settings it is common to try many more parameters and use more folds (k = 3 and k = 10 are both common), so the cost of CrossValidator can be very high and deserves careful consideration when the dataset is large. Nevertheless, compared with manual tuning, cross-validation remains a well-founded way to choose parameters.
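If the cost is a concern, note that newer Spark versions can evaluate the parameter combinations in parallel. A minimal sketch, assuming Spark 2.3 or later where setParallelism is available (cv is the CrossValidator built later in this section):

// Evaluate up to 4 parameter/fold combinations concurrently (Spark 2.3+)
cv.setParallelism(4)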
The following walks through this example step by step.
Import the necessary packages:
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// Create a training dataset with id, text and label columns
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")
Configure a pipeline with three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
Use ParamGridBuilder to construct a parameter grid:
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()
We then embed the pipeline in a CrossValidator instance, so that every stage of the pipeline takes part in the grid search. A CrossValidator requires an Estimator, a set of estimator ParamMaps, and an Evaluator. The default metric of BinaryClassificationEvaluator is AUC (areaUnderROC); a sketch after the following code shows how to choose a different one.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)
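If AUC is not the metric you want, BinaryClassificationEvaluator also supports the area under the precision-recall curve. A minimal sketch:

// Use area under the precision-recall curve instead of the default areaUnderROC
val prEvaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderPR")
// then pass it to the validator: cv.setEvaluator(prEvaluator)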
Run the cross-validation to fit the model, obtain the best parameter set, and test the resulting model:
val cvModel = cv.fit(training)
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")
val predictions = cvModel.transform(test)
predictions.select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Inspect the parameter values of the best model:
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestModel.stages(2).asInstanceOf[LogisticRegressionModel]
lrModel.getRegParam   // returns 0.1
lrModel.numFeatures   // returns 100
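Besides the single best model, the fitted CrossValidatorModel also records the average metric for every parameter combination, which is useful for seeing how close the other candidates were. A short sketch, using the cvModel fitted above:

// Average cross-validation metric (here AUC) for each ParamMap in the grid
cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .foreach { case (params, metric) =>
    println(s"metric = $metric for:\n$params")
  }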
5.3 Train-Validation Split
Like CrossValidator, TrainValidationSplit ultimately fits the Estimator on the entire dataset using the best ParamMap. Unlike CrossValidator, however, it evaluates each parameter combination only once rather than k times, which makes it cheaper but can give less reliable results when the training dataset is small.
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data
val data = spark.read.format("libsvm")
  .load("file:///u01/bigdata/spark/data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()
  .setMaxIter(10)

// Use ParamGridBuilder to build a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and
// determine the best model.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of estimator
// ParamMaps and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)  // 80% of the data is used for training, the remaining 20% for validation

// Run the train-validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)

// Make predictions on the test data; model is the linear regression
// with the parameter combination that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show()
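As with CrossValidator, the fitted TrainValidationSplitModel exposes the winning model through bestModel. A minimal sketch to inspect the chosen parameters, using the model fitted above:

import org.apache.spark.ml.regression.LinearRegressionModel

// Extract the winning LinearRegressionModel and print the selected parameters
val bestLr = model.bestModel.asInstanceOf[LinearRegressionModel]
println(s"regParam = ${bestLr.getRegParam}, " +
  s"elasticNetParam = ${bestLr.getElasticNetParam}, " +
  s"fitIntercept = ${bestLr.getFitIntercept}")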
5.4 Custom Model Selection
Besides the built-in tools, we can write our own function to search over parameter combinations. The example below tunes an ALS recommendation model by hand: it splits the ratings into training, validation and test sets, trains one model per (rank, lambda, numIter) combination, scores each on the validation set with RMSE, and keeps the best.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("file:///u01/bigdata/spark/data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()

// Split the ratings into three parts: training (60%), validation (20%) and test (20%)
val splits = ratings.randomSplit(Array(0.6, 0.2, 0.2), 12)

// Cache the datasets to speed up the repeated training runs
val training = splits(0).cache()
val validation = splits(1).toDF.cache()
val test = splits(2).toDF.cache()  // splits(2), so the test set is disjoint from the validation set

// Count the rows of each dataset
val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()

// Train models under different parameters, validate them on the validation
// set, and keep the model trained with the best parameters
val ranks = List(20, 40)
val lambdas = List(0.01, 0.1)
val numIters = List(5, 10)
var bestModel: Option[ALSModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = 1.0
var bestNumIter = 1

// Root mean squared error of the model's predictions on data (n rows):
// join each (user, movie) rating with its prediction, then average the
// squared differences
def computeRmse(model: ALSModel, data: DataFrame, n: Long): Double = {
  val predictions = model.transform(data)
  val p1 = predictions.rdd.map { x => ((x(0), x(1)), x(2)) }
    .join(predictions.rdd.map { x => ((x(0), x(1)), x(4)) })
    .values
  math.sqrt(p1.map(x => (x._1.toString.toDouble - x._2.toString.toDouble) *
    (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_ + _) / n)
}

for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  val als = new ALS()
    .setMaxIter(numIter)
    .setRegParam(lambda)
    .setRank(rank)
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating")
  val model = als.fit(training)
  val validationRmse = computeRmse(model, validation, numValidation)
  println("RMSE(validation) = " + validationRmse + " for the model trained with rank = "
    + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".")
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestRank = rank
    bestLambda = lambda
    bestNumIter = numIter
  }
}

The loop prints the validation RMSE for each combination:

RMSE(validation) = 1.544091979633677 for the model trained with rank = 20,lambda = 0.01,and numIter = 5.
RMSE(validation) = 1.3973269424767092 for the model trained with rank = 20,lambda = 0.01,and numIter = 10.
RMSE(validation) = 1.0381748242379452 for the model trained with rank = 20,lambda = 0.1,and numIter = 5.
RMSE(validation) = 1.0059139723438042 for the model trained with rank = 20,lambda = 0.1,and numIter = 10.
RMSE(validation) = 1.5545572907673506 for the model trained with rank = 40,lambda = 0.01,and numIter = 5.
RMSE(validation) = 1.390163394916565 for the model trained with rank = 40,lambda = 0.01,and numIter = 10.
RMSE(validation) = 1.0520138790334888 for the model trained with rank = 40,lambda = 0.1,and numIter = 5.
RMSE(validation) = 1.0132571682721812 for the model trained with rank = 40,lambda = 0.1,and numIter = 10.

Finally, score the test set with the best model and compute the root mean squared error (RMSE) against the actual ratings:

val testRmse = computeRmse(bestModel.get, test, numTest)
// testRmse: Double = 1.0059139723438042

println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
  + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

This prints the parameters of the best model:
The best model was trained with rank = 20 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 1.0059139723438042.
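The hand-written computeRmse above can also be replaced by Spark ML's built-in RegressionEvaluator, which is shorter and avoids the detour through RDDs. A minimal sketch, assuming Spark 2.2 or later, where ALS supports coldStartStrategy for dropping NaN predictions on users or items unseen during training:

// RMSE via the built-in evaluator instead of the hand-rolled computeRmse
val rmseEvaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val testPredictions = bestModel.get
  .setColdStartStrategy("drop")  // drop rows with NaN predictions (Spark 2.2+)
  .transform(test)
println(s"Test RMSE = ${rmseEvaluator.evaluate(testPredictions)}")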
5.5 Summary
This chapter introduced several methods for model selection and tuning. We can start from the training data, from the model parameters, or combine the two. There are, of course, many other ways to optimize a model, such as trying different algorithms or using ensemble methods. The next chapter covers some fundamentals of Spark MLlib, including its architecture, principles, algorithms and the libraries they depend on, along with vectors, matrices, and related topics.