作者归档:feiguyun

第14章TensorFlowOnSpark详解

前面我们介绍了Spark MLlib的多种机器学习算法,如分类、回归、聚类、推荐等,Spark目前还缺乏对神经网络、深度学习的足够支持,但近几年市场对神经网络,尤其对深度学习热情高涨,成了当下很多企业的研究热点,缺失神经网络的支持,这或许也算是Spark MLlib尚欠不足之处吧。
不过好消息是TensorFlow这个深度学习框架,已经有了Spark接口,即TensorFlowOnSpark。TensorFlow是目前很热门的深度学习框架,是Google于2015年11月9日开源的第二代深度学习系统,也是AlphaGo的基础程序。
本章我们将介绍深度学习最好框架TensorFlow及TensorFlowOnSpark,具体包括:
TensorFlow简介
TensorFlow实现卷积神经网络
分布式TensorFlow
TensorFlowOnSpark架构
TensorFlowOnSpark实例

14.1TensorFlow简介

14.1.1TensorFlow的安装

安装TensorFlow,因本环境的python2.7采用anaconda来安装,故这里采用conda管理工具来安装TensorFlow,目前conda缺省安装版本为TensorFlow 1.1。

验证安装是否成功,可以通过导入tensorflow来检验。
启动ipython(或python)

14.1.2TensorFlow的发展

2015年11月9日谷歌开源了人工智能系统TensorFlow,同时成为2015年最受关注的开源项目之一。TensorFlow的开源大大降低了深度学习在各个行业中的应用难度。TensorFlow的近期里程碑事件主要有:
2016年04月:发布了分布式TensorFlow的0.8版本,把DeepMind模型迁移到TensorFlow;
2016年06月:TensorFlow v0.9发布,改进了移动设备的支持;
2016年11月:TensorFlow开源一周年;
2017年2月:TensorFlow v1.0发布,增加了Java、Go的API,以及专用的编译器和调试工具,同时TensorFlow 1.0引入了一个高级API,包含tf.layers,tf.metrics和tf.losses模块。还宣布增了一个新的tf.keras模块,它与另一个流行的高级神经网络库Keras完全兼容。
2017年4月:TensorFlow v1.1发布,为 Windows 添加 Java API 支,添加 tf.spectral 模块, Keras 2 API等;
2017年6月:TensorFlow v1.2发布,包括 API 的重要变化、contrib API的变化和Bug 修复及其他改变等。

14.1.3TensorFlow的特点

14.1.4TensorFlow编程模型

TensorFlow如何工作?我们通过一个简单的实例进行说明,为计算x+y,你需要创建下图(图14-1)这张数据流图:

图14-1计算x+y的数据流图

以下构成上数据流图(图14-1)的详细步骤:
1)定义x= [1,3,5],y =[2,4,7],这个图和tf.Tensor一起工作来代表数据的单位,你需要创建恒定的张量:

2)定义操作

3)张量和操作都有了,接下来就是创建图

注意:这一步不是必须的,在创建回话时,系统将自动创建一个默认图。
4)为了运行这图你将需要创建一个回话(tf.Session),一个tf.Session对象封装了操作对象执行的环境,为了做到这一点,我们需要定义在会话中将要用到哪一张图:

5)想要执行这个操作,要用到tf.Session.run()这个方法:

6)运行结果:
[ 3 7 12]

14.1.5TensorFlow常用函数

14.1.6TensorFlow的运行原理

TensorFlow有一个重要组件client,即客户端,此外,还有master、worker,这些有点类似Spark的结构。它通过Session的接口与master及多个worker相连,其中每一个worker可以与多个硬件设备(device)相连,比如CPU或GPU,并负责管理这些硬件。而master则负责管理所有worker按流程执行计算图。

14.2TensorFlow实现卷积神经网络

神经网络可为机器学习中最活跃的领域之一,尤其代表深度学习的卷积神经(Convolutional Neural Network,CNN)、循环神经网络(Recurrent Neural Network,RNN)更是炙手可热。

14.2.1卷积神经网络简介

卷积神经网络是人工神经网络的一种,已成为图像识别、视频处理、语音分析等领域的研究热点。它的权值共享网络结构使之更类似于生物神经网络,减少了权值的数量,降低了网络模型的复杂度,防止因参数太多导致过拟合。

14.2.3卷积神经网络的网络结构

接下来,我们利用训练集训练卷积神经网络模型,然后在测试集上验证该模型。
搭建的卷积神经网络使用的一些参数是:
卷积层1:kernel_size [5, 5], stride=1,32个卷积窗口
池化层1: pool_size [2, 2], stride = 2
卷积层2:kernel_size [5, 5], stride=1,64个卷积窗口
池化层2: pool_size [2, 2], stride = 2
全连接层: 1024个特征,使用dropout减少过拟合
输出层: 使用softmax进行分类

14.2.4.1 导入数据

首先启动ipython,进入交互计算环境,当然直接启动python也可,然后通过TensorFlow自带的函数读取图片数据。

如果无法直接通过input_data下载,可以先把MNIST数据下载,然后,修改
python/learn/datasets/mnist.py文件中read_data_sets函数中4个local_file的值
具体如下,注释原来的local_file,新增4行local_file

更加数据实际存放路径,修改read_data_sets中读取文件路径。

14.2.4.2 权重初始化

# 正态分布,标准差为0.1,默认最大为1,最小为-1,均值为0

14.2.4.3 构建卷积神经网络结构

14.2.4.4 训练评估模型

这里只迭代了2000次,运行结果;如果迭代20000次,在测试集上的精度可到99.2%左右
## -- End pasted text --
step 0, training accuracy 0.14
step 100, training accuracy 0.86
step 200, training accuracy 0.94
step 300, training accuracy 0.94
step 400, training accuracy 0.94
step 500, training accuracy 0.98
step 600, training accuracy 0.94
step 700, training accuracy 0.94
step 800, training accuracy 0.98
step 900, training accuracy 0.98
step 1000, training accuracy 1
step 1100, training accuracy 0.94
step 1200, training accuracy 0.98
step 1300, training accuracy 0.96
step 1400, training accuracy 0.92
step 1500, training accuracy 0.96
step 1600, training accuracy 0.96
step 1700, training accuracy 1
step 1800, training accuracy 1
step 1900, training accuracy 0.96

在测试集上,测试模型精度

14.3TensorFlow实现循环神经网络

14.3.1循环神经网络简介

在传统的神经网络模型中,是从输入层到隐含层再到输出层,层与层之间是全连接的,
每层之间的节点是无连接的。但是这种普通的神经网络对于很多问题却无能无力。

14.3.2LSTM循环神经网络简介

LSTM是一种特殊的RNNs,可以很好地解决长时依赖问题。

14.3.4TensorFlow实现循环神经网络

前面我们用卷积神经网络,对MNIST中的手写数进行设别,如果迭代20000次,精度可达到99.2左右,这个精度应该比较高;如果我们用循环神经网络来识别,是否可行?如果可以,效果如何?
为了适合使用RNN来识别,每张图片大小为28x28像素,我们把每张图片的每一行(元素个数为28)作为输入数据n_inputs,把每一行(一张图片共28行)看成是与时间序列有关的步数n_steps,这样图片的所有信息都用上了,而且适合使用RNN的应用场景。
启动ipython,进入ipython的交互式界面,导入需要的库,并启动交互式会话。

加载数据,具体实现细节可参考14.2.4.1小节,这里就不详细说明了。

1. 构建模型
设置训练模型的超参数,学习速率,批量大小等。

设置循环神经网络的参数,包括输入数长度,输入的步数,隐藏节点数,类别数等。

定义输入数据及权重等

定义权重及初始化偏移量

定义并初始化Input gate、Forget gate、Output gate、Memory cell等的输入数据、权重、偏移量,这里采用tensorflow中truncated_normal函数初始化相关参数值。

创建循环神经网络结构

2. 定义损失函数及优化器

3. 训练数据及评估模型

运行结果,以下是最后批次的运行结果。
Iter 8000, Minibatch Loss= 0.085752, Training Accuracy= 0.97656
Iter 8100, Minibatch Loss= 0.065435, Training Accuracy= 0.96875
Iter 8200, Minibatch Loss= 0.088926, Training Accuracy= 0.97656
Iter 8300, Minibatch Loss= 0.039572, Training Accuracy= 1.00000
Iter 8400, Minibatch Loss= 0.050593, Training Accuracy= 0.98438
Iter 8500, Minibatch Loss= 0.030424, Training Accuracy= 0.99219
Iter 8600, Minibatch Loss= 0.026174, Training Accuracy= 0.99219
Iter 8700, Minibatch Loss= 0.045043, Training Accuracy= 0.98438
Iter 8800, Minibatch Loss= 0.031143, Training Accuracy= 0.98438
Iter 8900, Minibatch Loss= 0.055115, Training Accuracy= 0.99219
Iter 9000, Minibatch Loss= 0.061676, Training Accuracy= 0.98438
Iter 9100, Minibatch Loss= 0.123581, Training Accuracy= 0.97656
Iter 9200, Minibatch Loss= 0.057620, Training Accuracy= 0.98438
Iter 9300, Minibatch Loss= 0.043013, Training Accuracy= 0.99219
Iter 9400, Minibatch Loss= 0.067405, Training Accuracy= 0.98438
Iter 9500, Minibatch Loss= 0.020679, Training Accuracy= 1.00000
Iter 9600, Minibatch Loss= 0.079038, Training Accuracy= 0.98438
Iter 9700, Minibatch Loss= 0.080076, Training Accuracy= 0.97656
Iter 9800, Minibatch Loss= 0.010582, Training Accuracy= 1.00000
Iter 9900, Minibatch Loss= 0.019426, Training Accuracy= 1.00000
Optimization Finished!

在测试集上验证模型

运行结果如下,这个结果虽然比CNN结果低些,但也是不错的一个结果。
Testing Accuracy: 0.976562

14.4分布式TensorFlow

