本章数据集下载
第7章 构建Spark ML推荐模型
前面我们介绍了机器学习的一般步骤、如何探索数据、如何预处理数据、如何利用Spark Ml中的一些算法或API,以及有效处理机器学习过程中的特征转换、特征选择、训练模型,并把这些过程流程化等。从本章开始,我们将通过实例,进一步阐述这些问题,并通过实例把相关内容有机结合起来。
本章主要介绍Spark机器学习中的协同过滤(Collaborative Filtering,CF)模型,协调过滤简单来,说是利用某个兴趣相投、拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的,进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。在日常生活中,人们实际上经常使用这种方法,如你哪天突然想看个电影,但你不知道具体看哪部,你会怎么做?大部分的人会问问周围的朋友,最近有什么好看的电影,而我们一般更倾向于从兴趣或观点相近的朋友那里得到推荐。这就是协同过滤的思想。换句话说,就是借鉴和你相关人群的观点来进行推荐。
本章介绍Spark的推荐模型,将按以下步骤进行:
首先简介推荐模型
加载数据到HDFS
Spark读取数据
对数据进行探索
训练模型
组装任务
评估、优化模型
7.1推荐模型简介
协同过滤常被用于推荐系统。这类技术目标在于填充“用户-商品”联系矩阵中的缺失项。Spark.ml目前支持基于模型的协同过滤,其中用户和商品以少量的潜在因子来描述,用以预测缺失项。Spark.ml使用交替最小二乘(ALS)算法来学习这些潜在因子。
7.2数据加载
这里使用MovieLens 100k数据集,主要包括用户属性数据(u.user)、电影数据(u.item)、用户对电影的评级数据(u.data)及题材数据(u.genre)等。在把数据复制到HDFS之前,我们先大致了解一下相关数据:
用户数据(u.user)结构:
1 2 3 4 5 6 |
$ head -3 u.user 1|24|M|technician|85711 2|53|F|other|94043 3|23|M|writer|32067 $ wc -l u.user 943 u.user |
可以看出用户数据由user id、age、gender、occupation和zip code等5个字段,字段间隔符为竖线("|"),共有943行。
电影数据(u.item)结构:
1 2 3 4 5 6 |
head -3 u.item 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0 3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0 $ wc -l u.item 1682 u.item |
可以看出用户数据由movie id、title、release date及其他属性,字段间隔符为竖线("|"),共有1682行。
用户对电影评级数据(u.data)结构:
1 2 3 4 5 6 |
$ head -3 u.data 196 242 3 881250949 186 302 3 891717742 22 377 1 878887116 $ wc -l u.data 100000 u.data |
可以看出用户数据由user id、movie id、rating(1-5)和timestamp等4个字段,字段间隔符为制表符("\t"),共有100000行。
电影题材数据(u.genre):
1 2 3 4 5 6 |
$ head -3 u.genre unknown|0 Action|1 Adventure|2 $wc -l u.genre 20 u.genre |
这个数据只有两个字段:题材及代码,以竖线分隔。共有20种电影题材。
把用户数据(u.user)复制到HDFS上,其他数据方法一样。
1 |
$ hadoop fs -put u.user /home/hadoop/data/ |
查看数据复制是否成功
1 |
$ hadoop fs -ls /home/hadoop/data/ |
把相关数据复制到HDFS后,我们就可以利用Pyspark对数据进行探索或简单分析,这里使用Pyspark主要考虑其可视化功能,如果不需要数据的可视化,使用Spark即可。
以spark Standalone模式启动spark集群
1 |
spark-shell --master spark://master:7077 --driver-memory 1G --total-executor-cores 2 |
导入需要的包或库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS import org.apache.spark.sql.SparkSession import org.apache.spark.ml.Pipeline 因读入数据缺省都是字符格式,故需要对数据进行格式转换。 //定义个类,来保存一次评分 case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long) //把一行转换成一个评分类 def parseRating(str: String): Rating = { val fields = str.split("\t") assert(fields.size == 4) Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong) } 读取数据,并缓存数据,因后面需要多次使用这份数据。 val ratings = spark.read.textFile("hdfs://master:9000/home/hadoop/data/u.data") .map(parseRating) .cache() |
7.3数据探索
数据加载到HDFS后,我们便可对数据进行探索和分析,对用户数据的探索,大家可参考2.4.3节的相关内容。用户对电影评级数据比较简单,这里我们简单查看一下导入数据抽样及统计信息。 抽样数据:
1 2 3 4 5 6 7 8 9 |
ratings.show(4) +------+-------+------+---------+ |userId|movieId|rating|timestamp| +------+-------+------+---------+ | 196| 242| 3.0|881250949| | 186| 302| 3.0|891717742| | 22| 377| 1.0|878887116| | 244| 51| 2.0|880606923| +------+-------+------+---------+ |
用户ID、电影ID、评级数据统计信息:
1 2 3 4 5 6 7 8 9 10 |
ratings.describe("userId","movieId","rating").show() +-------+------------------+------------------+------------------+ |summary| userId| movieId| rating| +-------+------------------+------------------+------------------+ | count| 100000| 100000| 100000| | mean| 462.48475| 425.53013| 3.52986| | stddev|266.61442012750905|330.79835632558473|1.1256735991443214| | min| 1| 1| 1.0| | max| 943| 1682| 5.0| +-------+------------------+------------------+------------------+ |
由此可知,该数据集共有100000条,评级最低为1.0,最高为5.0,平均3.5左右。
7.4训练模型
这里数据比较简单,无须做数据转换和清理等数据预处理工作。在训练模型前,我们需要把数据划分为几个部分,这里先随机划分成两部分,划分比例为80%作为训练集,20%作为测试集。后续我们在性能优化时将采用另一种划分方式,然后,比较使用不同划分方法对模型性能或泛化能力的影响。
1 2 3 4 5 6 7 8 9 10 |
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2),seed=1234) val als = new ALS() .setMaxIter(10) .setRank(10) .setRegParam(0.01) .setNonnegative(true) .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") |
7.5组装
1)创建流水线,把数据转换、模型训练等任务组装在一条流水线上。
1 |
val pipeline = new Pipeline().setStages(Array(als)) |
2)训练模型
1 |
val model = pipeline.fit(training) |
3)作出预测
1 |
val predictions = model.transform(test) |
4)查看预测值与原来的值
1 2 3 4 5 6 7 8 9 10 |
predictions.show(5) +------+-------+------+---------+----------+ |userId|movieId|rating|timestamp|prediction| +------+-------+------+---------+----------+ | 222| 148| 2.0|881061164| 3.1357265| | 330| 148| 4.0|876544781| 3.9583592| | 224| 148| 3.0|888104154| 3.9787998| | 618| 148| 3.0|891309670| 2.7060091| | 896| 148| 2.0|887160606| 2.8391676| +------+-------+------+---------+----------+ |
7.6评估模型
1)预测时会产生NaN,即NaN表示不推荐(预测时产生NaN是spark2.1 ALS中的一个bug,该bug在2.2中将修复)
1 |
predictions.filter(predictions("prediction").isNaN).select("userId","movieId","rating","prediction").count() |
2)删除含NaN的值的行,NaN有一定合理性,不推荐,但为评估指标,可以先过滤这些数。
1 2 3 4 5 6 7 |
val predictions1= predictions.na.drop() val evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("rating") .setPredictionCol("prediction") val rmse = evaluator.evaluate(predictions1) |
3)运行结果为:rmse: Double = 1.016902715345917
7.7模型优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
//导入一些包 import org.apache.spark.sql.Row import org.apache.spark.sql.Dataset import org.apache.spark.sql.DataFrame import org.apache.spark.ml.recommendation.{ALS,ALSModel} //将样本评分表分成3个部分,分别用于训练 (60%), 校验 (20%), and 测试 (20%) val splits = ratings.randomSplit(Array(0.6, 0.2,0.2),12) //把训练样本缓存起来,加快运算速度 val training = splits(0).cache() val validation = splits(1).toDF.cache() val test = splits(1).toDF.cache() //计算各集合总数 val numTraining = training.count() val numValidation = validation.count() val numTest = test.count() //训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型 val ranks = List(10, 20) val lambdas = List(0.01, 0.1) val numIters = List(5, 10) var bestModel: Option[ALSModel] = None var bestValidationRmse = Double.MaxValue var bestRank = 0 var bestLambda = 1.0 var bestNumIter = 1 def computeRmse(model:ALSModel,data:DataFrame,n:Long):Double = { val predictions = model.transform(data) val p1=predictions.na.drop().rdd.map{ x =>((x(0),x(1)),x(2))}.join(predictions.rdd.map{ x =>((x(0),x(1)),x(4))}).values math.sqrt(p1.map( x => (x._1.toString.toDouble - x._2.toString.toDouble) * (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_+_)/n) } for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val als = new ALS() .setMaxIter(numIter) .setRegParam(lambda) .setRank(rank) .setNonnegative(true) .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") val model = als.fit(training) val validationRmse = computeRmse(model, validation, numValidation) println("RMSE(validation) = " + validationRmse + " for the model trained with rank = " + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".") if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } //运行结果 RMSE(validation) = 1.0664516122491705 for the model trained with rank = 10,lambda = 0.01,and numIter = 5. RMSE(validation) = 1.0773258157269512 for the model trained with rank = 10,lambda = 0.01,and numIter = 10. RMSE(validation) = 0.9509095542582248 for the model trained with rank = 10,lambda = 0.1,and numIter = 5. RMSE(validation) = 0.9390664107785451 for the model trained with rank = 10,lambda = 0.1,and numIter = 10. RMSE(validation) = 1.1024492428290906 for the model trained with rank = 20,lambda = 0.01,and numIter = 5. RMSE(validation) = 1.1242105743040174 for the model trained with rank = 20,lambda = 0.01,and numIter = 10. RMSE(validation) = 0.9393089637028184 for the model trained with rank = 20,lambda = 0.1,and numIter = 5. RMSE(validation) = 0.9383240505365207 for the model trained with rank = 20,lambda = 0.1,and numIter = 10. //用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差(RMSE) val testRmse = computeRmse(bestModel.get, test, numTest) testRmse: Double = 0.9383240505365207 //比优化前的rmse: Double = 1.016902715345917提高了7.6%左右。 //打印最优模型中的各参数值 println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".") |
//最佳模型相关参数
The best model was trained with rank = 20 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.9383240505365207.
7.8小结
本章介绍了推荐模型的一般方法,Spark推荐模型的原理和算法等,然后通过一个实例具体说明实施Spark推荐模型的一般步骤、使用自定义函数优化模型等内容。下一章将以Spark ML的分类模型为例,进一步说明如何使用Spark ML提供的特征选取、特征转换、流水线、交叉验证等函数或方法。
Pingback引用通告: 深度实践Spark机器学习 – 飞谷云人工智能