Chapter 11 Decision Tree Models with PySpark
Figure 11-1 PySpark architecture
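When PySpark starts, the Python interpreter also launches a JVM, and the two processes communicate over a socket. On the Python driver side, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. Py4J is used only on the driver, for communication between the local Python SparkContext and the Java SparkContext objects. Bulk data is transferred through a different mechanism. RDD transformations written in Python are mapped to PythonRDD objects in the JVM. On the remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them through pipes, sending them the user's code and data.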
11.1 Introduction to PySpark
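The Spark website describes PySpark as follows: "PySpark is the Python API for Spark". In other words, PySpark is the programming interface that Spark provides for Python. Spark also provides programming interfaces for Scala, Java and R. The R interface (SparkR) is covered in Chapter 12.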
11.2 Introduction to Decision Trees
11.3.1 A First Look at the Raw Dataset
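The data comes from a competition dataset. The task is to predict whether a recommended web page is ephemeral (popular only briefly) or evergreen (popular over a long period). The raw dataset is train.tsv, stored at /home/hadoop/data/train.tsv.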
1) View the first 2 lines of the file
    $ head -2 train.tsv
    "url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"
    "http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html" "4042" "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose Cali "8" .............. "0.152941176" "0.079129575" "0"
Count the lines in the file (the header line included):

    $ cat train.tsv | wc -l
    7396
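Each field is tab-separated and wrapped in double quotes. Quotes inside a field, such as the JSON in boilerplate, are escaped by doubling them (""). The following is a minimal sketch that assumes the whole file follows this CSV-style quoting. Python's standard csv module can split such a line and remove the quotes in one step. The sample line is hypothetical and shortened to four fields.

```python
import csv
import io

# Hypothetical, shortened line in the same format as train.tsv:
# tab-separated, every field quoted, inner quotes doubled ("").
sample = '"http://example.com/a"\t"4042"\t"{""title"":""IBM""}"\t"0"\n'

# csv.reader handles quoted fields and "" escapes once the delimiter is a tab
reader = csv.reader(io.StringIO(sample), delimiter="\t")
fields = next(reader)
print(fields)  # ['http://example.com/a', '4042', '{"title":"IBM"}', '0']
```

The chapter instead splits on '\t' and deletes every double quote. That also removes the quotes inside the JSON, which is harmless here because the boilerplate column is not used as a feature.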
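2) It is awkward to skip the header line when loading the file with textFile. To make the data easier for Spark to work with, delete the header first.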
    $ sed 1d train.tsv > train_noheader.tsv
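Using sed means keeping a second copy of the file. An alternative is to drop the header after loading. In PySpark this is commonly written as `header = raw.first()` followed by `raw.filter(lambda line: line != header)`. The plain-Python sketch below mimics that filter on a list of lines; the helper name drop_header and the sample lines are hypothetical.

```python
def drop_header(lines):
    """Return every line that differs from the first (header) line.

    Mirrors the RDD idiom: header = rdd.first(); rdd.filter(lambda l: l != header)
    """
    if not lines:
        return []
    header = lines[0]
    return [line for line in lines if line != header]


lines = ['"url"\t"urlid"\t"label"', '"http://a"\t"1"\t"0"', '"http://b"\t"2"\t"1"']
body = drop_header(lines)
print(len(body))  # 2
```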
3) Upload the data file to HDFS
    $ hdfs dfs -put train_noheader.tsv /data
4) Check that the upload succeeded
    hadoop@master:~/data$ hdfs dfs -ls /data
    17/05/24 00:46:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 1 items
    -rw-r--r-- 1 hadoop supergroup 21972457 2017-05-24 00:46 /data/train_noheader.tsv
11.3.2 Starting PySpark
Start PySpark against a Spark cluster running in Standalone mode, and make sure enough memory is allocated.
    $ pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 4
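[Note] Run pyspark --help for detailed help on the command. The interactive sessions in this chapter use IPython as the driver shell, which is why the prompts look like In [n]:. The bin/pyspark launcher script picks the driver interpreter with the lines below, and here the fallback value has been set to ipython. Setting the environment variable PYSPARK_DRIVER_PYTHON=ipython before running pyspark has the same effect without editing the script.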
    # Default to standard python interpreter unless told otherwise
    if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
      PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"ipython"}"
    fi
11.3.3 Basic Functions
Table 11-4 Some functions and methods used in this chapter
1) Load the data file from HDFS with the textFile method of the sc object, creating an RDD
    In [1]: raw_data = sc.textFile("hdfs://master:9000/data/train_noheader.tsv")
2) View the first 2 lines of data
    In [2]: raw_data.take(2)
    Out[2]:
    [u'"http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html"\t"4042"\t"{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose Cali..."\t...\t"8"\t"0.152941176"\t"0.079129575"\t"0"',
     u'"http://www.popsci.com/technology/article/2012-07/electronic-futuristic-starting-gun-eliminates-advantages-races"\t"8471"\t"{""title"":""The Fully Electronic Futuristic Starting Gun That Eliminates Advantages in Races the fully electronic, futuristic starting gun that eliminates advantages in races the fully electronic, futuristic starting gun that eliminates advantages in races"",""body"":""And that can be carried on a plane without the hassle too The Omega..."\t...\t"9"\t"0.181818182"\t"0.125448029"\t"1"']
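3) Count the total number of lines in the data file. The count is 7395, one less than before, because the header was removed.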
    In [3]: numRaws = raw_data.count()
    In [4]: numRaws
    Out[4]: 7395
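4) Count by key. Each element of raw_data is a string rather than a (key, value) pair, so countByKey takes the first character of each line as its key. Every line begins with a double quote, so all 7395 lines fall under the key '"'.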
    In [5]: raw_data.countByKey()
    Out[5]: defaultdict(int, {u'"': 7395})
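PySpark's RDD.countByKey is implemented as `map(lambda x: x[0]).countByValue()`, so it counts the first element of each record. For a string, the first element is its first character. The sketch below reproduces that behaviour with collections.Counter on two hypothetical lines.

```python
from collections import Counter

# Two hypothetical raw lines; like train_noheader.tsv, each starts with a double quote
raw_lines = ['"http://a"\t"1"\t"0"', '"http://b"\t"2"\t"1"']

# countByKey() uses x[0] as the key; for a string x that is its first character
counts = Counter(line[0] for line in raw_lines)
print(dict(counts))  # {'"': 2}
```

To count something meaningful, first map each record to a real (key, value) pair, for example (category, 1).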
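1) The algorithm does not use the URL, the urlid, or the page content (boilerplate) columns, so these will be dropped. Start by splitting each line into fields on the tab character.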
    In [6]: records = raw_data.map(lambda line: line.split('\t'))
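2) Inspect the structure of records. Each element is now a list of fields, and each field is still wrapped in double quotes.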
    In [7]: records.first()
    Out[7]: [u'"http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html"', u'"4042"', u'"{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar...""}"', u'"business"', u'"0.789131"', u'"2.055555556"', u'"0.676470588"', u'"0.205882353"', u'"0.047058824"', u'"0.023529412"', u'"0.443783175"', u'"0"', u'"0"', u'"0.09077381"', u'"0"', u'"0.245831182"', u'"0.003883495"', u'"1"', u'"1"', u'"24"', u'"0"', u'"5424"', u'"170"', u'"8"', u'"0.152941176"', u'"0.079129575"', u'"0"']
3) Check the number of columns in each line
    In [8]: len(records.first())
    Out[8]: 27
Import Vectors, which is used to build the feature vectors, and the decision tree classifier:

    In [9]: from pyspark.ml.linalg import Vectors
    In [10]: from pyspark.ml.classification import DecisionTreeClassifier
4) Collect all elements of the RDD to the driver as a Python list. This works here because the dataset is small.
    In [11]: data = records.collect()
5) Check how many columns a row of data has
    In [12]: numColumns = len(data[0])
    In [13]: numColumns
    Out[13]: 27
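6) Define a list data1 to hold the cleaned data, in the format [(label_1, features_1), (label_2, features_2), …]. The label is the last column. The features are the numeric columns from index 4 up to, but not including, the label column.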
    In [14]: data1 = []
    In [15]: for i in range(numRaws):
        ...:     # Strip the double quotes around every field
        ...:     trimmed = [each.replace('"', "") for each in data[i]]
        ...:     label = int(trimmed[-1])
        ...:     # Numeric columns 4 .. numColumns-2; "?" marks a missing value
        ...:     features = [0.0 if x == "?" else float(x) for x in trimmed[4:numColumns - 1]]
        ...:     data1.append((label, Vectors.dense(features)))
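The loop body above can be read as a function that parses one row. The sketch below is a plain-Python version under stated assumptions: the function name parse_record and the short sample row are hypothetical, and a plain list stands in for Vectors.dense. It strips the quotes, takes the last field as the label, keeps the numeric columns from index 4 up to the label, and replaces the missing-value marker "?" with 0.0.

```python
def parse_record(fields):
    """Turn one split line of train.tsv into (label, features).

    fields: list of quoted strings, as produced by line.split('\t').
    """
    trimmed = [f.replace('"', "") for f in fields]
    label = int(trimmed[-1])
    # Columns 0-3 (url, urlid, boilerplate, alchemy_category) are skipped,
    # the last column is the label, and "?" marks a missing value.
    features = [0.0 if x == "?" else float(x) for x in trimmed[4:-1]]
    return label, features


row = ['"http://a"', '"1"', '"{}"', '"business"', '"0.5"', '"?"', '"24"', '"1"']
print(parse_record(row))  # (1, [0.5, 0.0, 24.0])
```

Because this is a pure function of one row, something like it could be applied with records.map(...) so that parsing runs on the executors rather than on the driver after collect().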
1) Convert data1 into a DataFrame. The label column holds the labels and the features column holds the feature vectors.
    In [16]: df = spark.createDataFrame(data1, ["label", "features"])
    In [17]: df.show(10)
    +-----+--------------------+
    |label|            features|
    +-----+--------------------+
    |    0|[0.789131,2.05555...|
    |    1|[0.574147,3.67796...|
    |    1|[0.996526,2.38288...|
    |    1|[0.801248,1.54310...|
    |    0|[0.719157,2.67647...|
    |    0|[0.0,119.0,0.7454...|
    |    1|[0.22111,0.773809...|
    |    0|[0.0,1.883333333,...|
    |    1|[0.0,0.471502591,...|
    |    1|[0.0,2.41011236,0...|
    +-----+--------------------+
    only showing top 10 rows
    # Show the schema of df
    In [18]: df.printSchema()
    root
     |-- label: long (nullable = true)
     |-- features: vector (nullable = true)
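2) df will be used repeatedly, so cache it in memory. cache() is lazy: the data is actually stored the first time an action computes df.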
    In [19]: df.cache()
    Out[19]: DataFrame[label: double, features: vector]
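3) Build the feature index. VectorIndexer decides which features are categorical. Every feature with at most maxCategories (here 24) distinct values is treated as categorical, and its values are re-encoded as category indices. Features with more distinct values are treated as continuous.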
    In [20]: from pyspark.ml.feature import VectorIndexer
    In [20]: featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(df)
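The rule that VectorIndexer applies can be sketched in plain Python. Count the distinct values of each feature. A feature with at most maxCategories distinct values is treated as categorical, and its values are replaced by indices. This is an illustration only: the helper names are hypothetical, and the sketch assigns indices in sorted value order, which may not match Spark's exact index assignment. The toy data is hypothetical.

```python
def fit_vector_indexer(rows, max_categories):
    """Return {feature_index: {value: category_index}} for the categorical features."""
    category_maps = {}
    for j in range(len(rows[0])):
        distinct = sorted({row[j] for row in rows})
        if len(distinct) <= max_categories:
            category_maps[j] = {v: i for i, v in enumerate(distinct)}
    return category_maps


def transform(row, category_maps):
    # Categorical features are replaced by their index; continuous ones are kept
    return [float(category_maps[j][v]) if j in category_maps else v
            for j, v in enumerate(row)]


rows = [[0.5, 1.0, 24.0], [0.7, 0.0, 24.0], [0.9, 1.0, 8.0]]
maps = fit_vector_indexer(rows, max_categories=2)
print(maps)                      # {1: {0.0: 0, 1.0: 1}, 2: {8.0: 0, 24.0: 1}}
print(transform(rows[0], maps))  # [0.5, 1.0, 1.0]
```

With maxCategories=24, any feature in this dataset with at most 24 distinct values, such as the 0/1 flags, is treated as categorical.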
4) Split the data into an 80% training set and a 20% test set
    # seed=1234 fixes the random seed, so every run produces the same split
    # (the split is random, so the sizes are only approximately 80% and 20%)
    In [21]: (trainingData, testData) = df.randomSplit([0.8, 0.2], seed=1234)
    In [22]: trainingData.count()
    Out[22]: 5912
    In [23]: testData.count()
    Out[23]: 1483
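randomSplit does not cut the data at exactly 80% and 20%. Each row is assigned at random in proportion to the weights, which is why the sizes above are 5912 and 1483 rather than an exact 80/20 split of 7395. The seed only makes that assignment repeatable. The plain-Python sketch below illustrates the idea. It is not Spark's actual sampler, which works partition by partition, and the function name random_split is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches x values left over by rounding error
            if x < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(7395))
train, test = random_split(rows, [0.8, 0.2], seed=1234)
print(len(train), len(test))  # close to, but usually not exactly, 5916 and 1479
```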
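5) Create the decision tree model. Set its maximum depth, label column and feature column, and use entropy as the impurity measure for choosing splits.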
    In [24]: dt = DecisionTreeClassifier(maxDepth=5, labelCol="label", featuresCol="indexedFeatures", impurity="entropy")
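With impurity="entropy", a candidate split is scored by its information gain. This is the drop in entropy H = -Σ p_k log2 p_k from the parent node to the size-weighted average of the child nodes. The sketch below computes both quantities in plain Python. The helper names and toy labels are hypothetical.

```python
import math
from collections import Counter


def entropy(labels):
    """Shannon entropy (base 2) of a list of class labels."""
    n = len(labels)
    return -sum((c / n) * math.log2(c / n) for c in Counter(labels).values())


def information_gain(parent, left, right):
    """Entropy reduction obtained by splitting parent into left and right."""
    n = len(parent)
    weighted = len(left) / n * entropy(left) + len(right) / n * entropy(right)
    return entropy(parent) - weighted


parent = [0, 0, 1, 1]
print(entropy(parent))                           # 1.0
print(information_gain(parent, [0, 0], [1, 1]))  # 1.0 (a perfect split)
print(information_gain(parent, [0, 1], [0, 1]))  # 0.0 (a useless split)
```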
6) Build a Pipeline workflow and train the model
    In [25]: from pyspark.ml import Pipeline
    In [26]: pipeline = Pipeline(stages=[featureIndexer, dt])
    In [27]: model = pipeline.fit(trainingData)  # train the model
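Pipeline.fit runs its stages in order. A stage that is already a transformer, such as the fitted featureIndexer here, is used as-is. An estimator stage, such as dt, is fitted on the data produced by the earlier stages. The resulting models are chained so that transform applies them in sequence. The toy sketch below shows only that control flow. All class names are hypothetical and the "data" is a plain list of numbers.

```python
class ToyPipeline:
    """Toy stand-in for pyspark.ml.Pipeline (control flow only)."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted; transformers (objects without fit) are used as-is
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                # Later stages are fitted on the output of the earlier ones
                data = model.transform(data)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, data):
        for model in self.models:
            data = model.transform(data)
        return data


class FnTransformer:
    """A transformer wrapping a plain function on a list of numbers."""

    def __init__(self, fn):
        self.fn = fn

    def transform(self, data):
        return self.fn(data)


class CenterEstimator:
    """Hypothetical estimator: learns the mean, then subtracts it."""

    def fit(self, data):
        mean = sum(data) / len(data)
        return FnTransformer(lambda xs: [x - mean for x in xs])


halve = FnTransformer(lambda xs: [x / 2 for x in xs])  # already fitted, like featureIndexer
model = ToyPipeline([halve, CenterEstimator()]).fit([2.0, 4.0])
print(model.transform([4.0, 8.0]))  # [0.5, 2.5]
```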
1) Predict on the first row of data and check whether the prediction matches its label. First, look at the first row of the prepared data:
    In [28]: data1[0]
    Out[28]: (0.0, DenseVector([0.7891, 2.0556, 0.6765, 0.2059, 0.0471, 0.0235, 0.4438, 0.0, 0.0, 0.0908, 0.0, 0.2458, 0.0039, 1.0, 1.0, 24.0, 0.0, 5424.0, 170.0, 8.0, 0.1529, 0.0791]))
2) Predict using the feature values of the first row
    In [29]: test0 = spark.createDataFrame([(data1[0][1],)], ["features"])
    In [30]: result = model.transform(test0)
    # Show the prediction
    In [31]: result.show()
    +--------------------+--------------------+-------------+--------------------+----------+
    |            features|     indexedFeatures|rawPrediction|         probability|prediction|
    +--------------------+--------------------+-------------+--------------------+----------+
    |[0.789131,2.05555...|[0.789131,2.05555...|[274.0,310.0]|[0.46917808219178...|       1.0|
    +--------------------+--------------------+-------------+--------------------+----------+
    In [32]: result.select(['prediction']).show()  # only the predicted value
    +----------+
    |prediction|
    +----------+
    |       1.0|
    +----------+
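For a decision tree, rawPrediction holds the class counts of the training rows in the leaf that the sample reaches. probability is those counts normalised to sum to 1, and prediction is the class with the largest value. The arithmetic for the row above, written out in plain Python:

```python
# rawPrediction reported for the first row: class counts in its leaf
raw_prediction = [274.0, 310.0]

total = sum(raw_prediction)
probability = [c / total for c in raw_prediction]
# prediction = index of the largest probability
prediction = float(max(range(len(probability)), key=probability.__getitem__))

print(probability[0])  # about 0.46918, matching the probability column above
print(prediction)      # 1.0
```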
3) Change two of the first row's feature values (here the first and the second), then predict with the modified features.
    # Modify the feature values of the first row
    In [33]: firstRaw = list(data1[0][1])
    In [34]: firstRaw[0] = 2.7891
    In [35]: firstRaw[1] = 0.0556
    In [36]: predictedData = Vectors.dense(firstRaw)
    In [37]: predictedData
    Out[37]: DenseVector([2.7891, 0.0556, 0.6765, 0.2059, 0.0471, 0.0235, 0.4438, 0.0, 0.0, 0.0908, 0.0, 0.2458, 0.0039, 1.0, 1.0, 24.0, 0.0, 5424.0, 170.0, 8.0, 0.1529, 0.0791])
4) Predict on the modified data
    In [38]: predictedRaw = spark.createDataFrame([(predictedData,)], ["features"])
    In [39]: predictedResult = model.transform(predictedRaw)
    In [40]: predictedResult.show()
    +--------------------+--------------------+-------------+--------------------+----------+
    |            features|     indexedFeatures|rawPrediction|         probability|prediction|
    +--------------------+--------------------+-------------+--------------------+----------+
    |[2.7891,0.0556,0....|[2.7891,0.0556,0....|[274.0,310.0]|[0.46917808219178...|       1.0|
    +--------------------+--------------------+-------------+--------------------+----------+
    In [41]: predictedResult.select(['prediction']).show()
    +----------+
    |prediction|
    +----------+
    |       1.0|
    +----------+
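Both predictions are 1.0, while the true label of the first row is 0, so this row is misclassified. The rawPrediction counts are identical before and after the change. This suggests that the modified sample still reaches the same leaf, because the splits on its path do not separate the old and new values of the first two features.

5) Next, use the test set to measure the accuracy of the decision tree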
    # Use the model to predict the test set
    In [42]: predictedResultAll = model.transform(testData)
    # Look at the predictions
    In [43]: predictedResultAll.select("prediction").show()
    +----------+
    |prediction|
    +----------+
    |       0.0|
    |       0.0|
    |       1.0|
    |       1.0|
    |       0.0|
    |       0.0|
    |       0.0|
    |       0.0|
    |       0.0|
    |       1.0|
    +----------+
    only showing top 10 rows
    # The predictions are a DataFrame whose rows are immutable Row objects,
    # so convert them to pandas and then to a Python list
    In [44]: df_prediction = predictedResultAll.select("prediction").toPandas()
    In [45]: dtPredictions = list(df_prediction.prediction)
    # Look at the first 10 predictions
    In [46]: dtPredictions[:10]
    Out[46]: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0]
    # Count the correct predictions
    In [47]: dtTotalCorrect = 0
    # Number of rows in the test set
    In [48]: testRaw = testData.count()
    # collect() returns Row objects; extract the plain label values for comparison
    In [49]: testLabel = [row.label for row in testData.select("label").collect()]
    In [50]: for i in range(testRaw):
        ...:     if dtPredictions[i] == testLabel[i]:
        ...:         dtTotalCorrect += 1
    In [51]: dtTotalCorrect
    Out[51]: 940
    In [52]: 1.0 * dtTotalCorrect / testRaw
    Out[52]: 0.6338503034389751
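The counting loop assumes that the predictions and the labels are in the same order. The same accuracy computation can be written more compactly by pairing the two lists with zip. The following is a plain-Python sketch; the helper name accuracy and the short lists are hypothetical. On the Spark side, pyspark.ml.evaluation.MulticlassClassificationEvaluator with metricName="accuracy" computes the same quantity directly from the label and prediction columns of the transformed DataFrame, without collecting both columns to the driver.

```python
def accuracy(predictions, labels):
    """Fraction of positions where the prediction equals the label."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


print(accuracy([0.0, 1.0, 1.0, 0.0], [0, 1, 0, 0]))  # 0.75
```

With the counts above, 940 correct out of 1483 gives the reported accuracy of about 0.634.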
1) First, re-run the code from the earlier steps.
    In [1]: from pyspark.ml.linalg import Vectors
    In [2]: from pyspark.ml.classification import DecisionTreeClassifier
    In [3]: from pyspark.ml.feature import VectorIndexer
    In [4]: from pyspark.ml import Pipeline
    In [5]: raw_data = sc.textFile("hdfs://master:9000/data/train_noheader.tsv")
    In [6]: numRaws = raw_data.count()
    In [7]: records = raw_data.map(lambda line: line.split('\t'))
    In [8]: data = records.collect()
    In [9]: numColumns = len(data[0])
    In [10]: data1 = []
2) The page-category column takes many different values, so it is extracted and handled separately.
    # Strip the double quotes from the page-category column (index 3, i.e. the 4th column)
    In [11]: category = records.map(lambda x: x[3].replace("\"", ""))
    In [12]: categories = sorted(category.distinct().collect())
    In [13]: categories
    Out[13]:
    [u'?',
     u'arts_entertainment',
     u'business',
     u'computer_internet',
     u'culture_politics',
     u'gaming',
     u'health',
     u'law_crime',
     u'recreation',
     u'religion',
     u'science_technology',
     u'sports',
     u'unknown',
     u'weather']
3) Count the page categories.
    In [14]: numCategories = len(categories)
    In [15]: numCategories
    Out[15]: 14
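4) Next, define a function that encodes a page category as a one-hot list. The list has numCategories zeros, with a 1 at the position of that category in categories.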
    In [16]: def transform_category(x):
        ...:     markCategory = [0] * numCategories
        ...:     index = categories.index(x)  # position of x in the sorted category list
        ...:     markCategory[index] = 1
        ...:     return markCategory
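transform_category calls categories.index(x), which scans the list on every call. The sketch below does the same one-hot encoding but builds a value-to-position dictionary once. The helper name make_one_hot is hypothetical, and the category list is shortened. As in the chapter, the missing-value marker "?" is simply treated as one more category.

```python
def make_one_hot(categories):
    """Return a function that maps a category to its one-hot list."""
    position = {c: i for i, c in enumerate(categories)}

    def one_hot(x):
        vec = [0] * len(categories)
        vec[position[x]] = 1  # KeyError for an unseen category
        return vec

    return one_hot


categories = sorted(["business", "?", "sports", "unknown"])
one_hot = make_one_hot(categories)
print(categories)           # ['?', 'business', 'sports', 'unknown']
print(one_hot("business"))  # [0, 1, 0, 0]
```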
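5) With this encoding, the single page-category feature becomes 14 binary features. The category column was not used before, so the total number of features grows by 14, from 22 to 36. Add these features when building the feature vectors: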
    In [17]: for i in range(numRaws):
        ...:     trimmed = [each.replace('"', "") for each in data[i]]
        ...:     label = float(trimmed[-1])
        ...:     cate = transform_category(trimmed[3])  # one-hot list for the page category
        ...:     numeric = [0.0 if x == "?" else float(x) for x in trimmed[4:numColumns - 1]]
        ...:     data1.append((label, Vectors.dense(cate + numeric)))
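A quick arithmetic check of the feature count: the 14-element one-hot block plus the numeric columns 4 to 25 of the 27 columns. The result agrees with the numFeatures value reported for the cross-validated model later in the chapter.

```python
num_columns = 27     # fields per line of train.tsv
num_categories = 14  # distinct alchemy_category values, including "?"

# Numeric columns: indices 4 .. num_columns-2 (the last column is the label)
numeric_indices = list(range(4, num_columns - 1))
num_features = num_categories + len(numeric_indices)
print(len(numeric_indices), num_features)  # 22 36
```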
6) Create the DataFrame.
    In [18]: df = spark.createDataFrame(data1, ["label", "features"])
    # df will be used repeatedly, so cache it in memory
    In [19]: df.cache()
    Out[19]: DataFrame[label: double, features: vector]
7) Build the feature index.
    In [21]: from pyspark.ml.feature import VectorIndexer
    In [22]: featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(df)
8) Split the data into an 80% training set and a 20% test set.
    In [23]: (trainingData, testData) = df.randomSplit([0.8, 0.2], seed=1234)
    In [24]: trainingData.count()
    Out[24]: 5912
    In [25]: testData.count()
    Out[25]: 1483
9) Create the decision tree model.
    In [26]: dt = DecisionTreeClassifier(maxDepth=5, labelCol="label", featuresCol="indexedFeatures", impurity="entropy")
10) Build the Pipeline workflow and train the model.
    In [27]: pipeline = Pipeline(stages=[featureIndexer, dt])
    In [28]: model = pipeline.fit(trainingData)  # train the model

11) Use the test data to measure the accuracy of the decision tree again.

    In [29]: predictedResultAll = model.transform(testData)
    In [30]: df_prediction = predictedResultAll.select("prediction").toPandas()
    In [31]: dtPredictions = list(df_prediction.prediction)
    # Count the correct predictions
    In [32]: dtTotalCorrect = 0
    # Number of rows in the test set
    In [33]: testRaw = testData.count()
    # Plain label values (collect() returns Row objects)
    In [49]: testLabel = [row.label for row in testData.select("label").collect()]
    In [34]: for i in range(testRaw):
        ...:     if dtPredictions[i] == testLabel[i]:
        ...:         dtTotalCorrect += 1
    In [35]: dtTotalCorrect
    Out[35]: 967
    In [36]: 1.0 * dtTotalCorrect / testRaw
    Out[36]: 0.6520566419420094
The following session repeats the data preparation from the previous steps, then sets up a cross-validated search over the tree depth:

    In [1]: from pyspark.ml.linalg import Vectors
    In [2]: from pyspark.ml.classification import DecisionTreeClassifier
    In [3]: from pyspark.ml.feature import VectorIndexer
    In [4]: from pyspark.ml import Pipeline
    In [5]: raw_data = sc.textFile("hdfs://master:9000/data/train_noheader.tsv")
    In [6]: numRaws = raw_data.count()
    In [7]: records = raw_data.map(lambda line: line.split('\t'))
    In [8]: data = records.collect()
    In [9]: numColumns = len(data[0])
    In [10]: data1 = []
    In [11]: category = records.map(lambda x: x[3].replace("\"", ""))
    In [12]: categories = sorted(category.distinct().collect())
    In [13]: numCategories = len(categories)
    In [14]: def transform_category(x):
        ...:     markCategory = [0] * numCategories
        ...:     index = categories.index(x)
        ...:     markCategory[index] = 1
        ...:     return markCategory
    In [15]: for i in range(numRaws):
        ...:     trimmed = [each.replace('"', "") for each in data[i]]
        ...:     label = float(trimmed[-1])
        ...:     cate = transform_category(trimmed[3])  # one-hot list for the page category
        ...:     numeric = [0.0 if x == "?" else float(x) for x in trimmed[4:numColumns - 1]]
        ...:     data1.append((label, Vectors.dense(cate + numeric)))
    In [16]: df = spark.createDataFrame(data1, ["label", "features"])
    In [17]: df.cache()
    In [18]: featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(df)
    In [19]: dt = DecisionTreeClassifier(maxDepth=5, labelCol="label", featuresCol="indexedFeatures", impurity="entropy")
    In [20]: pipeline = Pipeline(stages=[featureIndexer, dt])
    In [21]: (trainingData, testData) = df.randomSplit([0.8, 0.2], seed=1234)
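    # Import the cross-validator and the parameter-grid builder
    In [22]: from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    # Import the binary-classification evaluator
    In [23]: from pyspark.ml.evaluation import BinaryClassificationEvaluator
    # Create an evaluator (its default metric is areaUnderROC)
    In [24]: evaluator = BinaryClassificationEvaluator()
    # Parameter grid: try maximum depths of 4, 5 and 6
    In [25]: paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [4, 5, 6]).build()
    # Cross-validation settings: the pipeline, the evaluator, the grid and 2 folds
    In [26]: cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(2)
    # Train with cross-validation
    In [27]: cvModel = cv.fit(trainingData)
    # Predict the test set with the best model found
    In [28]: Predictions = cvModel.transform(testData)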
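CrossValidator splits the training data into numFolds folds. For each parameter combination in the grid, it trains on all folds but one, evaluates on the held-out fold, and averages the metric over the folds. It then refits the best combination on all of the training data. The plain-Python sketch below follows that procedure under stated assumptions. All function names are hypothetical. A toy threshold "model" on one-dimensional data stands in for the decision-tree pipeline, and accuracy stands in for the evaluator's metric. The sketch also uses balanced, shuffled folds, whereas Spark's fold assignment differs in detail.

```python
import random


def k_fold_indices(n, k, seed):
    """Assign each of n rows to one of k folds of (nearly) equal size, at random."""
    order = list(range(n))
    random.Random(seed).shuffle(order)
    folds = [0] * n
    for pos, row in enumerate(order):
        folds[row] = pos % k
    return folds


def cross_validate(xs, ys, param_grid, fit, score, k=2, seed=0):
    """Return (best_param, model refit on all data, average score per param)."""
    folds = k_fold_indices(len(xs), k, seed)
    avg_scores = {}
    for param in param_grid:
        scores = []
        for f in range(k):
            train = [(x, y) for x, y, g in zip(xs, ys, folds) if g != f]
            valid = [(x, y) for x, y, g in zip(xs, ys, folds) if g == f]
            scores.append(score(fit(train, param), valid))
        avg_scores[param] = sum(scores) / len(scores)
    best = max(param_grid, key=lambda p: avg_scores[p])
    # Refit the best parameter on all of the training data
    return best, fit(list(zip(xs, ys)), best), avg_scores


def fit_threshold(train, threshold):
    # Toy model: the parameter alone defines the classifier; training data is ignored
    return threshold


def accuracy_score(threshold, data):
    correct = sum(1 for x, y in data if (1 if x >= threshold else 0) == y)
    return correct / len(data)


xs = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
ys = [0, 0, 0, 0, 1, 1, 1, 1]
best, model, avg = cross_validate(xs, ys, [0.25, 0.5, 0.75],
                                  fit_threshold, accuracy_score, k=2, seed=1234)
print(best, avg)
```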
    In [29]: df_prediction = Predictions.select("prediction").toPandas()
    In [30]: dtPredictions = list(df_prediction.prediction)
    # Count the correct predictions
    In [31]: dtTotalCorrect = 0
    # Number of rows in the test set
    In [32]: testRaw = testData.count()
    # Plain label values (collect() returns Row objects)
    In [34]: testLabel = [row.label for row in testData.select("label").collect()]
    In [33]: for i in range(testRaw):
        ...:     if dtPredictions[i] == testLabel[i]:
        ...:         dtTotalCorrect += 1
    In [34]: dtTotalCorrect
    Out[34]: 960
    In [35]: 1.0 * dtTotalCorrect / testRaw
    Out[35]: 0.6473364801078895

We can also inspect the parameters of the best model:

    In [36]: bestmodel = cvModel.bestModel.stages[1]
    In [37]: bestmodel.numFeatures  # the tree uses 36 features
    Out[37]: 36
    In [38]: bestmodel.depth  # the selected depth is 6
    Out[38]: 6
    In [39]: bestmodel.numNodes  # the tree has 457 nodes
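BinaryClassificationEvaluator uses areaUnderROC by default, so the cross-validator chose maxDepth by AUC rather than by accuracy. This is one possible reason why the selected depth-6 model has slightly lower test accuracy (0.647) than the depth-5 model above (0.652). AUC is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting one half. Spark computes the score from the rawPrediction column. The brute-force, plain-Python sketch below implements that definition; the helper name auc and the toy scores are hypothetical.

```python
def auc(scores, labels):
    """Area under the ROC curve via pairwise comparison (ties count 0.5)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(auc([0.9, 0.8, 0.3, 0.1], [1, 1, 0, 0]))  # 1.0
print(auc([0.9, 0.3, 0.8, 0.1], [1, 1, 0, 0]))  # 0.75
```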
11.9.1 Adding Configuration to the Script
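The example program for this chapter is saved as /home/hadoop/projects/spark/pyspark/decisionTree.py. When it runs as a standalone script rather than inside the pyspark shell, sc and spark are not created automatically, so the script must create them itself.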
    from pyspark.sql import SparkSession

    # Run Spark locally, using all available cores
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("DecisionTree") \
        .getOrCreate()

    # The RDD code in this chapter uses sc; take it from the session
    sc = spark.sparkContext
Before Spark 2.0, a script could be run with pyspark decisionTree.py. Since Spark 2.0, Python scripts are run with spark-submit decisionTree.py. Run spark-submit --help for help on its options.
    $ spark-submit decisionTree.py
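This chapter introduced PySpark and showed how to use it. It applied the decision tree, one of the most common classification models, with PySpark. A worked example showed how to clean and transform the data and analysed why the accuracy of the classification model still needs improvement. Visualization was then used to compare the decision tree's accuracy at different depths.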