2016年4月14日,Google发布了分布式TensorFlow,能够支持在几百台机器上并行训练。分布式的TensorFlow由高性能的gRPC库作为底层技术支持。

14.4.1客户端、主节点和工作节点间的关系

14.4.2分布式模式

常用的深度学习训练模型为数据并行化,即TensorFlow任务采用相同的训练模型在不同的小批量数据集上进行训练,然后在参数服务器上更新模型的共享参数。TensorFlow支持同步训练和异步训练两种模型训练方式。

14.4.3在Pyspark集群环境运行TensorFlow

这节将通过神经网络来模拟一个一元二次方程:y=x^2-0.5,
TensorFlowOnSpark的详细配置,请参考14.4节。已集群方式启动pyspark:

进入pyspark的交换界面

构造一个神经网络

输出结果:
1.62758
0.00996406
0.00634915
0.00483868
0.0043179
0.00399014
0.00368176
0.00337165
0.00309145
0.00284696
0.00267657
0.00255845
0.0024702
0.00240239
0.00235583
0.00232014
0.00229183
0.00226797
0.00224843

14.5TensorFlowOnSpark架构

TensorFlowOnSpark(TFoS),支持 TensorFlow 在 Spark 和 Hadoop 上的分布式运行。

14.6TensorFlowOnSpark安装

安装TensorFlowOnSpark,采用pip管理工具进行安装,缺省安装是1.0版本。

执行以上命令后,在用户当前目录下,将新增一个TensorFlowOnSpark目录。
然后,在.bashrc定义该路径。

可以通过pyspark环境来验证,以上2个安装是否成功。

导入这些库,如果没有异常,说明安装成功。接下来开始为训练数据做一些准备工作。
对scripts目录进行打包,便于把该包发布到各worker上

14.7TensorFlowOnSpark实例

使用TensorFlowOnSpark对MNIST数据进行预测,MNIST是一个手写数字数据库,它有60000个训练样本集和10000个测试样本集,train-images-idx3-ubyte.gz、train-labels-idx1-ubyte.gz等四个文件。这些图像数据都保存在二进制文件中。每个样本图像的宽高为28*28。
下载MNIST数据

14.7.1TensorFlowOnSpark单机模式实例

设置本机相关参数,在单机上启动一个master节点,两个worker节点。

启动以后,通过jps可以看到如下一些进程。一个master,两个worker,namenode是之前启动hadoop的进程。

相关服务起来后,接下来把MNIST数据上传到HDFS上,并把数据转换cvs格式。

运行完成后,通过hadoop fs命令可以在HDFS上看到如下信息:

数据加载转换成功后,开始训练数据。

运行完成后,可以看到如下内容:

如果运行过程中,过程被卡,可以调整mnist_dist.py文件中两处(在115,125行)logdir=logdir改为logdir=None。
训练完成后,接下来就是用测试集验证模型,并对结果进行预测。

运行完成以后,在HDFS上,就可看到predictions目录及相关内容。

打开其中一个文件,可以看到预测结果信息。
2017-06-18T05:51:42.397905 Label: 5, Prediction: 5
2017-06-18T05:51:42.397923 Label: 9, Prediction: 8
2017-06-18T05:51:42.397941 Label: 7, Prediction: 5
2017-06-18T05:51:42.397958 Label: 3, Prediction: 5
2017-06-18T05:51:42.397976 Label: 4, Prediction: 8
2017-06-18T05:51:42.397993 Label: 9, Prediction: 8
2017-06-18T05:51:42.398012 Label: 6, Prediction: 5

14.7.2TensorFlowOnSpark集群模式实例

设置本机相关参数,在以集群方式启动spark,一个master节点,slave1、slave2作为
两个worker节点,各节点资源配置信息。
训练模型

测试模型

查看运行结果

运行时各节点报错信息,可以查看spark/work/app-20170620085449-0003/1下的

14.8小结

为了弥补Spark机器学习中,缺乏神经网络、深度学习等的不足,这章我们介绍脱胎于AlphaGo的深度学习框架TensorFlow,以基础知识为主,在这个基础上介绍了使用TensorFlow的几个实例,最后介绍TensorFlow的分布式架构及与Spark整合的架构TensorFlowOnSpark。

第13章 使用Spark Streaming构建在线学习模型

前面我们介绍的这些算法,一般基于一个或几个相对固定的文件,以这样的数据为模型处理的源数据是固定的,这样的数据或许很大,很多。训练或测试都是建立在这些固定数据之上,当然,测试时,可能取这个数据源之外的数据,如新数据或其他数据等。训练模型的数据一般是相对固定的,这样的机器学习的场景是很普遍的。
但实际环境中,还有其他一些场景,如源数据是经常变换,就像流水一样,时刻在变换着,如很多在线数据、很多日志数据等等。面对这些数据的学习我们该如何处理呢?
这个问题实际上属于流水计算问题,目前解决这类问题有Spark Streaming、Storm、Samza等。这章我们主要介绍Spark Streaming。
本章主要包括以下内容:
 介绍Spark Streaming主要内容
 Spark Streaming入门实例
 在线学习实例

13.1 Spark Streaming简介

Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。

13.1.1Spark Streaming常用术语

在简介Spark Streaming前,我们先简单介绍Streaming的一些常用术语。

13.1.2Spark Streaming处理流程

Spark Streaming处理的数据流图如图13-1所示。

图13-1 Spark Streaming计算过程

13.2 Dstream操作

RDD有很多操作和转换,与RDD类似,DStream也提供了自己的一系列操作方法,本节主要介绍如何操作DStream,包括输入、转换、修改状态及输出等。

13.2.1 Dstream输入

在Spark Streaming中所有的操作都是基于流的,而输入源是一切操作的起点。
Spark Streaming 提供两种类型的流式输入数据源:
 基础输入源:能直接应用于StreamingContext API输入源。例如:文件系统、Socket(套接字)连接和 Akka actors;
 高级输入源:能应用于特定工具类的输入源,如 Kafka、Flume、Kinesis、Twitter 等,使用这些输入源需要导入一些额外依赖包。

13.2.2 Dstream转换

DStream转换操作是在一个或多个DStream上创建新的DStream。

13.2.3 Dstream修改

Spark Streaming除提供一些基本操作,还提供一些状态操作。

13.2 .4Dstream输出

Spark Streaming允许DStream的数据输出到外部系统,如数据库、文件系统等。

13.3 Spark Streaming应用实例

先启动nc,端口为9999

然后,以本地方式启动spark shell

在启动了nc的界面输入:

在spark shell界面,可以看到如下输出:

13.4 Spark Streaming在线学习实例

前面我们简单介绍一个利用nc产生文本数据,Spark Streaming实时统计词频的一个实例,通过这个例子,我们对Streaming有个大致了解,它的源数据可以是实时产生、实时变化的,基于这个数据流,Spark Streaming能实时进行统计词频信息,并输出到界面。
除了统计词频,实际上Spark Streaming 还可以做在线机器学习工作,目前Spark Streaming支持Streaming Linear Regression, Streaming KMeans等,这节我们模拟一个在线学习线性回归的算法,源数据为多个文件,首先在一个文件中训练模型,然后在新数据上进行调整模型,对新数据进行预测等。

13.5小结

前几章主要介绍了Spark ML对批量数据或离线数据的分析和处理,本章主要介绍Spark Streamin对在线数据或流式数据的处理及分析,首先对Spark Streaming的一些概念、输入源、Dstream的一些转换、修改、输出作了简单介绍,然后,通过两个实例把这些内容结合在一起,进一步说明Spark Streaming在线统计、在线学习的具体使用。


本章数据集下载
第12章 Spark R 朴素贝叶斯模型

前一章我们介绍了PySpark,就是用Python语言操作Spark大数据计算框架上的任务,这样把自然把Python的优点与Spark的优势进行叠加。Spark提供了Python的API,也提供了R语言的API,其组件名称为Spark R。Spark R的运行原理或架构,具体请看图12-1。

 

 

图12-1 Spark R 架构图

Spark R的架构类似于PySpark,Driver端除了一个JVM进程(包含一个SparkContext,在Spark2.X中SparkContext已经被SparkSession所代替)外,还有起一个R的进程,这两个进程通过Socket进行通信,用户可以提交R语言代码,R的进程会执行这些R代码,
当R代码调用Spark相关函数时,R进程会通过Socket触发JVM中的对应任务。
当R进程向JVM进程提交任务的时候,R会把子任务需要的环境进行打包,并发送到JVM的driver端。通过R生成的RDD都会是RRDD类型,当触发RRDD的action时,Spark的执行器会开启一个R进程,执行器和R进程通过Socket进行通信。执行器会把任务和所需的环境发送给R进程,R进程会加载对应的package,执行任务,并返回结果。
本章通过一个实例来说明如何使用Spark R,具体内容如下:
 Spark R简介
 把数据上传到HDFS,然后导入Hive,最后从Hive读取数据
 使用朴素贝叶斯分类器
 探索数据
 预处理数据
 训练模型
 评估模型

12.1. Spark R简介

目前SparkR的最新版本为2.0.1,API参考文档(http://spark.apache.org/docs/latest/api/R/index.html)。

12.2获取数据

12.2.1 SparkDataFrame数据结构说明

SparkDataFrame是Spark提供的分布式数据格式(DataFrame)。类似于关系数据库中的表或R语言中的DataFrame。SparkDataFrames可以从各种各样的源构造,例如:结构化数据文件,Hive中的表,外部数据库或现有的本地数据。

12.2.2创建SparkDataFrame

1.从本地文件加载数据,生成SparkDataFrame
SparkR支持通过SparkDataFrame接口对各种数据源进行操作。示例:

2.利用R环境的data frames创建SparkDataFrame
创建SparkDataFrame的最简单的方法是将本地R环境变量中的data frames转换为SparkDataFrame。我们可以使用as.DataFrame或createDataFrame函数来创建SparkDataFrame。作为示例,我们使用R自带的iris数据集来创建SparkDataFrame。

3.从HDFS文件系统加载数据,生成SparkDataFrame
示例:

4.读取Hive数据仓库中的表,生成SparkDataFrame
我们还可以从Hive表创建SparkDataFrame。

12.2.3 SparkDataFrame的常用操作

1.选择行,或者列

2.数据分组,聚合

3.对SparkDataFrame的列进行运算操作

4.apply系列函数的应用
• dapply函数类似于R语言的apply函数,看一个示例。

12.3朴素贝叶斯分类器

该案例数据来自泰坦尼克号人员存活情况,响应变量为Survived,包含2个分类(yes,no),特征变量有Sex 、 Age 、Class(船舱等级),说明如下:
Class :0 = crew, 1 = first, 2 = second, 3 = third Age :1 = adult, 0 = child Sex :1 = male, 0 = female Survived :1 = yes, 0 = no

12.3.1数据探查

让我们来观察一下数据,

12.3.2对原始数据集进行转换

12.3.3查看不同船舱的生还率差异

然后,对比一下不同性别之间的生还率:

最后再看看不同年龄段的生还情况:

12.3.4转换成SparkDataFrame格式的数据

12.3.5模型概要

12.3.6预测

12.3.7评估模型

计算准确率

计算召回率

计算精准率

12.4 小结

本章主要介绍了如何使用Spark R组件的问题,Spark R 给R开发人员提供很多API,利用这些API,开发人员就可以通过R语言操作Spark,把用R编写的代码放在Spark这个大数据技术平台运行,这样可以使R不但可以操作HDFS或Hive中数据,也自然使用Spark分布式基于内存的架构。


本章数据集下载

第11章 PySpark 决策树模型

Spark不但好用、而且还易用、通用,它提供多种的开发语言的API,除了Scala外,还有Java、Python、R等,可以说集成目前市场最有代表性的开发语言,使得Spark受众上升几个数据量级,同时也无形中降低了学习和使用它的门槛,使得很多熟悉Java、Python、R的编程人员、数据分析师,也可方便地利用Spark大数据计算框架来实现他们的大数据处理、机器学习等任务。
Python作为机器学习中的利器,一直被很多开发者和学习者所推崇的一种语言。除了开源、易学以及简洁的代码风格的特性之外,Python当中还有很多优秀的第三方的库,为我们对数据进行处理、探索和模型的构建提供很大的便利,如Pandas、Numpy、Scipy、Matplotlib、StatsModels、Scikit-Learn、Keras等。Python的强大还体现在它的与时俱进,它与大数据计算平台Spark的结合,可为是强强联合、优势互补、相得益彰,这就有了现如今Spark当中一个重要分支--PySpark。其内部架构可参考图11-1(该图取自https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals?spm=5176.100239.0.0.eI85ij)。

图11-1 PySpark架构图

PySpark的Python解释器在启动时会同时启动一个JVM,Python解释器与JVM进程通过socket进行通信,在python driver端,SparkContext利用Py4J启动一个JVM并产生一个JavaSparkContext。Py4J只使用在driver端,用于本地python与Java SparkContext objects的通信。大量数据的传输使用的是另一个机制。RDD在python下的转换会被映射成java环境下PythonRDD。在远端worker机器上,PythonRDD对象启动一些子进程并通过pipes与这些子进程通信,以此send用户代码和数据。
本章节就机器学习中的决策树模型,使用PySpark中的ML库以及IPython交互式环境进行示例。具体内容如下:
 决策树简介
 数据加载
 数据探索
 创建决策树模型
 训练模型并进行预测
 利用交叉验证、网格参数等进行模型调优
 最后生成一个可执行python脚本

11.1 PySpark 简介

在Spark的官网上这么介绍PySpark:“PySpark is the Python API for Spark”,也就是说PySpark其实是Spark为Python提供的编程接口。此外,Spark还提供了关于Scala、Java和R的编程接口,关于Spark为R提供的编程接口(Spark R)将在第12章进行介绍。

11.2 决策树简介

决策树在机器学习中是很常见且经常使用的模型,它是一个强大的非概率模型,可以用来表达复杂的非线性模式和特征相互关系。

图11-2决策树结构

关于决策树的原理,这里不再赘述。本章着重讨的是,决策树的分类模型在PySpark中的应用。

11.3数据加载

11.3.1 原数据集初探

这里的数据选择为某比赛的数据集,用来预测推荐的一些页面是短暂(昙花一现)还是长久(长时流行)。原数据集为train.tsv,存放路径在 /home/hadoop/data/train.tsv。
先使用shell命令对数据进行试探性的查看,并做一些简单的数据处理。
1) 查看前2行数据

数据集中的第1行为标题(字段名)行,下面是一些的字段说明。
查看文件记录总数

结果显示共有:数据集一共有7396条数据
2) 由于textFile目前不好过滤标题行数据,为便于spark操作数据,需要先删除标题。

3) 将数据文件上传到 hdfs

4) 查看是否成功

11.3.2 PySpark 的启动

以spark Standalone模式启动spark集群,保证内存分配充足。

[注]:使用pyspark --help 可以查看指令的详细帮助信息。

11.3.3 基本函数

这里将本章节中需要用到函数和方法做一个简单的说明,如表11-4所示。
表11-4 本章使用的一些函数或方法简介

11.4数据探索

1) 通过sc对象的textFile方法,载入本地数据文件,创建RDD

2) 查看第1行数据

3) 查看数据文件的总行数

4) 按键进行统计

原数据文件总的行数为7396,由于我们在数据加载中将数据集的第一行数据已经去除掉,所以这里结果为7395。

11.5数据预处理

1) 由于后续的算法我们不需要时间戳以及网页的内容,所以这里先将其过滤掉。

2) 查看records 数据结构

3) 查看每一行的列数

导入Vectors 矢量方法

导入决策树分类器

4) 将RDD中的所有元素以列表的形式返回

5) 查看data数据一行有多少列

6) 定义一个列表data1,存放清理过的数据,格式为[(label_1, features_1), (label_2, features_2),…]

对数据进行清理工作中的1,2,3步

11.6创建决策树模型

1) 将data1 转换为DataFrame对象,label表示标签列,features 表示特征值列

2) 由于后面会经常使用,所以将df载入内存

3) 建立特征索引

4) 将数据切分成80%训练集和20%测试集

5) 指定决策树模型的深度、标签列,特征值列,使用信息熵(entropy)作为评估方法,并训练数据。

6) 构建流水线工作流

下面我们用一组已知数据和一组新数据重新预测下结果:

11.7训练模型进行预测

1) 使用第一行数据进行预测结果,看看是否相符合,这里先来看一下原数据集第一行数据

2) 使用数据集中第一行的特征值数据进行预测

3) 将第一行的特征值数据修改掉2个(这里换掉第一个和第二个值),进行该特征值下的预测.

4) 进行新数据的预测

5) 下面我们用测试数据做决策树准确度测试

11.8模型优化
在上一个小节中,我们发现使用决策树的正确率不算高,只有63.3850%。在这一小节,我们探究一下改进预测准确率的方法。

11.8.1特征值的优化

1) 先将之前用到的一些代码加载进来。

2) 由于这里对网页类型的标识有很多,需要单独挑选出来进行处理。

将网页的唯一类型删选出来,并进行排序。

3) 查看网页类型的个数。

4) 紧接着,我们定义一个函数,用于返回当前网页类型的列表。

5) 通过这样的处理,我们将网页类型这一个特征值转化14个特征值,整体的特征值其实就增加了14个。接下来,我们在处理的时候将这个些特征值加入进去。

6) 创建DataFrame对象。

7) 建立特征索引。

8) 将数据切分成80%训练集和20%测试集。

9) 创建决策树模型。

10) 构建流水线工作流。

可以看到,准确率增大到了63.3850%,而未做优化前的准确率是65.2057%。增长了1.88%。效果还是比较显著的。

11.8.2交叉验证和网格参数

创建交叉验证和网格参数

通过交叉验证来训练模型

测试模型

准确率统计

11.9脚本方式运行

11.9.1 在脚本中添加配置信息

创建一个decisionTree.py文件,添加如下代码来配置启动pyspark。将上述在pyspark的IPython中的代码添加到该文件中来。
本文的示例程序存为 /home/hadoop/projects/spark/pyspark/decisionTree.py。

11.9.2运行脚本程序

spark 2.0 之前使用pyspark decisionTree.py 来执行文件,spark 2.0之后统一用spark-submit decisionTree.py 执行文件。读者可以使用spark-submit --help来查看相关命令的帮助信息。

11.10小结

本章我们了Spark中的 PySpark使用方法,对于PySpark做了简单的介绍。讨论了分类模型中最常见的决策树模型在PySpark 的应用,用实例讲解了如何对数据进行清理、转换,分析了分类模型的准确性有待提高的的原因,通过可视化对决策树的不同深度下的准确度进行讨论。

第1章:Keras基础

1.1Keras简介

Tensorflow、theano是神经网络、机器学习的基础框架,但使用它们大家神经网络,尤其深度学习网络,像tensorflow或theano属于符号编程,需要涉及如何定义变量、图形、各层、session、初始化、各种算法等等,有时显得比较繁琐,尤其对新手而言,更是如此,是否有更简单的方法呢?keras就是一个很好工具!Keras是一个高层神经网络API,Keras由纯Python编写而成并基Tensorflow、Theano以及CNTK后端。Keras 为支持快速实验而生,能够把你的想法迅速转换为结果。
其主要特点:

  • 简便的原型设计
  • 支持CNN和RNN,或二者的结合
  • 在CPU和GPU间无缝切换

keras资料:
keras官网:
https://keras.io/
keras中文网
https://keras-cn.readthedocs.io/en/latest/

1.2keras安装

1)安装Python3.6
建议用anaconda安装,先下载最新版anaconda 支持linux或windows
2)安装numpy、scipy

3)安装theano

4) 安装tensorflow

(gpu 需要有GPU卡,并安装GPU驱动及cuda等)
5)安装keras

6)测试
【说明】变更keras后台支持的几种方法

(1)修改~/.keras/keras.json

(2)在客户端直接修改
改为theano为支持后台

改为tensorflow为支持后台(缺省)

在客户端修改,影响范围是当前脚本或session。

1.3 keras常用概念

François Chollet作为人工智能时代的先行者,为无数的开发者提供了开源深度学习框架Keras,目前就职于Google公司,主推tf.keras。
在开始学习Keras之前,我们希望传递一些关于Keras,关于深度学习的基本概念和技术,我们建议新手在使用Keras之前浏览一下本页面提到的内容,这将减少你学习中的困惑。

  • 符号计算

Keras的底层库使用Theano或TensorFlow,这两个库也称为Keras的后端。无论是Theano还是TensorFlow,都是一个“符号式”的库。
因此,这也使得Keras的编程与传统的Python代码有所差别。笼统的说,符号主义的计算首先定义各种变量,然后建立一个“计算图”,计算图规定了各个变量之间的计算关系。建立好的计算图需要编译以确定其内部细节,然而,此时的计算图还是一个“空壳子”,里面没有任何实际的数据,只有当你把需要运算的输入放进去后,才能在整个模型中形成数据流,从而形成输出值。
就像用管道搭建供水系统,当你在拼水管的时候,里面是没有水的。只有所有的管子都接完了,才能送水。
符号计算也叫数据流图,如下图是一个经典的数据流计算可视化图形。
saddle_point_evaluation_optimizers

  • 张量

张量,或tensor,可以看作是向量、矩阵的自然推广,用来表示广泛的数据类型。张量的阶数也叫维度。
0阶张量,即标量,是一个数。
1阶张量,即向量,一组有序排列的数
2阶张量,即矩阵,一组向量有序的排列起来
3阶张量,即立方体,一组矩阵上下排列起来
4阶张量......
依次类推
重点:关于维度的理解
假如有一个10长度的列表,那么我们横向看有10个数字,也可以叫做10维度,纵向看只能看到1个数字,那么就叫1维度。注意这个区别有助于理解Keras或者神经网络中计算时出现的维度问题
张量的阶数有时候也称为维度,或者轴,轴这个词翻译自英文axis。譬如一个矩阵[[1,2],[3,4]],是一个2阶张量,有两个维度或轴,沿着第0个轴(为了与python的计数方式一致,本文档维度和轴从0算起)你看到的是[1,2],[3,4]两个向量,沿着第1个轴你看到的是[1,3],[2,4]两个向量。
 数据格式(data_format)
目前主要有两种方式来表示张量:
a) th模式或channels_first模式,Theano和caffe使用此模式。
b)tf模式或channels_last模式,TensorFlow使用此模式。
模式的修改,可以通修改配置文件~/.keras/keras.json中的image_data_format。

下面举例说明两种模式的区别:
对于100张RGB3通道的16×32(高为16宽为32)彩色图,
th表示方式:(100,3,16,32)
tf表示方式:(100,16,32,3)
唯一的区别就是表示通道个数3的位置不一样。

  • 模型

Keras有两种类型的模型,序贯(或序列)模型(Sequential)和函数式模型(Model),函数式模型应用更为广泛,序贯模型是函数式模型的一种特殊情况。
a)序贯模型(Sequential):单输入单输出,一条路通到底,层与层之间只有相邻关系,没有跨层连接。这种模型编译速度快,操作也比较简单
b)函数式模型(Model):多输入多输出,层与层之间任意连接。这种模型编译速度慢。

  • batch

这个概念与Keras无关,老实讲不应该出现在这里的,但是因为它频繁出现,而且不了解这个技术的话看函数说明会很头痛,这里还是简单说一下。
深度学习的优化算法,说白了就是梯度下降。每次的参数更新有两种方式。
第一种,遍历全部数据集算一次损失函数,然后算函数对各个参数的梯度,更新梯度。这种方法每更新一次参数都要把数据集里的所有样本都看一遍,计算量开销大,计算速度慢,不支持在线学习,这称为Batch gradient descent,批梯度下降。
另一种,每看一个数据就算一下损失函数,然后求梯度更新参数,这个称为随机梯度下降,stochastic gradient descent。这个方法速度比较快,但是收敛性能不太好,可能在最优点附近晃来晃去,hit不到最优点。两次参数的更新也有可能互相抵消掉,造成目标函数震荡的比较剧烈。
为了克服两种方法的缺点,现在一般采用的是一种折中手段,mini-batch gradient decent,小批的梯度下降,这种方法把数据分为若干个批,按批来更新参数,这样,一个批中的一组数据共同决定了本次梯度的方向,下降起来就不容易跑偏,减少了随机性。另一方面因为批的样本数与整个数据集相比小了很多,计算量也不是很大。
基本上现在的梯度下降都是基于mini-batch的,所以Keras的模块中经常会出现batch_size,就是指这个。

  • epochs

epochs指的就是训练过程中数据将被“轮”多少次。

1.4 keras与Tensorflow

1.5 keras的主要模块

【说明】
这里选择了一些常用模块,更多或更详细的说明请参考keras中文网站:
https://keras-cn.readthedocs.io/en/latest/
我们先从总体上了解一下Keras的主要模块及常用层,可参考下图,然后我们对各模块和常用层展开详细说明。

该图取自:http://blog.csdn.net/zdy0_2004/article/details/74736656

1.5.1优化器(optimizers)

优化器是调整每个节点权重的方法,看一个代码示例:

可以看到优化器在模型编译前定义,作为编译时的两个参数之一。
代码中的sgd是随机梯度下降算法
lr表示学习速率
momentum表示动量项
decay是学习速率的衰减系数(每个epoch衰减一次)
Nesterov的值是False或者True,表示使不使用Nesterov momentum
除了sgd,还可以选择的优化器有RMSprop(适合递归神经网络)、Adagrad、Adadelta、Adam、Adamax、Nadam等。

1.5.2目标函数(objectives)

目标函数又称损失函数(loss),目的是计算神经网络的输出与样本标记的差的一种方法,代码示例:

mean_squared_error就是损失函数的名称。
可以选择的损失函数有:
mean_squared_error,mean_absolute_error,squared_hinge,hinge,binary_crossentropy,categorical_crossentropy
其中binary_crossentropy 和 categorical_crossentropy也就是交叉熵为logloss,一般用于分类模型。

1.5.3激活函数(activations)

每一个神经网络层都需要一个激活函数,代码示例:

可以选择的激活函数有:
linear、sigmoid、hard_sigmoid、tanh、softplus、relu、 softplus,softmax、softsign
还有一些高级激活函数,比如如PReLU,LeakyReLU等。

1.5.4 参数初始化(Initializations)

这个模块的作用是在添加layer时调用init进行这一层的权重初始化,有两种初始化方法

1.5.4.1 通过制定初始化方法的名称

示例代码:

可以选择的初始化方法有:
uniform、lecun_uniform、normal、orthogonal、zero、glorot_normal、he_normal等。

1.5.4.2 通过调用对象

该对象必须包含两个参数:shape(待初始化的变量的shape)和name(该变量的名字),该可调用对象必须返回一个(Keras)变量,例如K.variable()返回的就是这种变量,示例代码:

或者

所以说可以通过库中的方法设定每一层的初始化权重,
也可以自己初始化权重,自己设定的话可以精确到每个节点的权重。

1.5.5 常用层(layer)

keras的层主要包括:
常用层(Core)、卷积层(Convolutional)、池化层(Pooling)、局部连接层、递归层(Recurrent)、嵌入层( Embedding)、高级激活层、规范层、噪声层、包装层,当然也可以编写自己的层

1.5.5.1 Dense层(全连接层)

keras.layers.core.Dense(units, activation=None, use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', kernel_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, bias_constraint=None)
Dense就是常用的全连接层,所实现的运算是output = activation(dot(input, kernel)+bias)。其中activation是逐元素计算的激活函数,kernel是本层的权值矩阵,bias为偏置向量,只有当use_bias=True才会添加。如果本层的输入数据的维度大于2,则会先被压为与kernel相匹配的大小。
参数:
units:大于0的整数,代表该层的输出维度。
activation:激活函数,为预定义的激活函数名(参考激活函数),或逐元素(element-wise)的Theano函数。如果不指定该参数,将不会使用任何激活函数(即使用线性激活函数:a(x)=x)
use_bias: 布尔值,是否使用偏置项
kernel_initializer:权值初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
bias_initializer:偏置向量初始化方法,为预定义初始化方法名的字符串,或用于初始化偏置向量的初始化器。参考initializers
kernel_regularizer:施加在权重上的正则项,为Regularizer对象
bias_regularizer:施加在偏置向量上的正则项,为Regularizer对象
activity_regularizer:施加在输出上的正则项,为Regularizer对象
kernel_constraints:施加在权重上的约束项,为Constraints对象
bias_constraints:施加在偏置上的约束项,为Constraints对象
输入
形如(batch_size, ..., input_dim)的nD张量,最常见的情况为(batch_size, input_dim)的2D张量。
输出
形如(batch_size, ..., units)的nD张量,最常见的情况为(batch_size, units)的2D张量。
示例

1.5.5.2 Flatten层

Flatten层用来将输入“压平”,即把多维的输入一维化,常用在从卷积层到全连接层的过渡。Flatten不影响batch的大小。
keras.layers.core.Flatten()
示例

1.5.5.3 dropout层

为输入数据施加Dropout。Dropout将在训练过程中每次更新参数时随机断开一定百分比(p)的输入神经元连接,Dropout层用于防止过拟合。
keras.layers.core.Dropout(p)

1.5.5.4 卷积层(Convolutional)

1.5.5.4.1 Conv1D层

keras.layers.convolutional.Conv1D(filters, kernel_size, strides=1, padding='valid', dilation_rate=1, activation=None, use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', kernel_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, bias_constraint=None)
一维卷积层(即时域卷积),用以在一维输入信号上进行邻域滤波。当使用该层作为首层时,需要提供关键字参数input_shape。例如(10,128)代表一个长为10的序列,序列中每个信号为128向量。而(None, 128)代表变长的128维向量序列。
该层生成将输入信号与卷积核按照单一的空域(或时域)方向进行卷积。如果use_bias=True,则还会加上一个偏置项,若activation不为None,则输出为经过激活函数的输出。
参数
filters:卷积核的数目(即输出的维度)
kernel_size:整数或由单个整数构成的list/tuple,卷积核的空域或时域窗长度
strides:整数或由单个整数构成的list/tuple,为卷积的步长。任何不为1的strides均与任何不为1的dilation_rate均不兼容
padding:补0策略,为“valid”, “same” 或“causal”,“causal”将产生因果(膨胀的)卷积,即output[t]不依赖于input[t+1:]。当对不能违反时间顺序的时序信号建模时有用。参考WaveNet: A Generative Model for Raw Audio, section 2.1.。“valid”代表只进行有效的卷积,即对边界数据不处理。“same”代表保留边界处的卷积结果,通常会导致输出shape与输入shape相同。
activation:激活函数,为预定义的激活函数名(参考激活函数),或逐元素(element-wise)的Theano函数。如果不指定该参数,将不会使用任何激活函数(即使用线性激活函数:a(x)=x)
dilation_rate:整数或由单个整数构成的list/tuple,指定dilated convolution中的膨胀比例。任何不为1的dilation_rate均与任何不为1的strides均不兼容。
use_bias:布尔值,是否使用偏置项
kernel_initializer:权值初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
bias_initializer:权值初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
kernel_regularizer:施加在权重上的正则项,为Regularizer对象
bias_regularizer:施加在偏置向量上的正则项,为Regularizer对象
activity_regularizer:施加在输出上的正则项,为Regularizer对象

kernel_constraints:施加在权重上的约束项,为Constraints对象
bias_constraints:施加在偏置上的约束项,为Constraints对象
输入shape
形如(samples,steps,input_dim)的3D张量。
输出shape
形如(samples,new_steps,nb_filter)的3D张量,因为有向量填充的原因,steps的值会改变。

1.5.5.4.2 Conv2D层

keras.layers.convolutional.Conv2D(filters, kernel_size, strides=(1, 1), padding='valid', data_format=None, dilation_rate=(1, 1), activation=None, use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', kernel_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, bias_constraint=None)
二维卷积层,即对图像的空域卷积。该层对二维输入进行滑动窗卷积,当使用该层作为第一层时,应提供input_shape参数。例如input_shape = (128,128,3)代表128*128的彩色RGB图像(data_format='channels_last')

1.5.5.4.3 Conv3D层

keras.layers.convolutional.Conv3D(filters, kernel_size, strides=(1, 1, 1), padding='valid', data_format=None, dilation_rate=(1, 1, 1), activation=None, use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', kernel_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, bias_constraint=None)
三维卷积对三维的输入进行滑动窗卷积,当使用该层作为第一层时,应提供input_shape参数。例如input_shape = (3,10,128,128)代表对10帧128*128的彩色RGB图像进行卷积。数据的通道位置仍然有data_format参数指定。

1.5.5.5池化层

1.5.5.5.1 MaxPooling1D层

keras.layers.pooling.MaxPooling1D(pool_size=2, strides=None, padding='valid')
对时域1D信号进行最大值池化
参数
pool_size:整数,池化窗口大小
strides:整数或None,下采样因子,例如设2将会使得输出shape为输入的一半,若为None则默认值为pool_size。
padding:‘valid’或者‘same’
输入shape
形如(samples,steps,features)的3D张量
输出shape
形如(samples,downsampled_steps,features)的3D张量

1.5.5.5.2 MaxPooling2D层

keras.layers.pooling.MaxPooling2D(pool_size=(2, 2), strides=None, padding='valid', data_format=None)
为空域信号施加最大值池化

1.5.5.5.3 AveragePooling1D层

keras.layers.pooling.AveragePooling1D(pool_size=2, strides=None, padding='valid')
对时域1D信号进行平均值池化

1.5.5.5.4 AveragePooling2D层

keras.layers.pooling.AveragePooling2D(pool_size=(2, 2), strides=None, padding='valid', data_format=None)
为空域信号施加平均值池化

1.5.5.6 循环层(Recurrent)

循环层包含三种模型:LSTM、GRU和SimpleRNN。
所有的循环层(LSTM,GRU,SimpleRNN)都继承本层,因此下面的参数可以在任何循环层中使用。

1.5.5.6.1抽象层,不能直接使用

keras.layers.recurrent.Recurrent(return_sequences=False, go_backwards=False, stateful=False, unroll=False, implementation=0)
weights:numpy array的list,用以初始化权重。该list形如[(input_dim, output_dim),(output_dim, output_dim),(output_dim,)]
 return_sequences:布尔值,默认False,控制返回类型。若为True则返回整个序列,否则仅返回输出序列的最后一个输出
 go_backwards:布尔值,默认为False,若为True,则逆向处理输入序列并返回逆序后的序列
 stateful:布尔值,默认为False,若为True,则一个batch中下标为i的样本的最终状态将会用作下一个batch同样下标的样本的初始状态。
 unroll:布尔值,默认为False,若为True,则循环层将被展开,否则就使用符号化的循环。当使用TensorFlow为后端时,循环网络本来就是展开的,因此该层不做任何事情。层展开会占用更多的内存,但会加速RNN的运算。层展开只适用于短序列。
 implementation:0,1或2, 若为0,则RNN将以更少但是更大的矩阵乘法实现,因此在CPU上运行更快,但消耗更多的内存。如果设为1,则RNN将以更多但更小的矩阵乘法实现,因此在CPU上运行更慢,在GPU上运行更快,并且消耗更少的内存。如果设为2(仅LSTM和GRU可以设为2),则RNN将把输入门、遗忘门和输出门合并为单个矩阵,以获得更加在GPU上更加高效的实现。注意,RNN dropout必须在所有门上共享,并导致正则效果性能微弱降低。
 input_dim:输入维度,当使用该层为模型首层时,应指定该值(或等价的指定input_shape)
 input_length:当输入序列的长度固定时,该参数为输入序列的长度。当需要在该层后连接Flatten层,然后又要连接Dense层时,需要指定该参数,否则全连接的输出无法计算出来。注意,如果循环层不是网络的第一层,你需要在网络的第一层中指定序列的长度(通过input_shape指定)。

1.5.5.6.2全连接RNN网络

keras.layers.SimpleRNN(units, activation='tanh', use_bias=True, kernel_initializer='glorot_uniform', recurrent_initializer='orthogonal', bias_initializer='zeros', kernel_regularizer=None, recurrent_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None, bias_constraint=None, dropout=0.0, recurrent_dropout=0.0, return_sequences=False, return_state=False, go_backwards=False, stateful=False, unroll=False)
全连接RNN网络,RNN的输出会被回馈到输入
参数说明
• units:输出维度
• activation:激活函数,为预定义的激活函数名(参考激活函数)
• use_bias: 布尔值,是否使用偏置项
• kernel_initializer:权值初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
• recurrent_initializer:循环核的初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
• bias_initializer:权值初始化方法,为预定义初始化方法名的字符串,或用于初始化权重的初始化器。参考initializers
• kernel_regularizer:施加在权重上的正则项,为Regularizer对象
• bias_regularizer:施加在偏置向量上的正则项,为Regularizer对象
• recurrent_regularizer:施加在循环核上的正则项,为Regularizer对象
• activity_regularizer:施加在输出上的正则项,为Regularizer对象
• kernel_constraints:施加在权重上的约束项,为Constraints对象
• recurrent_constraints:施加在循环核上的约束项,为Constraints对象
• bias_constraints:施加在偏置上的约束项,为Constraints对象
• dropout:0~1之间的浮点数,控制输入线性变换的神经元断开比例
• recurrent_dropout:0~1之间的浮点数,控制循环状态的线性变换的神经元断开比例
• 其他参数参考Recurrent的说明

输入shape
形如(samples,timesteps,input_dim)的3D张量
输出shape
如果return_sequences=True:返回形如(samples,timesteps,output_dim)的3D张量
否则,返回形如(samples,output_dim)的2D张量
示例:

1.5.5.6.3 LSTM层

keras.layers.recurrent.LSTM(units, activation='tanh', recurrent_activation='hard_sigmoid', use_bias=True, kernel_initializer='glorot_uniform', recurrent_initializer='orthogonal', bias_initializer='zeros', unit_forget_bias=True, kernel_regularizer=None, recurrent_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None, bias_constraint=None, dropout=0.0, recurrent_dropout=0.0)
Keras长短期记忆模型
forget_bias_init:遗忘门偏置的初始化函数,建议初始化为全1元素。
inner_activation:内部单元激活函数

1.5.5.6.4 GRU

keras.layers.recurrent.GRU(units, activation='tanh', recurrent_activation='hard_sigmoid', use_bias=True, kernel_initializer='glorot_uniform', recurrent_initializer='orthogonal', bias_initializer='zeros', kernel_regularizer=None, recurrent_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None, bias_constraint=None, dropout=0.0, recurrent_dropout=0.0)
门限循环单元

1.5.5.6.5Embedding层

keras.layers.embeddings.Embedding(input_dim, output_dim, init='uniform', input_length=None, W_regularizer=None, activity_regularizer=None, W_constraint=None, mask_zero=False, weights=None, dropout=0.0)
只能作为模型第一层
mask_zero:布尔值,确定是否将输入中的‘0’看作是应该被忽略的‘填充’(padding)值,该参数在使用递归层处理变长输入时有用。设置为True的话,模型中后续的层必须都支持masking,否则会抛出异常

1.5.6 model层

model层是最主要的模块,model层可以将上面定义了各种基本组件组合起来。
model的方法:
model.summary() : 打印出模型概况
model.get_config() :返回包含模型配置信息的Python字典
model.get_weights():返回模型权重张量的列表,类型为numpy array
model.set_weights():从numpy array里将权重载入给模型
model.to_json:返回代表模型的JSON字符串,仅包含网络结构,不包含权值。可以从JSON字符串中重构原模型:

model.save_weights(filepath):将模型权重保存到指定路径,文件类型是HDF5(后缀是.h5)。
model.load_weights(filepath, by_name=False):从HDF5文件中加载权重到当前模型中, 默认情况下模型的结构将保持不变。如果想将权重载入不同的模型(有些层相同)中,则设置by_name=True,只有名字匹配的层才会载入权重。
keras有两种model,分别是Sequential模型和泛型模型。

1.5.6.1 Sequential模型

Sequential是多个网络层的线性堆叠
可以通过向Sequential模型传递一个layer的list来构造该模型:

也可以通过.add()方法一个个的将layer加入模型中:

还可以通过merge将两个Sequential模型通过某种方式合并
Merge层提供了一系列用于融合两个层或两个张量的层对象和方法。以大写首字母开头的是Layer类,以小写字母开头的是张量的函数。小写字母开头的张量函数在内部实际上是调用了大写字母开头的层。
keras.engine.topology.Merge(layers=None, mode='sum', concat_axis=-1, dot_axes=-1, output_shape=None, node_indices=None, tensor_indices=None, name=None)
layers:该参数为Keras张量的列表,或Keras层对象的列表。该列表的元素数目必须大于1。
mode:合并模式,如果为字符串,则为下列值之一{“sum”,“mul”,“concat”,“ave”,“cos”,“dot”}
其中sum和mul是对待合并层输出做一个简单的求和、乘积运算,因此要求待合并层输出shape要一致。concat是将待合并层输出沿着最后一个维度进行拼接,因此要求待合并层输出只有最后一个维度不同。
Merge是一个层对象,在多个sequential组成的网络模型中,如果
x:输入数据。如果模型只有一个输入,那么x的类型是numpy array,如果模型有多个输入,那么x的类型应当为list,list的元素是对应于各个输入的numpy array
y:标签,numpy array
否则运行时很可能会提示意思就是你输入的维度与实际不符
 Add
keras.layers.Add()
添加输入列表的图层。
该层接收一个相同shape列表张量,并返回它们的和,shape不变。

 Concatenate
keras.layers.Concatenate(axis=-1)
该层接收一个列表的同shape张量,并返回它们的按照给定轴相接构成的向量。

1.5.6.2 函数式(Functional)模型

在Keras 2里我们将这个词改译为“函数式”,对函数式编程有所了解的同学应能够快速get到该类模型想要表达的含义。函数式模型称作Functional,但它的类名是Model,因此我们有时候也用Model来代表函数式模型。
Keras函数式模型接口是用户定义多输出模型、非循环有向模型或具有共享层的模型等复杂模型的途径。一句话,只要你的模型不是类似VGG一样一条路走到黑的模型,或者你的模型需要多于一个的输出,那么你总应该选择函数式模型。函数式模型是最广泛的一类模型,序贯模型(Sequential)只是它的一种特殊情况。
这部分的文档假设你已经对Sequential模型已经比较熟悉
让我们从简单一点的模型开始
第一个模型:全连接网络
Sequential当然是实现全连接网络的最好方式,但我们从简单的全连接网络开始,有助于我们学习这部分的内容。在开始前,有几个概念需要澄清:
层对象接受张量为参数,返回一个张量。
输入是张量,输出也是张量的一个框架就是一个模型,通过Model定义。
这样的模型可以被像Keras的Sequential一样被训练

所有的模型都是可调用的,就像层一样
利用函数式模型的接口,我们可以很容易的重用已经训练好的模型:你可以把模型当作一个层一样,通过提供一个tensor来调用它。注意当你调用一个模型时,你不仅仅重用了它的结构,也重用了它的权重。

使用函数式模型的一个典型场景是搭建多输入、多输出的模型,如下图:

第2章 keras的使用流程

2.1 流程说明

第1步:构造数据:定义输入数据
第2步:构造模型:确定各个变量之间的计算关系
第3步:编译模型:编译已确定其内部细节
第4步:训练模型:导入数据,训练模型
第5步:测试模型
第6步:保存模型
把这些步骤进一步图形化为:

2.2 实例-详细说明使用流程

第1步:构造数据
我们需要根据模型fit(训练)时需要的数据格式来构造数据的shape,这里我们用numpy构造两个矩阵:一个是数据矩阵,一个是标签矩阵。

通过numpy的random生成随机矩阵,数据矩阵是1000行784列的矩阵,标签矩阵是1000行1列的句子,所以数据矩阵的一行就是一个样本,这个样本是784维的。
第2步 构造模型
我们来构造一个神经网络模型,keras构造深度学习模型可以采用序列模型(基于Sequential类)或函数模型(又称为通用模型)(基于Model类)。两种间差异是拓扑结构不一样。这里我们采用序列模型。

在这一步中可以add多个层,也可以merge合并两个模型。
第3步:编译模型
我们编译上一步构造好的模型,并指定一些模型的参数,optimizer(优化器),loss(目标函数或损失函数),metrics(评估模型的指标)等。编译模型时损失函数和优化器这两项是必须的。

第4步:训练模型
传入要训练的数据和标签,并指定训练的一些参数,然后进行模型训练。

Epoch 1/10
- 1s - loss: 0.7063 - acc: 0.5010
Epoch 2/10
- 0s - loss: 0.6955 - acc: 0.5110
.................................
Epoch 10/10
- 0s - loss: 0.5973 - acc: 0.6980

epochs:整数,训练的轮数。
verbose:训练时显示实时信息,0表示不显示数据,1表示显示进度条,2表示用只显示一个数据。
batch_size:整数,指定进行梯度下降时每个batch包含的样本数。训练时一个batch的样本会被计算一次梯度下降,使目标函数优化一步。
第5步:测试模型
用测试数据测试已经训练好的模型,并可以获得测试结果,从而对模型进行评估

200/200 [==============================] - 0s 146us/step
本函数返回一个测试误差的标量值(如果模型没有其他评价指标),或一个标量的list(如果模型还有其他的评价指标)
第6步:保存模型

【项目延伸】
上面是采用全连接的神经网络,包括输入层、一个隐含层及一个输出层。如果我们卷积神经网络是否可以?例如:一个卷积层+池化层+展平+全连接+输出层。

第3章 keras实现单层神经网络

3.1利用keras实现单层神经

本章利用Keras架构实现一个传统机器学习算法---线性回归
根据输入数据及目标数据,模拟一个线性函数y=kx+b
这里使用一个神经元,神经元中使用Relu作为激活函数。如下图:

第1步:构造数据

把200份数据划分为训练数据、测试数据。

第2步 构造模型

第3步 编译模型

第4步 训练模型

第5步 测试模型

Testing ------------
40/40 [==============================] - 0s 996us/step
test cost: 0.00395184289664
Weights= [[ 0.48489931]]
biases= [ 1.95838749]

可视化结果:

第4章 keras实现多层神经网络

利用keras构造一个多层神经网络,用该神经网络识别手写数字,上次我们采用python来实现,这里我们采用keras来构造多层神经网络。
网络构造图形:

在整个网络设计中,输入数据的维度,优化方法、损失函数需要重点考虑。当然激活函数也很重要,特别是层数较多时。
这里为便于说明keras构建多层神经网络的方法,采用MNIST数据集,MNIST是一个手写数字0-9的数据集,它有60000个训练样本集和10000个测试样本集它是NIST数据库的一个子集。该数据集keras有现成的数据处理API(mnist.load_data())。
数据预处理:
(1)展平矩阵:
原数据为28*28图片,在利用全连接前,需要把矩阵拉平为一维数组,大小为784;
(2)规范训练数据
转换为都是0-255的像素,为提高模型的泛化能力,需要对数据规范化,即除以255,使数据范围都在[0,1]之间;
(3)规范标签数据
把标签数据转换为one-hot格式,向量维度为10,每行除一个1元素外,其它都是0,如把2标签转换为[0,0,1,0,0,0,0,0,0,0]
以下为详细计算步骤:
第1步: 构建数据

运行结果:
Using TensorFlow backend.
展平前
(60000, 28, 28) (60000,)
展平后
(60000, 784) (60000, 10)

第2步 构建网络

第3步 编译模型

运行结果:
【备注】
如果我们需要对优化方法进行某些定制化,也很方便:

第4步 训练模型

运行结果:
Training ------------
Epoch 1/2
- 7s - loss: 0.1584 - acc: 0.9539
Epoch 2/2
- 6s - loss: 0.1340 - acc: 0.9607
第5步 测试模型

运行结果:
test loss: 0.132445694927
test accuracy: 0.9614


本章数据集下载

第10章 构建Spark ML聚类模型

前面我们介绍了推荐、分类、回归等模型,这些模型属于监督学习,在训练模型时,都提供目标值或标签数据,根据目标值训练模型,然后根据模型对测试数据或新数据进行推荐、分类或预测。
但实际数据有很多是没有标签数据,或者预先标签很难,但我们又希望或需要从这些数据中提炼一些规则或特征等,如识别异常数据、对客户进行分类等,解决这类问题就属于无监督学习。
聚类是一种无监督学习,它与分类的不同,聚类所要求划分的类是未知的。
聚类算法的思想就是物以类聚的思想,相同性质的点在空间中表现的较为紧密和接近,主要用于数据探索与异常检测。
聚类分析是一种探索性的分析,在分类的过程中,人们不必事先给出一个分类的标准,它能够从样本数据出发,自动进行分类。聚类分析也有很多方法,使用不同方法往往会得到不同的结论。从实际应用的角度看,聚类分析是数据挖掘的主要任务之一。而且聚类能够作为一个独立的工具获得数据的分布状况,观察每一簇数据的特征,集中对特定的聚簇集合作进一步地分析。聚类分析还可以作为其他算法(如分类和推荐等算法)的预处理步骤。

10.1 K-means模型简介

作为经典的聚类算法,一般的机器学习框架里都有K-means,Spark自然也不例外。
不过spark中的K-means,除有一般K-means的特点外,还进行了如下的优化:

10.2 数据加载

这里我们以某批发经销商的客户对不同产品的年度消费支出(数据来源http://archive.ics.uci.edu/ml/datasets/Wholesale+customers)
读取HDFS中的数据。

10.3 探索特征的相关性

从以上分析,我们可以看出,rawdata数据集总记录数为440条,最大与最小值相差不大,已统计的特征来看,没有缺失值,数据类型为字符型,这点需要在预处理中转换为Double型。
利用pyspark我们可以画出这些特征间的相关性,这里使用pearson's r,相关系统在[-1,1]之间,如果r=1,表示特征完全正相关;r=0,表示不存在关系;r=-1,表示特征完全负相关。
实现代码:

10.4 数据预处理

通过数据探索,发现数据需要又字符转换为数值型,并缓存。

查看数据的统计信息:

Channel、Region为类别型,其余6个字段为连续型,为此,在训练模型前,需要对类别特征先转换为二元向量,然后,对各特征进行规范化。最后得到一个新的特征向量。
对类别特征转换为二元编码:

把新生成的两个特征及原来的6个特征组成一个特征向量

把源数据组合成特征向量features

10.5 组装

这里我们只使用了setK、setSeed两个参数,其余的使用缺省值。

10.6 模型优化

聚类模型中最重要的是参数k的选择,下面我们通过循环来获取哪个k值的性能最好。

以上数据可视化的图形如图10-2所示。

图10-2聚类模型中族K与评估指标的关系

从图10-2中不难看出,k<12时,性能(computeCost)提升比较明显,>12后,逐渐变缓。所以K越大不一定越好,恰当才是重要的。
当k=10时,聚类结果如下:
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 65|
| 6| 18|
| 3| 86|
| 5| 27|
| 9| 3|
| 4| 2|
| 8| 2|
| 7| 46|
| 2| 27|
| 0| 164|
+----------+-----+
图10-3为k取10时(族0对应的channel和Region分别为1和3;族3对应的channel和Region分别为2和3),前两大族的销售均值比较图,从图中可以看出,团购冷藏食品均值大于或接近零售冷藏食品均值。说明团购对冷藏食品量比较大。

图10-3 聚类模型中K=10时0和3族平均销售额对比

10.7 小结

本章主要介绍了用Spark ML中的聚类算法,对某地多种销售数据进行聚类分析,在分析前对数据集主要特征进行了相关性分析,并对类别数据进行二元向量化,对连续性数据进行规范和标准化,然后把这些stages组装在流水线上,在模型训练中,我们尝试不同K的取值,以便获取最佳族群数。


本章数据集下载

第9章 构建Spark ML回归模型

回归模型属于监督式学习,每个个体都有一个与之相关联的实数标签,并且我们希望在给出用于表示这些实体的数值特征后,所预测出的标签值可以尽可能接近实际值。
回归算法是试图采用对误差的衡量来探索变量之间的关系的一类算法。回归算法是统计机器学习的利器。在机器学习领域,人们说起回归,有时候是指一类问题,有时候是指一类算法,这一点常常会使初学者有所困惑。常见的回归算法包括:普通最小二乘法(OLS)(Ordinary Least Square),它使用损失函数是平方损失函数(1/2 (w^T x-y)^2),简单的预测就是y=w^T x,标准的最小二乘回归不使用正则化,这就意味着数据中异常数据点非常敏感,因此,在实际应用中经常使用一定程度的正则化(目的避免过拟合、提供泛化能力)。
本章主要介绍Spark ML中的回归模型,以回归分析中常用决策树回归、线性回归为例,对共享单车租赁的情况进行预测,其中介绍了一些特征转换、特征选择、交叉验证等方法的具体使用,主要内容包括:
回归模型简介
把数据加载到HDFS,Spark读取HDFS中的数据
探索特征及其分布信息
预处理数据
把pipeline的多个Stages组装到流水线上
模型优化

9.1 回归模型简介

ML目前支持回归模型有:
Linear regression (线性回归)
Generalized linear regression(广义线性回归)
Decision tree regression (决策树的回归)
Random forest regression(随机森林回归 )
Gradient-boosted tree regression (梯度提高树回归)
Survival regression(生存回归)
Isotonic regression(保序回归)

9.2 数据加载

查看数据大致情况:

从数据集前3行的数据可以看出,第一行为标题,其他为租赁数据,共有17个字段和17380条记录。
把数据文件hour.csv复制到HDFS上。

以独立模式启动spark,然后读取数据。

导入需要使用的类。

读取数据,把第一行为列名:

查看前4行样本数据

9.3 探索特征分布

Spark读取数据后,我们就可以对数据进行探索和分析,首先查看前4行样本数据

查看rawdata的数据结构

目前这些数据的字段都是字符型,后续需要转换为数值型。
查看主要字段的统计信息

其中有很多字段是类型型,如果使用回归算法时,需要通过OneHotEncoder把数据转换为二元向量,对一些字段或特征进行规范化。
通过pyspark可以画出主要特征的重要程度:

图9-2 各特征的重要性

通过pyspark可以画出其中一些特征的分布情况:

图9-3特征间的关系图

9.4 数据预处理

9.4.1 特征选择

首先把字符型的特征转换为数值类型,并过滤instant、dteday、casual、registered等4个无关或冗余特征。cnt特征作为标志。

生成一个存放以上预测特征的特征向量

把源数据组合成特征向量features

9.4.2 特征转换

使用决策树回归算法前,我们对类别特征进行索引化或数值化。

因OneHotEncoder不是Estimator,这里我们对采用回归算法的数据另外进行处理,先建立一个流水线,把以上转换组装到这个流水线上。

把原来的4个及转换后的8个二元特征向量,拼接成一个feature向量。

9.5 组装

1)将data1数据分为训练和测试集(30%进行测试,种子设为12):

2)设置决策树回归模型参数

3)设置线性回归模型的参数

4)把决策树回归模型涉及的特征转换及模型训练组装在一个流水线上。

5)把线性回归模型涉及的特征转换、模型训练组装载一个流水上线。

6)训练模型

7)作出预测

8)评估模型

从以上使用不同模型情况看来,决策树性能稍好与线性回归,但这仅是粗糙的比较,下面使用模型选择中介绍的一些方法,对线性模型进行优化。

9.6 模型优化

从图9-3可知,temp特征与atemp特征线性相关,而且从图9-2可知,atemp的贡献度较小,所以我们将过滤该特征。

对label标签特征进行转换,使其更接近正态分布,这里我们SQLTransformer转换器,其具体使用可参考第4章。

这里我们利用训练验证划分法对线性回归模型进行优化,对参数进行网格化,将数据集划分为训练集、验证集和测试集。
1)导入需要用到的包。

2)建立模型,预测label1的值,设置线性回归参数。

3)设置流水线,为便于把特征组合、特征值优化、模型训练等任务组装到这条流水线上。

4)建立参数网格。

5)选择(prediction, label1),计算测试误差。

6)训练模型并自动选择最优参数。

7)查看模型全部参数

8)用最好的参数组合,做出预测。

看了对标签特征进行转换、利用网格参数及训练验证划分等优化方法,从102下降到3左右,效果比较明显。

9.7 小结

本章主要介绍Spark ML的线性回归模型、决策树回归模型,对共享单车的租赁信息进行预测,由于很多数据不规范,因此,对原数据进行了二元向量转换、对类别数据索引化,然后把这些转换组装到流水线上,在训练集上训练模型,在测试集上进行预测,最后,更加评估指标对模型进行优化。

第8章 构建Spark ML 分类模型

在上一章中,我们通过实例介绍了Spark中基于协同过滤的推荐模型,了解了推荐模型的原理以及场景、使用流水线组装任务,使用自定义函数优化模型等。这一章我们将就Spark中分类模型为例,进一步说明如何使用Spark ML中特征选取、特征转换、流水线、模型选择或优化等方法,简化、规范化、流程化整个机器学习过程。
分类、回归和聚类是机器学习中重要的几个分支,也是日常数据处理与分析中最常用的手段。这几类的算法有着较高的成熟度,原理也较容易理解,且有着不错的效果,深受数据分析师们的喜爱。本章以Spark ML分类模型为例,主要包括以下内容:
 简介用于分类的几种常用算法
 加载数据
 探索加载后的数据
 预处理数据
 把各种任务组装到流水线上
 模型调优

8.1分类模型简介

8.1.1线性模型

8.1.2 决策树模型

决策树模型是一个强大的非概率模型,可以用来表示复杂的非线性模式和特征的相互关系。

8.1.3 朴素贝叶斯模型

关于朴素贝叶斯详细的原理,在维基百科中有更为详细的数学公式解释:http://en.wikipedia.org/wiki/Naive_Bayes_classifier。

8.2数据加载

存放路径在 /home/hadoop/data/train.tsv。
数据集下载
先使用shell命令对数据进行试探性的查看,并做一些简单的数据处理。
1) 查看前2行数据

数据集中的第1行为标题(字段名)行,下面是一些的字段说明。
2) 查看文件记录总数

结果显示共有:数据集一共有7396条数据
3) 由于textFile目前不好过滤标题行数据,为便于spark操作数据,需要先删除标题。

4) 将数据文件上传到 hdfs

5) 查看是否上成功

6) 启动Spark Shell

7) 通过sc对象的textFile方法,由本地文件数据创建RDD

8.3数据探索

1) 查看数据前2行

由上面可以看到,得到的是只有一行字符串数组。通过常看源文件,我们可以发现字段间由制表符(\t)分割。由于后续的算法我们不需要时间戳以及网页的内容,所以这里先将其过滤掉。下面我们获取每个属性。
2) 根据以上分析,对数据进行处理,并生成新的RDD

3) 查看数据结构

4) 查看总的数据行数

5) 查看每一行数据的列数

6) 获取第一行的某个值

8.4数据预处理

1) 导入LabeledPoint

2) 导入Vectors矢量方法

3) 对数据进行1-4步的数据清洗工作

上述代码可通过复制粘贴到代码行中,使用 :paste ,粘贴过后按下 Ctrl+D 即可。
4) 考虑到使用朴素贝叶斯算法时,数据需不小于0,故需要做些处理。

5) 查看清理后数据集的前2行数据

6) 通过RDD创建DataFrame

7) 查看df和nbDF的数据

8) 查看df和nbDF的Schema的信息和数据总行数

9) 随机地将数据进行划分,80%用于训练集,20%用于测试集

10) 查看训练数据和测试数据的总行数

11) 由于后续使用网格参数和交叉验证的时候,需要多次使用到训练集和测试集,所以将这两者载入内存,可大大提高性能。

12) 导入逻辑回归分类器、决策树模型以及朴素贝叶斯模型

13) 创建贝叶斯模型,设置初始参数

14) 通过朴素贝叶斯训练模型,对测试数据进行预测

15) 朴素贝叶斯准确性统计

可以看到,朴素贝叶斯的准确率为56.6419%。

8.5组装

1) 导入特征索引类

2) 建立特征索引

3) 创建逻辑回归模型

4) 创建决策树模型

5) 导入网格参数和交叉验证

6) 导入流水线

7) 导入评估器

8) 配置2个流水线:一个是逻辑回归的流水线,包含2个stages( featureIndexer和lr);
一个是决策树回归的流水线,包含2个stages( featureIndexer 和 dt)。

8.6模型优化

1) 分别配置网格参数,使用ParamGridBuilder构造一个parameter grid

2) 分别实例化交叉验证模型

3) 通过交叉验证模型,获取最优参数集,并测试模型

4) 查看数据

5) 查看逻辑回归匹配模型的参数

6) 查看决策树匹配模型的参数

7) 统计逻辑回归的预测正确率

可以看到,我们通过交叉验证得出最优参数,从而获得最佳模型,将这个过程使用流水线连接起来,方便了我们的工作。关于模型的优化,其实我们还有很多工作要做,第11章也也出了一定的优化思路和方法。

8.7小结

本章就Spark ML中分类模型进行的详细介绍,包括逻辑回归、决策树、朴素贝叶斯模型的原理,同时介绍了分类模型的一些使用场景。通过流水线、网格参数以及交叉验证的方式,将整个机器学习过程规范化、标准化、流程化。


本章数据集下载

第7章 构建Spark ML推荐模型

前面我们介绍了机器学习的一般步骤、如何探索数据、如何预处理数据、如何利用Spark Ml中的一些算法或API,以及有效处理机器学习过程中的特征转换、特征选择、训练模型,并把这些过程流程化等。从本章开始,我们将通过实例,进一步阐述这些问题,并通过实例把相关内容有机结合起来。
本章主要介绍Spark机器学习中的协同过滤(Collaborative Filtering,CF)模型,协调过滤简单来,说是利用某个兴趣相投、拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的,进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。在日常生活中,人们实际上经常使用这种方法,如你哪天突然想看个电影,但你不知道具体看哪部,你会怎么做?大部分的人会问问周围的朋友,最近有什么好看的电影,而我们一般更倾向于从兴趣或观点相近的朋友那里得到推荐。这就是协同过滤的思想。换句话说,就是借鉴和你相关人群的观点来进行推荐。
本章介绍Spark的推荐模型,将按以下步骤进行:
 首先简介推荐模型
 加载数据到HDFS
 Spark读取数据
 对数据进行探索
 训练模型
 组装任务
 评估、优化模型

7.1推荐模型简介

协同过滤常被用于推荐系统。这类技术目标在于填充“用户-商品”联系矩阵中的缺失项。Spark.ml目前支持基于模型的协同过滤,其中用户和商品以少量的潜在因子来描述,用以预测缺失项。Spark.ml使用交替最小二乘(ALS)算法来学习这些潜在因子。

7.2数据加载

这里使用MovieLens 100k数据集,主要包括用户属性数据(u.user)、电影数据(u.item)、用户对电影的评级数据(u.data)及题材数据(u.genre)等。在把数据复制到HDFS之前,我们先大致了解一下相关数据:
用户数据(u.user)结构:

可以看出用户数据由user id、age、gender、occupation和zip code等5个字段,字段间隔符为竖线("|"),共有943行。
电影数据(u.item)结构:

可以看出用户数据由movie id、title、release date及其他属性,字段间隔符为竖线("|"),共有1682行。
用户对电影评级数据(u.data)结构:

可以看出用户数据由user id、movie id、rating(1-5)和timestamp等4个字段,字段间隔符为制表符("\t"),共有100000行。
电影题材数据(u.genre):

这个数据只有两个字段:题材及代码,以竖线分隔。共有20种电影题材。
把用户数据(u.user)复制到HDFS上,其他数据方法一样。

查看数据复制是否成功

把相关数据复制到HDFS后,我们就可以利用Pyspark对数据进行探索或简单分析,这里使用Pyspark主要考虑其可视化功能,如果不需要数据的可视化,使用Spark即可。
以spark Standalone模式启动spark集群

导入需要的包或库

7.3数据探索

数据加载到HDFS后,我们便可对数据进行探索和分析,对用户数据的探索,大家可参考2.4.3节的相关内容。用户对电影评级数据比较简单,这里我们简单查看一下导入数据抽样及统计信息。 抽样数据:

用户ID、电影ID、评级数据统计信息:

由此可知,该数据集共有100000条,评级最低为1.0,最高为5.0,平均3.5左右。

7.4训练模型

这里数据比较简单,无须做数据转换和清理等数据预处理工作。在训练模型前,我们需要把数据划分为几个部分,这里先随机划分成两部分,划分比例为80%作为训练集,20%作为测试集。后续我们在性能优化时将采用另一种划分方式,然后,比较使用不同划分方法对模型性能或泛化能力的影响。

7.5组装

1)创建流水线,把数据转换、模型训练等任务组装在一条流水线上。

2)训练模型

3)作出预测

4)查看预测值与原来的值

7.6评估模型

1)预测时会产生NaN,即NaN表示不推荐(预测时产生NaN是spark2.1 ALS中的一个bug,该bug在2.2中将修复)

2)删除含NaN的值的行,NaN有一定合理性,不推荐,但为评估指标,可以先过滤这些数。

3)运行结果为:rmse: Double = 1.016902715345917

7.7模型优化

//最佳模型相关参数
The best model was trained with rank = 20 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.9383240505365207.

7.8小结

本章介绍了推荐模型的一般方法,Spark推荐模型的原理和算法等,然后通过一个实例具体说明实施Spark推荐模型的一般步骤、使用自定义函数优化模型等内容。下一章将以Spark ML的分类模型为例,进一步说明如何使用Spark ML提供的特征选取、特征转换、流水线、交叉验证等函数或方法。


本章数据集下载

第6章Spark MLlib基础

传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用。一旦数据量过大,往往需要采用数据抽样的方法。但这种抽样很难保证不走样。近些年随着 HDFS 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习变得可能或必要,但由于MapReduce计算框架,虽然实现分布式计算,但中间结果需要存在到磁盘,这对这计算过程中需要多次迭代的机器学习,因为通常情况下机器学习算法参数学习的过程都是迭代计算的,不很理想。
Spark的出现,正好弥补了MapReduce的不足,它立足于内存计算,所以特别适合机器学习的迭代式计算。同时Spark提供了一个基于海量数据的分布式运算的机器学习库,同时提供了很多特征选取、特征转换等内嵌函数,大大降低了大家学习和使用Spark的门槛,对很多开发者只需对 Spark 有一定基础、了解机器学习算法的基本原理、以及相关参数的含义和作用,一般都可以通过都可以比较顺利地使用Spark进行基于大数据的机器学习。
Spark在机器学习方面有很多优势,本章主要Spark与机器学习相关的内容。
 Spark MLlib简介
 Spark MLlib架构
 常用的几种数据类型
 基础统计
 RDD、DataFrame及Dataset间的异同
 Spark MLlib常用算法

6.1Spark MLlib简介

MLlib是MLBase一部分,其中MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。它们的结构如下图:

图6-2 MLBase四部分关系

6.2Spark MLlib架构

6.3数据类型

Spark MLlib的数据类型主要分为四种,下面将分别介绍。
1. 本地向量(Local vector)
其创建方式主要有以下几种:(以下使用Scala语言)

2. 标记点(Labeled point)
标记点是由一个本地向量(密集或稀疏)和一个标签(整数或浮点)组成,这个值的具体内容可以由用户指定。

从文件中直接获取标记点:

3. 本地矩阵(Local matrix)
由行索引、列索引、类型值组成,存放在单机中。

4. 分布式矩阵(Distributed matrix)

6.4 基础统计

6.4.1摘要统计

示例代码如下:

6.4.2相关性

目前Spark支持两种相关性(correlations)系数:皮尔森相关系数(pearson)和斯皮尔曼等级相关系数(spearman)。下面通过示例说明相关系统的如何计算。

6.4.3假设检验

假设检验(Hypothesis testing),Spark MLlib目前支持皮尔森卡方检测(Pearson’s chi-squared tests),包括适配度检测和独立性检测。适配度检测要求输入为Vector, 独立性检验要求输入是Matrix。
代码示例:

6.4.4随机数据生成

代码示例:

6.5 RDD、Dataframe和Dataset

目前,spark.mllib包中基于RDD的APIs已进入维护模式,以后将以spark.ml包中的基于DataFrame的API为主。

6.5.1RDD

RDD是Spark建立之初的核心API。它是一种有容错机制的特殊集合, RDD是不可变分布式弹性数据集,在Spark集群中可跨节点分区,以函数式编程操作集合的方式,进行各种并行操作,提供分布式low-level API来操作,包括transformation和action等。

6.5.2Dataset/DataFrame

DataFrame与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame多了数据的结构信息,即schema,类似于传统数据库中的表。

6.5.3相互转换

RDD、DataFrame和Dataset间可以互相转换。

6.6小结

本章主要介绍了Spark MLlib的一些内容,包括MLlib的生态、架构等内容,同时介绍了Spark MLlib算法底层依赖的基础内容,如数据类型、基础统计等,最后简单介绍了RDD、DataFrame与Dataset间的异同等。后续章节我们将通过一些实例,说明如何把前几章介绍的一些方法应用到具体实例中。