作者归档:feiguyun


本章数据集下载

第4章 特征提取、转换和选择

在实际机器学习项目中,我们获取的数据往往是不规范、不一致、有很多缺失数据,甚至不少错误数据,这些数据有时又称为脏数据或噪音,在模型训练前,务必对这些脏数据进行处理,否则,再好的模型,也只能脏数据进,脏数据出。
这章我们主要介绍对数据处理涉及的一些操作,主要包括:
特征提取
特征转换
特征选择

4.1 特征提取

特征提取一般指从原始数据中抽取特征。

4.1.1 词频-逆向文件频率(TF-IDF)

词频-逆向文件频率(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。
在下面的代码段中,我们以一组句子开始。首先使用分解器Tokenizer把句子划分为单个词语。对每一个句子(词袋),我们使用HashingTF将句子转换为特征向量,最后使用IDF重新调整特征向量。这种转换通常可以提高使用文本特征的性能。

4.1.2 Word2Vec

Word2vec是一个Estimator,它采用一系列代表文档的词语来训练word2vecmodel。 在下面的代码段中,我们首先用一组文档,其中每一个文档代表一个词语序列。对于每一个文档,我们将其转换为一个特征向量。此特征向量可以被传递到一个学习算法。

4.1.3 计数向量器

计数向量器(Countvectorizer)和计数向量器模型(Countvectorizermodel)旨在通过计数来将一个文档转换为向量。
以下用实例来说明计数向量器的使用。
假设有以下列id和texts构成的DataFrame:

每行text都是Array [String]类型的文档。调用fit,CountVectorizer产生CountVectorizerModel含词汇(a,b,c)。转换后的输出列“向量”包含:
调用的CountVectorizer产生词汇(a,b,c)的CountVectorizerModel,转换后的输出向量如下:

每个向量代表文档的词汇表中每个词语出现的次数。

4.2 特征转换

在机器学习中,数据处理是一件比较繁琐的事情,需要对原有特征做多种处理,如类型转换、标准化特征、新增衍生特征等等,需要耗费大量的时间和精力编写处理程序,不过,自从Spark推出ML后,情况大有改观,Spark ML包中提供了很多现成转换器,例如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。

4.2.1分词器

分词器(Tokenization)将文本划分为独立个体(通常为单词)。

4.2.2 移除停用词

停用词为在文档中频繁出现,但未承载太多意义的词语,它们不应该被包含在算法输入中,所以会用到移除停用词(StopWordsRemover)。
示例:
假设我们有如下DataFrame,有id和raw两列

通过对raw列调用StopWordsRemover,我们可以得到筛选出的结果列如下

其中,“I”, “the”, “had”以及“a”被移除。
实现以上功能的详细代码:

4.2.3 n-gram

一个n-gram是一个长度为整数n的字序列。NGram可以用来将输入转换为n-gram。

4.2.4 二值化

二值化,通过设置阀值,将连续型的特征转化为两个值。大于阀值为1,否则为0。
注:以下规范化操作一般是针对一个特征向量(dataFrame中的一个colum)来操作的。

4.2.5 主成分分析

主成分分析被广泛应用在各种统计学、机器学习问题中,是最常见的降维方法之一。
PCA在Spark2.0用法比较简单,只需要设置:
.setInputCol(“features”)//保证输入是特征值向量
.setOutputCol(“pcaFeatures”)//输出
.setK(3)//主成分个数
注意:PCA前一定要对特征向量进行规范化(标准化)!!!

4.2.6 多项式展开

多项式展开(PolynomialExpansion)即通过产生n维组合将原始特征将特征扩展到多项式空间。下面的示例会介绍如何将你的特征集拓展到3维多项式空间。

4.2.7 离散余弦变换

离散余弦变换(DCT)是与傅里叶变换相关的一种变换,它类似于离散傅立叶变换,但是只使用实数。

4.2.8 字符串-索引变换

字符串—索引变换(StringIndexer)是将字符串列编码为标签索引列。示例数据为一个含有id和category两列的DataFrame
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c

category是有3种取值的字符串列(a、b、c),使用StringIndexer进行转换后我们可以得到如下输出,其中category作为输入列,categoryIndex作为输出列:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
a获得索引0,因为它是最频繁的,随后是具有索引1的c和具有索引2的b。
如果测试数据集中比训练数据集多了一个d类:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
如果您没有设置StringIndexer如何处理未看见的标签(默认值)或将其设置为“错误”,则会抛出异常。 但是,如果您调用了setHandleInvalid(“skip”),d类将不出现,结果为以下数据集:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
以下是使用StringIndexer的一个示例:

4.2.9 索引-字符串变换

与StringIndexer对应,索引—字符串变换(IndexToString)是将指标标签映射回原始字符串标签。
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
应用IndexToString,将categoryIndex作为输入列,将originalCategory作为输出列,我们可以检索我们的原始标签(它们将从列的元数据中推断):
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
以下是以上整个过程的一个实例:

4.2.10 独热编码

独热编码(OneHotEncoder)将标签指标映射为二值向量,其中最多一个单值。

【说明】
1、OneHotEncoder缺省状态下将删除最后一个分类或把最后一个分类作为0.
//示例

显示结果如下:
+----+---+-----+
| x| c|c_idx|
+----+---+-----+
| 1.0| a| 0.0|
| 1.5| a| 0.0|
|10.0| b| 2.0|
| 3.2| c| 1.0|
| 3.8| c| 1.0|
+----+---+-----+
最后一个分类为b,通过OneHotEncoder变为向量后,已被删除。

显示结果如下:
+----+---+-----+-------------+
| x| c|c_idx| c_idx_vec|
+----+---+-----+-------------+
| 1.0| a| 0.0|(2,[0],[1.0])|
| 1.5| a| 0.0|(2,[0],[1.0])|
|10.0| b| 2.0| (2,[],[])|
| 3.2| c| 1.0|(2,[1],[1.0])|
| 3.8| c| 1.0|(2,[1],[1.0])|
+----+---+-----+-------------+
与其他特征组合为特征向量后,将置为0,请看下例

显示结果如下:
+----+---+-----+-------------+------------------+
|x |c |c_idx|c_idx_vec |features |
+----+---+-----+-------------+------------------+
|1.0 |a |0.0 |(2,[0],[1.0])|[1.0,0.0,1.0,0.0] |
|1.5 |a |0.0 |(2,[0],[1.0])|[1.5,0.0,1.0,0.0] |
|10.0|b |2.0 |(2,[],[]) |[10.0,2.0,0.0,0.0]|
|3.2 |c |1.0 |(2,[1],[1.0])|[3.2,1.0,0.0,1.0] |
|3.8 |c |1.0 |(2,[1],[1.0])|[3.8,1.0,0.0,1.0] |
+----+---+-----+-------------+------------------+
如果想不删除最后一个分类,可添加setDropLast(False)。

显示结果如下:
+----+---+-----+-------------+
| x| c|c_idx| c_idx_vec|
+----+---+-----+-------------+
| 1.0| a| 0.0|(3,[0],[1.0])|
| 1.5| a| 0.0|(3,[0],[1.0])|
|10.0| b| 2.0|(3,[2],[1.0])|
| 3.2| c| 1.0|(3,[1],[1.0])|
| 3.8| c| 1.0|(3,[1],[1.0])|
+----+---+-----+-------------+
与其他特征向量结合后,情况如下:

显示结果如下:
+----+---+-----+-------------+----------------------+
|x |c |c_idx|c_idx_vec |features |
+----+---+-----+-------------+----------------------+
|1.0 |a |0.0 |(3,[0],[1.0])|(5,[0,2],[1.0,1.0]) |
|1.5 |a |0.0 |(3,[0],[1.0])|(5,[0,2],[1.5,1.0]) |
|10.0|b |2.0 |(3,[2],[1.0])|[10.0,2.0,0.0,0.0,1.0]|
|3.2 |c |1.0 |(3,[1],[1.0])|[3.2,1.0,0.0,1.0,0.0] |
|3.8 |c |1.0 |(3,[1],[1.0])|[3.8,1.0,0.0,1.0,0.0] |
+----+---+-----+-------------+----------------------+
2、如果分类中出现空字符,需要进行处理,如设置为"None",否则会报错。

4.2.11 向量-索引变换

在下面的例子中,我们读取一个标记点的数据集,然后使用VectorIndexer来决定哪些特征应该被视为分类。我们将分类特征值转换为它们的索引。这个变换的数据然后可以被传递到诸如DecisionTreeRegressor的处理分类特征的算法。

4.2.12交互式

例子,假设我们有以下DataFrame的列“id1”,“vec1”和“vec2”
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
应用与这些输入列的交互,然后interactionedCol作为输出列包含:
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
以下是实现以上转换的具体代码:

4.2.13正则化

以下示例演示如何加载libsvm格式的数据集,然后将每行标准化为具有单位L1范数和单位L∞范数。

4.2.14规范化(StandardScaler)

以下示例演示如何以libsvm格式加载数据集,然后规范化每个要素的单位标准偏差。

4.2.15最大值-最小值缩放

下面的示例展示如果读入一个libsvm形式的数据以及调整其特征值到[0,1]之间。
调用示例:

显示结果如下:
Features scaled to range: [0.0, 1.0]
+--------------+--------------+
| features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+

4.2.16最大值-绝对值缩放

以下示例演示如何加载libsvm格式的数据集,然后将每个特征重新缩放到[-1,1]。

运行结果如下:
+--------------+----------------+
| features| scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]| [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]| [1.0,1.0,1.0]|
+--------------+----------------+

4.2.17离散化重组

以下示例演示如何将双列列存储到另一个索引列的列中。

运行结果如下:
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
| -999.9| 0.0|
| -0.5| 1.0|
| -0.3| 1.0|
| 0.0| 2.0|
| 0.2| 2.0|
| 999.9| 3.0|
+--------+----------------+

4.2.18元素乘积

下面的示例演示了如何使用变换向量值来变换向量

运行结果如下:
+---+-------------+-----------------+
| id| vector|transformedVector|
+---+-------------+-----------------+
| a|[1.0,2.0,3.0]| [0.0,2.0,6.0]|
| b|[4.0,5.0,6.0]| [0.0,5.0,12.0]|
+---+-------------+-----------------+

4.2.19 SQL转换器

假设我们有以下DataFrame和列id,v1和v2
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
这是SQLTransformer "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__":语句的输出。
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
以下是实现以上结果的具体代码:

4.2.20向量汇编

例子
假设我们有一个带有id,hour,mobile,userFeatures和clicked列的DataFrame:
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures是一个包含三个用户特征的向量列。我们希望将hour,mobile和userFeatures合并成一个称为特征的单一特征向量,并使用它来预测是否被点击。如果我们将VectorAssembler的输入列设置为hour,mobile和userFeatures,并将列输出到特征,则在转换后,我们应该得到以下DataFrame:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
以下是实现上述功能的代码:

4.2.21分位数离散化

示例:
假设我们有如下DataFrame包含id,hour:
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour是Double类型的连续特征。我们希望将连续特征变成一个分级特征。给定numBuckets = 3,我们可得到以下DataFrame:
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
实现以上功能的scala代码如下:

4.3 特征选择

特征选择(Feature Selection)是从特征向量中选择那些更有效的特征,组成新的、更简单有效的特征向量的过程。它在数据分析中常用使用,尤其在高维数据分析中,可以剔除冗余或影响不大的特征,提升模型的性能。

4.3.1 向量机

假设我们有一个DataFrame与列userFeatures:
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures是一个包含三个用户特征的向量列。假设userFeature的第一列全部为零,因此我们要删除它并仅选择最后两列。 VectorSlicer使用setIndices(1,2)选择最后两个元素,然后生成一个名为features的新向量列:
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
假设userFeatures有输入属性,如[“f1”,“f2”,“f3”],那么我们可以使用setNames(“f2”,“f3”)来选择它们。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
以下是实现向量选择的一个scala代码示例

运行结果:
+--------------------+-------------+
|userFeatures |features |
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(2,[0],[2.3])|
|[-2.0,2.3,0.0] |[2.3,0.0] |
+--------------------+-------------+

4.3.2 R公式

示例:
假设我们有一个DataFrame含有id,country, hour和clicked四列:
id | country |hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
如果我们使用RFormula公式clicked ~ country+ hour,则表明我们希望基于country和hour预测clicked,通过转换我们可以得到如下DataFrame:
id | country |hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
以下是实现上述功能的scala代码:

4.3.3 卡方特征选择

示例:
假设我们有一个DataFrame含有id,features和clicked三列,其中clicked为需要预测的目标:
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
如果我们使用ChiSqSelector并设置numTopFeatures为1,根据标签clicked,features中最后一列将会是最有用特征:
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
使用ChiSqSelector的scala代码示例:

结果显示:
+---+------------------+-------+----------------+
| id| features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
| 7|[0.0,0.0,18.0,1.0]| 1.0| [18.0]|
| 8|[0.0,1.0,12.0,0.0]| 0.0| [12.0]|
| 9|[1.0,0.0,15.0,0.1]| 0.0| [15.0]|
+---+------------------+-------+----------------+

4.4 小结

本章主要介绍了对数据特征或变量的一些常用操作,包括特征提取,特征转换以及特征选择等方法,这些任务在实际项目中往往花费大量时间和精力,尤其要自己编写这方面的代码或函数,更是如此,Spark ML目前提供了很多现成函数,有效使用这些函数将有助于提供我们开发效率,同时使我们有更多时间优化或提升模型性能。下一章我们将介绍优化或提升模型性能一些方法。

第3章 ML Pipelines原理与实战

Spark MLlib 是Spark的重要组成部分,也是最早推出库之一, 其基于RDD的API,算法比较丰富,也比较稳定,也比较好用。但是如果目标数据集结构复杂需要多次处理,或者是对新数据需要结合多个已经训练好的单个模型进行综合计算时,使用 MLlib 将会让程序结构复杂,甚至难于理解和实现。为改变这一局限性,从Spark 1.2 版本之后引入的 ML Pipeline,经过多个版本的发展,目前Spark ML功能齐全、性能稳定,Spark ML克服了MLlib在处理复杂机器学习问题的一些不足(如工作比较复杂,流程不清晰等),向用户提供基于DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用,使整个机器学习过程变得更加易用、简洁、规范和高效。Spark的Pipeline与Scikit中Pipeline功能相近、理念相同。本章主要介绍Spark ML中Pipelines的有关内容。
本章主要介绍ML Pipeline相关内容,包括:
 Pipeline简介
 DataFrame
 构成Pipeline的一些组件
 介绍pipeline的一般原理
 使用pipeline的几个实例

3.1Pipeline简介

3.2DataFrame

以下通过一个实例来说明DataFrame的创建、操作等内容:

3.3 Pipeline组件

Pipeline组件主要包括Transformer和Estimator。
1. Transformer
2.Estimator

3.4 Pipeline原理

要构建一个Pipeline,首先需要定义Pipeline中的各个Stage,如指标提取和转换模型训练等。有了这些处理特定问题的Transformer和Estimator,我们就可以按照具体的处理逻辑来有序的组织Stages并创建一个Pipeline。

图3-1 pipeline在训练数据上的流程

整个流水线是一个估计器。所以当流水线的fit()方法运行后,会产生一个流水线模型,流水线模型是转换器。流水线模型会在测试时被调用,下面的图示说明用法。

图3-2 pipeline在测试数据上的流程

上面的图示中,流水线模型和原始流水线有同样数目的阶段,然而原始流水线中的估计器此时变为了转换器。当流水线模型的transform()方法被调用于测试数据集时,数据依次经过流水线的各个阶段。每个阶段的transform()方法更新数据集,并将之传到下个阶段。
流水线和流水线模型有助于确认训练数据和测试数据经过同样的特征处理流程。
以上两图如果合并为一图,可用如下图形表达:

 

图3-3 Spark pipeline 流程图

其中Pipeline及LogisticRegression都Estimator,Tokenizer,HashingTF,LogisticRegression Model为Transformer。

3.5Pipeline实例

3.5.1使用Estimator, Transformer, and Param实例

机器学习整个过程中,特征转换、特征选择、派生特征等工作,一般需要占据大部分时间,现在ML提供了很多Transformer,如OneHotEncoder、StringIndexer、PCA、Bucketizer、Word2vec等,利用这些函数可极大提高工作效率。
以下通过实例说明如何使用ML库中Estimaor、Transformer和Param等。

3.5.2ML使用Pipeline实例

使用Pipeline的实例。

3.6小结

本章主要介绍了流水线(pipeline)的基本概念,流水线的两个组件:Transformer和Estimator,它们是构成Pipeline的Stage,把这些stages按照一定次序组装到Pipeline上,就构成一个流水线,这些stages包括特征转换、特征选择、模型训练等任务,通过几个实例具体说明Pipeline的创建及使用。流水线是一项重要内容,章节还有很多实际使用实例,下一章主要介绍构成Pipeline的一些stages。熟练使用这些Stages有助于提升我们开发效率。


第2章构建Spark机器学习系统
构建机器学习系统,根据业务需求和使用工具的不同,可能会有些区别。不过主要流程应该差别不大,基本包括数据抽取、数据探索、数据处理、建立模型、训练模型、评估模型、优化模型、部署模型等阶段,在构建系统前,我们需要考虑系统的扩展性、与其他系统的整合、系统升级及处理方式等。这章我们主要介绍针对基于Spark机器学习的架构设计或系统构建的一般步骤、需要注意的一些问题。
本章主要介绍构建Spark机器学习系统的一般步骤:
介绍系统架构
启动集群
加载数据
探索数据
数据预处理
构建模型
模型评估
模型优化
模型保存

2.1机器学习系统架构

Spark发展非常快,到我们着手编写本书时,Spark已升级为2.1版,这2.0以后,Spark大大增强了数据流水线的内容,数据流水线的思路与SKLearn非常相似,我想这种思路或许是未来的一个趋势,使机器学习的流程标准化、规范化、流程化,很多原来需要自己编写代码都有现成的模块或函数,模型评估、调优这些任务也可实现了更高的封装,这大大降低机器学习门槛。

图2-1 Spark机器学习系统的架构图
其中数据处理、建模训练,我们可以进行组装成流水线方式,对模型评估及优化可以采用自动化方式。

2.2启动集群

Spark集群的安装配置,这里不做详细介绍,我们提供了本书可操作云平台,对Spark集群的安装配置感兴趣的读者,可参考由我们编写的《自己动手做大数据系统》。
Spark运行方式有本地模式、集群模式,本地模式所有的处理都运行在同一个JVM中,而后者,可以运行在不同节点上。具体运行方式主要有:
表2-1 Spark运行模式

本书主要以Spark Standalone(独立模式)为例,如果想以其他模式运行,只要改动对应参数即可。
Spark支持Scala或Python的REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式程序编写,交互式编程,输入的代码执行后立即能看到结果,非常友好和方便。
在2.0之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。SparkContext与驱动程序(Driver Program)和集群管理器(Cluster Manager)间的关系如图2-2所示:

图2-2 SparkContext与驱动程序、集群管理器间的关系图

从图中可以看到SparkContext起中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,Driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext。
不过在Spark 2.0中引入SparkSession对象(spark),运行Spark shell则自动创建一个SparkSession对象,在输入spark时就会发现它已经存在了(参考图2.图2-3),SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,它为用户提供了一个统一的切入点,同时也提供了各种DataFrame和Dataset的API,大大降低了学习Spark的难度。

图2-3 启动Spark shell界面

图2-3是启动Spark的集群的界面,编程语言是Scala,如果希望使用Python为编辑语句,该如何启动呢?运行pyspark即可。

图2-4 启动PySpark的客户端

2.3加载数据

这里以MovieLens 100k(http://files.grouplens.org/datasets/movielens/ml-100k.zip)数据集中的用户数据(u.data)为例,首先在本地查看数据的基本信息,然后把本地文件复制到HDFS上,Spark或PySpark读取读取hdfs上的数据。
查看u.user文件的基本信息,数据样例,总记录数等信息。

$ head -3 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
$ cat u.user |wc -l
943

u.user用户数据每列的含义为:user id | age | gender | occupation | zip code,即用户ID,用户年龄,用户性别、用户职位、所在地邮编等信息,列间的分隔符为竖线(|),共有943条记录。
如何把用户信息复制到HDFS上?首先,查看当前HDFS的目录信息。

$ hadoop fs -ls /u01/bigdata/
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2017-02-07 03:20 /u01/bigdata/data
drwxr-xr-x - hadoop supergroup 0 2016-07-20 09:16 /u01/bigdata/hive

由此可知在HDFS已有/u01/bigdata/data目录(如果没有目录可以通过hadoop fs -mkdire /u01/bigdata/data命令创建。),通过以下命令,把本地文件u.user复制到HDFS上。

$ hadoop fs -put u.user /u01/bigdata/data
//查看HDFS上的文件
$ hadoop fs -ls /u01/bigdata/data
-rw-r--r-- 1 hadoop supergroup 22628 2017-03-18 13:37 /u01/bigdata/data/u.user

把电影评级数据(u.data)、电影数据(u.item)等复制到HDFS方法相同,把本地数据复制到HDFS后,Spark如何读取加载HDFS上的文件?我们可以通过Spark的textFile方法读取。这里我们以PySpark为例,启动PySpark客户端,导入需要是的包,然后通过textFile方法读取HDFS上的数据,具体请看以下示例:

###以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的包
from pyspark.sql import SparkSession
from pyspark.sql import Row
##初始化sparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
###加载数据,并处理分割符数据
sc = spark.sparkContext
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
###利用反射机制推断模式(Schema),把dataframe注册为一个table
df = userrdd.map(lambda fields: Row(userid=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")

2.4探索数据

生产环境中数据往往包含很多脏数据,如缺失数据、不一致、不规范、奇异数据等等,所以数据加载后,数据建模前,需要对数据进行分析或探索,尤其面对大数据,了解数据的统计信息、数据质量、数据特征等,为数据处理、数据建模提供重要依据,在进行这些数据分析时,如果能实现数据的可视化,当然更利于我们理解数据。
2.4.1数据统计信息
加载数据后,首先关注的数据的统计信息,有了数据统计信息,我们对数据就有了一个大致了解,如数据特征的最大值、最小值、平均值、分位数、方差等。这些信息有助于我们理解数据质量、数据构成,为数据预处理提供重要依据。
#查看用户各字段的统计信息

schemauser.describe("userid","age", "gender","occupation","zip").show()
+-------+-----------------+-----------------+------+-------------+------------------+
|summary| userid| age|gender| occupation| zip|
+-------+-----------------+-----------------+------+-------------+------------------+
| count| 943| 943| 943| 943| 943|
| mean| 472.0|34.05196182396607| null| null| 50868.78810810811|
| stddev|272.3649512449549|12.19273973305903| null| null|30891.373254138158|
| min| 1| 7| F|administrator| 00000|
| max| 99| 73| M| writer| Y1A6B|
+-------+-----------------+-----------------+------+-------------+------------------+

从以上统计可以看出,用户表总记录数为943条,年龄最小为9岁,最大为73岁,平均年龄为34岁。
2.4.2数据质量分析
数据质量分析是数据探索阶段重要一环,数据不是完美的,大多数据大多包含缺少数据、不一致数据、异常数据、噪音数据等。没有可信的数据,再好的模型性能都太可能好,正所谓“垃圾进,垃圾出”。
数据质量方面的分析,主要包括以下几个方面:
1)缺失值;
2)异常值;
3)不一致的值
4)错误数据
数据集下载
本节以一份某酒店的销售额的数据为例,来说明在数据探索中,对数据质量的一般分析方法,主要涉及缺少值、异常值、不一致数据等。

##以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的库
import pandas as pd
import matplotlib.pyplot as plt
###加载数据,使用标题行
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##查看df的统计信息
df.count() ##统计非空值记录数
sale_date 200
sale_amt 198 ###说明sale_amt有两个空值
df.describe() ###获取df的统计信息
sale_amt
count 198.000000
mean 2765.545152
std 709.557639
min 22.000000
25% 2452.725000
50% 2655.850000
75% 3023.500000
max 9106.440000

#建立图像
plt.figure()
#画箱线图
bp = df.boxplot()
# flies为异常值的标签
x = bp['fliers'][0].get_xdata()
y = bp['fliers'][0].get_ydata()
y.sort()

#用annotate添加注释
for i in range(len(x)):
plt.annotate(y[i], xy = (x[i],y[i]), xytext=(x[i]+0.1-0.8/(y[i]-y[i-1]),y[i]))

plt.show()

图2-5 销售额箱型图检测异常值

从以上分析,可知,销售额列存在两个空值、6个可能的异常值,其中865.0,1060.0有可能属于正常值,当然也需要和也相关业务员沟通,对其他异常值,需要进一步分析异常值产生的原因,然后,确定数据的去留。
2.4.3数据特征分析
对数据质量有基本了解后,接下来就可就数据的特征进行分析,数据特征分析一般包括以下一些内容:
特征分布分析
对比分析
统计量分析
特征一般指用于模型训练的变量,原始数据中特征,有些是数值,有些是字符或其他格式信息,但在进行机器学习前,都需要转换为数值。根据实际情况,有时需要根据已有特征生成或衍生出新特征,如根据用户年龄衍生出表示老、中、青的新特征;有时需要对一些特征进行规范化、标准化等转换,尤其对回归类模型。
2.4.3.11.数据特征分析
特征的分布分析有助于发现相关数据的分布特征、分布类型、分布是否对称等,可以使用数据可视化方法,易直观发现特征的异常值等。以用户信息数据为例,分析用户的年龄特征、职业特征等。

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

sc = spark.sparkContext
# 加载textfile文件并转换为行式
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
#利用反射机制把RDD转换为DataFrame
df = userrdd.map(lambda fields: Row(name=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))

# 把dataframe注册为一个table.
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")

# 在table上运行SQL.
age = spark.sql("SELECT age FROM user")
#把运行结果转换为RDD
ages = age.rdd.map(lambda p: p.age).collect()
hist(ages, bins=20, color='lightblue', normed=True)

图2-6 用户年龄特征分布图

从以上图形可以看出,最小年龄在10岁左右,最大年龄超过70岁,大部分是20岁到40岁之间。
我们还可以进一步分析用户职业分布特征。

# 选取用户职业数据.
count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user Group by occupation order by cnt")
#查看前5行数据
count_occp.show(5)
+----------+---+
|occupation|cnt|
+----------+---+
| homemaker| 7|
| doctor| 7|
| none| 9|
| lawyer| 12|
| salesman| 12|
+----------+---+

#获取职业名称及职业数,以便画出各职业对应总数图形
#把运行结果转换为RDD
x_axis = count_occp.rdd.map(lambda p: p.occupation).collect()
y_axis = count_occp.rdd.map(lambda p: p.cnt).collect()

pos = np.arange(len(x_axis))
width = 1.0
###隐式新增一个figure,或为当前figure新增一个axes
ax = plt.axes()
ax.set_xticks(pos + (width / 2)) ###设置x轴刻度
ax.set_xticklabels(x_axis) ####在对应刻度打上标签

plt.bar(pos, y_axis, width, color='orange')
plt.xticks(rotation=30) ####x轴上的标签旋转30度
fig = matplotlib.pyplot.gcf() ###获取当前figure的应用
fig.set_size_inches(16, 10) ###设置当前figure大小

图2-7 用户职业分布图

从以上用户职业分布图,可以看出,学生占绝大多数,其次其他职业、教育工作者、管理者、工程师等。医生、家庭主妇或许平时较忙,故数量比较少。
2.4.3.22.特征分布及相关性分析
在数据探索阶段,分析特征分布,特征间的相关性等,对应后续的特征选择、特征提取将提供重要依据,以下是对类似共享单车数据的特征分析,详细内容可参考第9章的9.3节

###探索特征间分布、相关性等
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

data1=pd.read_csv('/home/hadoop/data/bike/hour.csv',header=0)
data_pd=data1.toPandas()
sns.set(style='whitegrid',context='notebook')
cols=['temp','atemp','label']
sns.pairplot(data_pd[cols],size=2.5)
plt.show()

图2-8 hours数据集特征分布及相关性示例图
2.4.3.33.对比分析

###导入需要的库
import pandas as pd
###把日期列作为索引,并转换为日期格式
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)
###把空值置为0
df1=df.fillna(0)
###根据年月求和
df_ym=df1.resample('M',how='sum')
##取年月
df2=df_ym.to_period('M')
##数据可视化
df2.plot(kind='bar',rot=30)

图2-9销售月份对比图
2.4.4数据的可视化
数据的可视化是数据探索、数据分析中重要任务,通过可视化可帮助我们发现数据的异常值、特征的分布情况等,为数据预处理提供重要支持。Spark目前对数据的可视化功能还很弱或还没有,不过,没关系,我们可以借助Python或R等可视化功能,Python和R在数据可视化方面功能很强大,这里以Python的数据可视化为例。Python的数据表现能力很强,可以2D或3D等方式展示,视化可以使用matplotlib或plot等方法。matplotlib是一种比较低级但强大的绘图工具,可以进行很多定制化,但往往需要较大代码来实现;Plot是一种非常简洁的绘图工具,它主要基于pandas基础之上,以下我们通过两个示例来具体说明:
下例是通过matplotlib可视化sin(x)和cos(x)函数的图形。

# -*- coding: utf-8 -*-
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif']=['SimHei'] ###显示中文
plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上的-号变为方块
x = np.linspace(0, 10, 100)
y = np.sin(x)
y1 = np.cos(x)
##绘制一个图,长为10,宽为6(默认值是每个单位80像素)
plt.figure(figsize=(10,6))
###在图列中自动显示$间内容
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
plt.plot(x,y1,"b--",label="$cos(x^2)$") ###b(blue),--线形
plt.xlabel(u"X值") ##X坐标名称,u表示unicode编码
plt.ylabel(u"Y值")
plt.title(u"三角函数图像") ##t图名称
plt.ylim(-1.2,1.2) ##y上的max、min值
plt.legend() ##显示图例
plt.savefig('fig01.png') ##保持到当前目录
plt.show()

运行结果如下:

图2-10 matplot数据可视化
同样的这些数据,如果我们对这些数据使用plot来进行可视化,代码可以非常简洁,但定制化方面可能要弱一些。

from pandas import DataFrame
import pandas as pd
import numpy as np

x = np.linspace(0, 10, 100)
df=DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)
df.plot()

显示图形如图2-11所示下:

图2-11 plot数据可视化

从以上实现代码可以看出,如果使用plot则非常简单,虽然定制化要比matplotlib少些,但其可定制的项也不少,如kind,rot,title,legend等等。

2.5数据预处理

前面我们介绍了探索数据的一些方法,通过对数据的探索,可以帮助我们发现一些奇异值、缺失值、一些特征的类别及其分布情况等信息。而这些信息正是对数据预处理的重要依据。在数据分析、机器学习中,数据的预处理是一个非常关键、尤其是涉及大数据的处理,往往是比较费时、费神的一个过程,有时,还需要往返多次。当然,如果数据预处理得好,除提高数据质量外,更能极大提高模型的性能,反之,对模型的影响也是很大,甚至可能垃圾进,垃圾出。
数据的预处理一般包括数据清理、数据转换、数据集成、数据归约等。这些预处理主要内容可以通过以下图形2-12来表示:

 

图2-12数据预处理示意图
2.5.1数据清理
数据清理主要任务是填补缺失值、光滑噪声数据、处理奇异数据、纠正错误数据、删除重复数据、删除唯一性属性、去除不相关字段或特征、处理不一致数据等。噪声数据的处理方法:分箱、聚类等。以下分别以处理缺失数据、异常数据为例,说明在spark中如何处理。
1. 处理缺失值

import pandas as pd
##读取HDFS上的数据
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##定位数据集中的空值
df[df.isnull().values==True]
##显示结果如下,说明有2个空值
sale_date sale_amt
13 2015/2/14 NaN
32 2015/1/26 NaN
###以0填补空值
df.fillna(0)
##或该列的平均值填补空值
df['sale_amt'].fillna(df['sale_amt'].count())
##或用该列前一行值填补空值
df.fillna(method='pad')

2. 处理奇异值
在数据探索阶段,我们发现销售数据文件catering_sale.csv中有6个可能的奇异值,假设与相关人员核实后,只有22为奇异值或错误数据,对错误数据我们一般采用删除或替换的方法,这里我们采用Spark SQL来处理奇异数据。
首先把数据复制到HDFS,用Spark读取数据,如果启动pyspark,则可以通过spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)读取;如果启动spark-shell启动,则可以采用 spark.read.option("header","true").csv("hdfs://192.168.1.112:9000/home
/hadoop/data/catering_sale.csv")的方式读取。

#读取CSV文件,保留文件标题,并创建spark 的一张derby数据库的表
df=spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)
##转换数据类型
df1=df.select(df['sale_date'],df['sale_amt'].cast("Double"))
###假设把22.0奇异值替换为200.0
df1.replace(22.0,200.0,'sale_amt')

这里我们使用了DataFrame的select、replace等方法,实际上df还有很多可利用的方法或函数,可以通过df.+Tab键查看:

这些方法或函数的具体使用,可以通过df.方法名?的方式查看,下例为查看df.filter的详细用法:

此外,我们还可以使用大量spark.sql.functions或pyspark.sql.functions,以下是使用去除字段左右空格、截取字段长度等内置函数示例:

from pyspark.sql.functions import *
###去空格
df.select(trim(df.sale_date)).show()
###去年份
df.select(substring(df.sale_date,1,4).alias('year'),df.sale_amt).show()

2.5.2数据变换
数据变换是数据预处理中一项重要内容,如对数据进行数据的规范化、离散化、衍生指标、类别特征数值化、平滑数据等都属于数据变换。数据变换Spark ML有很多现成的算法,利用这些算法可极大提高整个数据处理的效率,下表2-2只是为一个概况,更多更详细信息请可参考第4章。
表2-2 Spark ML自带的数据变换算法

这里我们以卡方检验为例,如何根据特征的贡献率来选择特征。假设我们很多特征,如:表示时间的特征:季节(season)、年月(yr)、月份(mnth)、是否节假日(holiday)、是否周末(weekday);表示天气的特征weathersit,temp等等,为了使用卡方检验来选择这些特征,首先需要把各特征组合一个特征向量,然后,把整合后特征向量、及选择特征个数等代入卡方模型中,详细代码如下:

//定义特征向量
featuresArray =["season","yr","mnth","hr","holiday","weekday","workingday",\
"weathersit","temp","atemp","hum","windspeed"]

###把各特征组合成特征向量features
assembler = VectorAssembler(inputCols=featuresArray,outputCol="features")
###选择贡献度较大的前5个特征
selectorfeature = ChiSqSelector(numTopFeatures=5, featuresCol="features",outputCol="selectedFeatures", labelCol="label")

2.5.3数据集成
数据集成是数据预处理的重要内容之一,将多文件或者多数据库中的数据进行合并,然后存放在一个一致的数据存储中。数据集成一般通过join或union、merge等关键字把两个(或多个)数据集连接在一起,Spark SQL(包括DataFrame)有join方法,Pandas下有merge方法。数据集成往往需要耗费很多资源,尤其是大数据间的集成涉及到shuffle过程,有时需要牵涉到多个节点,数据集成除了数据一致性外,性能问题常常不请自来,需要我们特别留心。
传统数据库一般是单机上采用hash join方法,如果在分布式环境中,采用join时,可以考虑充分利用分布式资源进行平行化,当然,在进行join之前,对数据过滤或归约也是常用的优化方法。
Spark SQL中有三种join方法:
broadcast hash join:
如果join的表中有一张大表和一张较少的表,可以考虑把这张小表广播分发到另一张 大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。
shuffle hash join:
如果两张表都不小,对数据量较大的表进行广播分发就不太适合。这种情况下,可以根 据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区, 这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。
sort merge join:
如果两张表都比较大,可以考虑使用sort merge join方法,先将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,然后,对单个分区节点的两表数据,分别进行排序,最后,对排好序的两张分区表数据执行join操作。
当然,如果两表都不大,可以直接使用hash join。
DataFrame中join有(或merge):内连接、左连接、右连接等。
2.5.4数据归约
大数据是机器学习的基础,但大数据往往数据量非常大,有时我们可以通过数据归约技术,删除或减少冗余属性(或维)、精简数据集等,使归约后数据比原数据小或小得多,但仍然接近于保持原数据的完整性,并结果与归约前结果相同或几乎相同。
表2-3 Spark ML 自带的数据选择算法


选择特征或降维是机器学习中重要的处理方法,我们可以使用这些方法在减少特征个数、消除噪声等问题的同时,维持原始数据的内在结构或主要特征。尤其是降维,在大数据、机器学习中发挥中重要作用,以下通过两个实例说明SVD、PCA具体使用。目前Spark MLlib支持SVD及PCA。

import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.SingularValueDecomposition import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) //保留前3个奇异值,需要获得U成员 val svd = mat.computeSVD(3, computeU = true) //通过访问svd对象的V、s、U成员分别拿到进行SVD分解后的 //右奇异矩阵、奇异值向量和左奇异矩阵: val U: RowMatrix = svd.U //左奇异矩阵 val s: Vector = svd.s //从大到小的奇异值向量 [30.88197557931219,10.848035248251415,8.201924156089822] val V: Matrix = svd.V //右奇异矩阵 -0.33309047675110115 0.6307611082680837 0.10881297540284612 -0.252559026169606 -0.13320654554805747 0.4862541277385016 -0.3913180354223819 0.3985110846022322 0.20656596253983592 -0.33266751598925126 0.25621153877501424 -0.3575093420454635 -0.35120996186827147 -0.24679309180949208 0.16775460006130793 -0.1811460330545444 0.03808707142157401 -0.46853660508460787 -0.35275045425261 -0.19100365291846758 -0.26646095393100677 -0.2938422406906167 -0.30376401501983874 -0.4274842789454556 -0.44105410502598985 -0.4108875465911952 0.2825275707788212

同样这个矩阵data,以下我们用PCA进行分解,看一下效果及与SVD的异同,SVD分解后右奇异矩阵V与PCA降维后的矩阵pc很相似。

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) val pc: Matrix = mat.computePrincipalComponents(3) -0.3948204553820511 -0.3255749878678745 0.1057375753926894 0.1967741975874508 0.12066915005125914 0.4698636365472036 -0.09206257474269655 -0.407047128194367 0.3210095555021759 0.12315980051885281 -0.6783914405694824 -0.10049065563002131 0.43871546256175087 -0.12704705411702932 0.2775911848440697 -0.05209780173017968 0.10583033338605327 -0.6473697692806737 0.422474587406277 -0.27600606797384 -0.13909137208338707 0.46536643478632944 -0.172268807944553 -0.349731653791416 0.4376262507870099 0.3469015236606571 0.13076351966313637

使用PCA降维,利用pyspark的画图功能,可以新生成的特征的方差贡献度进行可视化,下图为对hour.csv数据,通过PCA处理后,重要特征的排序情况:

图2-13 hour.csv数据的PCA分析图

2.6构建模型

前面我们介绍了准备阶段,包括加载数据、探索数据、预处理数据等,数据准备阶段往往是最费时间和精力的,常常这个问题解决了,又会出现新问题,经常需要返回多次。一般而言数据准备阶段从时间上来说可能要占据60%左右,有时更多。 数据准备后以后,接下来就是构建模型,模型是机器学习、数据挖掘等的核心,构建模型涉及确定模型或算法、设置参数、运算模型等,其大致流程如图2.11-14所示。

图2.14构建模型流程

选择算法主要依据业务需求、数据特征等,Spark目前支持分类、回归、推荐等这些常用而且重要的算法,具体可参考表2.3 所示。一种类型往往有几种算法,如分类可以逻辑、决策树等,如何选择算法,需要考虑业务需求、数据特征、算法适应性、个人经验等,当然,也可选择几种方法,然后进行比较,或采用集成学习的方式,复合多种算法也是选项之一,如先采用聚类方法对数据进行聚类,然后对不同类别的数据进行预测或推荐,有时会得到更好的结果。如果你觉得选择比较难或还不好确定,可以先从简单或熟悉的方法开始,然后,不断完善和优化。

表2-3 Spark ML 目前支持的算法

确定算法后,一般还需要设置一些参数,如训练决策树时需要选择迭代次数、纯度计算方法、树的最大高度等,此外,对准备好的数据需要进行划分,一般划分为训练数据和测试数据,有的会把训练数据进一步划分为训练数据集、验证数据集。Spark 提供多种随机划分数据的方法,如randomSplit、CrossValidator等。这些方法的具体使用在2.8节模型调优中将会具体说明。 训练数据用于训练模型,测试数据用于验证模型,因这个环节的验证是在模型训练过程中验证,所以它一般也认为隶属于模型建立过程。这种验证方法一般称为交叉验证(CrossValidator,CV),有些交叉验证把数据分成K组,如K折交叉验证(K-fold Cross Validator,K-CV ),在K折-交叉验证中,采用不重复地随机将数据集划分为K对,如果K=3,则将产生3个(训练,测试)数据集对,每个数据集使用2/3的数据进行训练,1/3进行测试。,这样会得到3个模型,用这3个模型的平均数作为最终模型的性能指标。K-CV可以有效的避免欠学习状态的发生,其结果也比较具有说服性。

2.7模型评估

模型构建以后,接下来就需要对该模型的性能、与目标的切合度等进行一些评估,模型评估是模型开发过程的不可或缺的一部分。在构建模型的过程中,会产生一些评估指标,如精确度、ROC、RMSE等等,这这些指标是重要而且基础的,但应该不是唯一和最终指标,除了这些指标外,我们还应该评估模型对业务的提示或商业目标的达成等方面贡献。一个好的模型不但要有好的技术指标,更要为解决实际问题提供帮助,有时后者显得更为重要。 Spark中常用的几个评估算法有: 均方差(MSE,Mean Squared Error): (∑(prec-act)**2)/n(prec为预测值,act为实际值,n为总样本数) 均方根差(RMSE,Root Mean Squared Error): 就是MSE开根号 平均绝对值误差(MAE,Mean Absolute Error): (∑|prec-act|)/n 在了解正确率、准确率之前,我们先看一个所谓的混淆矩阵(confusion matrix):

 

2.15混淆矩阵

混淆矩阵是一个简单矩阵,用于展示一个二分类器的预测结果,其中,T-True、F-False、N-Negative、P-Postitive。 真正(TP)被模型预测为正的正样本数;可以称作判断为真的正确率; 真负(TN) 被模型预测为负的负样本数;可以称作判断为假的正确率; 假正(FP) 被模型预测为正的负样本数;可以称作误报率; 假负(FN) 被模型预测为负的正样本数;可以称作漏报率. 正确率(Accuracy): A = (TP + TN)/(P+N) = (TP + TN)/(TP + FN + FP + TN) 反映了分类器统对整个样本的判定能力——能将正的判定为正,负的判定为负。 错误率(Error): E= (FP + FN)/(P+N) = (FP + FN)/(TP + FN + FP + TN) 准确率(Precision) P = TP/(TP+FP) ; 反映了被分类器判定的正例中真正的正例样本的比重 召回率(Recall): R = TP/(TP+FN) = 1 - FN/T; 反映了被正确判定的正例占总的正例的比重 F1-Measure: F1=2P*R/(P+R) 真阳性率(TPR): TPR= TP/(TP+FN),代表分类器预测的正类中实际正实例占所有正实例的比例。 假阳性率(FPR): FPR= FP/(FP+TN),代表分类器预测的正类中实际负实例占所有负实例的比例。 以上这些都属于静态的指标,当正负样本不平衡时它会存在着严重的问题。极端情况下比如正负样本比例为1:99(有些领域并不少见),那么一个分类器只要把所有样本都判为负,它就拥有了99%的精确度,但这时的评价指标是不具有参考价值的。另外,很多分类器都不是简单地给出一个正或负(0或1)的分类判定,而是给出一个分类的倾向程度,比如贝叶斯分类器输出的分类概率。对于这些分类器,当你取不同阈值,就可以得到不同的分类结果及分类器评价指标,依此人们又发明出来ROC曲线以及AUC(ROC曲线包围面积)指标来衡量分类器的总体可信度。ROC曲线将FPR和TPR定义为x和y轴,这样就描述了真阳性和假阳性不同决策阈值下之间的关系。AUC越大说明模型性能越好,ROC曲线如下图:

图2-16 ROC曲线示意图 下面通过一个实例说明Spark一些评估指标的使用:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.evaluation.RegressionMetrics val path="file:///u01/bigdata/spark/data/mllib/sample_libsvm_data.txt" val data=spark.read.format("libsvm").load(path) val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L) //参数说明 // threshold变量用来控制分类的阈值,默认值为0.5 val lr = new LogisticRegression() .setThreshold(0.6).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) val lrModel = lr.fit(trainingData) val predictions = lrModel.transform(testData) predictions.show() //计算MSE、MAE、 RMSE等 val evaluator = new BinaryClassificationEvaluator() .setLabelCol("label") val accuracy = evaluator.evaluate(predictions) val rm2 = new RegressionMetrics(predictions.select("prediction", "label").rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
println("MSE: " + rm2.meanSquaredError)
println("MAE: " + rm2.meanAbsoluteError)
println("RMSE Squared: " + rm2.rootMeanSquaredError)

//将其作为多分类结果进行评估,可计算F1、准确率、召回率、正确率
val multiclassClassificationEvaluator = new MulticlassClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + multiclassClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}

printlnMetric("f1")//f1 = 0.9646258503401359
printlnMetric("weightedPrecision")//weightedPrecision = 0.9675324675324675
printlnMetric("weightedRecall")//weightedRecall = 0.9642857142857142
printlnMetric("accuracy")//accuracy = 0.9642857142857143

//将其作为二分类结果进行评估,可计算areaUnderROC、areaUnderPR
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + binaryClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}

printlnMetric("areaUnderROC") //结果为areaUnderROC = 0.9944444444444444
printlnMetric("areaUnderPR")//结果为areaUnderPR = 0.9969948018193632
//分类正确且分类为1的样本数量 TP 是17
predictions.filter($"label" === $"prediction").filter($"label"===1).count
//分类正确且分类为0的样本数量 TN 是10
predictions.filter($"label" === $"prediction").filter($"label"===0).count
//分类错误且分类为0的样本数量 FN是1
predictions.filter($"label" !== $"prediction").filter($"prediction"===0).count
//分类错误且分类为1的样本数量 FP是0
predictions.filter($"label" !== $"prediction").filter($"prediction"===1).count

准确率:TP/(TP+FP)=17/(17+0)=1
召回率:TP/(TP+FN) = 17/(17+1)=0.944444

2.8组装

我们对数据集进行了探索,之后进行大量的数据清理、转换等工作,对数据预处理后,构建模型、评估模型。评估模型前我们需要对数据集随机划分为训练集和测试集。假如数据有变化,如新增数据,如何保证训练集和测试集上的操作保持一致?如果数据清理、数据转换等有很多步骤,如何保证这些步骤依次执行?
采用Spark pipeline能很好解决这些问题。我们只要把这些任务,作为pipeline的stage,按照其本身的执行次序把这些stages组装到一个pipeline上。(当然如果任务比较复杂,我们也可以采用多个pipeline,然后把这些作为pipeline的stage,组装到一个新的pipeline。)
组装的步骤一般是:
1、创建pipeline,并各个stages依次组装在一起,如:

val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

2、在训练集上拟合这个pipeline

val model = pipeline.fit(training)

3、在测试集上,做预测。

model.transform(test).select("label", "prediction")

通过这种方式,既可保证stages有序执行,也可保证在训练集和测试集上所做逻辑操作的一致性,这里只是举了一个简单例子,下一章将详细介绍有关Pipeline的内容,第7章后,还有详细的使用实例。

2.9模型选择或调优

在ML中一个重要的任务就是模型选择,或者使用给定的数据为给定的任务寻找最适合的模型或参数。这个过程也叫做调优。调优可以是对单个的Estimator,比如LogisticRegression,或者是包含多个算法、特征工程和其他步骤的工作流(Pipeline)中完成。用户可以一次性对整个Pipline进行调优,而不必对Pipline中的每一个元素进行单独的调优。
MLlib支持使用像CrossValidator和TrainValidationSplit这样的工具进行模型选择。这些工具需要以下的组件:
Estimator:用户调优的算法或Pipline。
ParamMap集合:提供参数选择,有时也叫作用户查找的参数网格(parameter grid),参数网格可以使用ParamGridBuilder来构建。
Evaluator:衡量模型在测试数据上的拟合程度。
模型选择工具工作原理如下:
1.将输入数据划分为训练数据和测试数据。
2. 对于每个(训练,测试)对,遍历一组ParamMaps。用每一个ParamMap参数来拟合估计器,得到训练后的模型,再使用评估器来评估模型表现。
3.选择性能表现最优模型对应参数表。
2.9.1 交叉验证 (CrossValidator)
交叉验证(CrossValidator)会从将数据集切分成K折数据集合,分别用于训练和测试,。例如,K=3折时,CrossValidator会生成3个(训练数据,测试数据)对,每一个数据对的训练数据占2/3,测试数据占1/3。为了评估一个ParamMap,CrossValidator 会计算这三个不同的(训练,测试)数据集对在Estimator拟合出的模型上的平均评估指标。
在找出最好的ParamMap后,CrossValidator 会利用此ParamMap在整个训练集上可以训练(fit)出一个泛化能力强,误差相对小的的最佳模型,整个过程处于流程化管理之中,其工作流程图如下:

 

图2-17Spark CrossValidator流程图

虽然利用CrossValidator来训练模型,可以提升泛化能力,但其的代价也比较高,如选择k=3,regParam=(0.1,0.01),numIters=(10,20)这样就需要对模型训练3*2*2=12次。然而,对比启发式的手动调优,这是选择参数的行之有效的方法。
2.9.2训练-验证切分(TrainValidationSplit)
交叉验证的代价比较高昂,为此Spark也为超参数调优提供了训练-验证切分(TrainValidationSplit)。TrainValidationSplit创建单一的(训练,测试)数据集对。它使用trainRatio参数将数据集切分成两部分。例如,当设置trainRatio=0.8时,TrainValidationSplit将会将数据切分80%作为数据集,20%作为验证集,来生成训练、测试集对,并最终使用最好的ParamMap和完整的数据集来拟合评估器。
相对于CrossValidator对每一个参数进行k次评估,TrainValidationSplit只对每个参数组合评估1次。因此它的评估代价没有这那么高,但是当训练数据集不够大的时候其结果相对不够可信。

图2-18 Spark TrainValidationSplit流程图

2.10保存模型

训练、优化模型后,我们需要保存模型,然后把模型移植或部署到其他环境中。
这节主要介绍如何保存模型,如何部署模型等内容,以下是具体示例代码。
1)保存拟合后流水线(pipeline)到磁盘

model.write.overwrite().save("/tmp/spark-logistic-regression-model")

2)保存未拟合的流水线(pipeline)到磁盘

pipeline.write.overwrite().save("/tmp/spark-logistic-regression-model")

3)把拟合后流水线部署到其他环境中。

val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

2.11小结

这一章主要介绍了如何构建Spark学习系统、构建的一般步骤等。,实际上,构建Spark学习系统与我们构建其他平台的学习系统基本相同或相似,一般都包括数据加载、数据探索、数据预测、建模、训练模型、评估模型、优化模型等步骤,但这里我们特别增加一个利用pipeline组装各个任务(stages),这也是Spark ML中基于DataFrame数据集的重要内容,下一章我们将详细介绍有关pipeline的内容。

机器学习简介

大数据、人工智能是目前大家谈论比较多的话题,它们的应用也越来越广泛、与我们的生活关系也越来越密切,影响也越来越深远,其中很多已进入寻常百姓家,如无人机、网约车、自动导航、智能家电、电商推荐、人机对话机器人等等。
大数据是人工智能的基础,而使大数据转变为知识或生产力,离不开机器学习(Machine Learning),可以说机器学习是人工智能的核心,是使机器具有类似人的智能的根本途径。
本章主要介绍机器有关概念、与大数据、人工智能间的关系、机器学习常用架构及算法等,具体如下:
 机器学习的定义
 大数据与机器学习
 机器学习与、人工智能及深度学习
 机器学习的基本任务
 如何选择合适算法
 Spark在机器学习方面的优势

1.1机器学习的定义

机器学习是什么?是否有统一或标准定义?目前好像没有,即使在机器学习的专业人士,也好像没有一个被广泛认可的定义。在维基百科上对机器学习有以下几种定义:
“机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能”。
“机器学习是对能通过经验自动改进的计算机算法的研究”。
“机器学习是用数据或以往的经验,以此优化计算机程序的性能标准。”
一种经常引用的英文定义是:A computer program is said to learn from experience (E) with respect to some class of tasks( T) and performance(P) measure , if its performance at tasks in T, as measured by P, improves with experience E。
可以看出机器学习强调三个关键词:算法、经验、性能,其处理过程如图1-1所示。

图1-1 机器学习处理流程
上图表明机器学习是使数据通过算法构建出模型,然后对模型性能进行评估,评估后的指标,如果达到要求就用这个模型测试新数据,如果达不到要求就要调整算法重新建立模型,再次进行评估,如此循环往复,最终获得满意结果。

1.2大数据与机器学习

我们已进入大数据时代,产生数据的能力空前高涨,如互联网、移动网、物联网、成千上万的传感器、穿戴设备、GPS等等,存储数据、处理数据等能力也得到了几何级数的提升,如Hadoop、Spark技术为我们存储、处理大数据提供有效方法。
数据就是信息、就是依据,其背后隐含了大量不易被我们感官识别的信息、知识、规律等等,如何揭示这些信息、规则、趋势,正成为当下给企业带来高回报的热点。
而机器学习的任务,就是要在基于大数据量的基础上,发掘其中蕴含并且有用的信息。其处理的数据越多,机器学习就越能体现出优势,以前很多用机器学习解决不了或处理不好的问题,通过提供大数据得到很好解决或性能的大幅提升,如语言识别、图像设别、天气预测等等。

1.3 机器学习、人工智能及深度学习

人工智能和机器学习这两个科技术语如今已经广为流传,已成为当下的热词,
然而,他们间有何区别?又有哪些相同或相似的地方?虽然人工智能和机器学习高度相关,但却并不尽相同。
人工智能是计算机科学的一个分支,目的是开发一种拥有智能行为的机器,目前
很多大公司都在努力开发这种机器学习技术。他们都在努力让电脑学会人类的行为模式,
以便推动很多人眼中的下一场技术革命——让机器像人类一样“思考”。
过去10年,机器学习已经为我们带来了无人驾驶汽车、实用的语音识别、有效的网络搜索等等。接下来人工智能将如何改变我们的生活?在哪些领域最先发力?我们拭目以待。
对很多机器学习来说,特征提取不是一件简单的事情。在一些复杂问题上,
要想通过人工的方式设计有效的特征集合,往往要花费很多的时间和精力。
深度学习解决的核心问题之一就是自动地将简单的特征组合成更加复杂的特征,并利用这些组合特征解决问题。深度学习是机器学习的一个分支,它除了可以学习特征和任务之间的关联以外,还能自动从简单特征中提取更加复杂的特征。图1-2 中展示了深度学习和传统机器学习在流程上的差异。如图1-2 所示,深度学习算法可以从数据中学习更加复杂的特征表达,使得最后一步权重学习变得更加简单且有效。

图1-2 机器学习与深度学习流程对比

前面我们分别介绍了机器学习、人工智能及深度学习,它们间的关系如何?

图1-3 人工智能、机器学习与深度学习间的关系

人工智能、机器学习和深度学习是非常相关的几个领域。图1-3说明了它们之间大致关系。人工智能是一类非常广泛的问题,机器学习是解决这类问题的一个重要手段,深度学习则是机器学习的一个分支。在很多人工智能问题上,深度学习的方法突破了传统机器学习方法的瓶颈,推动了人工智能领域的快速发展。

1.4机器学习的基本任务

机器学习基于数据,并以此获取新知识、新技能。它的任务有很多,分类是其基本任务之一。分类就是将新数据划分到合适的类别中,一般用于类别型的目标特征,如果目标特征为连续型,则往往采用回归方法。回归是对新目标特征进行预测,是机器学习中使用非常广泛的方法之一。
分类和回归,都是先根据标签值或目标值建立模型或规则,然后利用这些带有目标值的数据形成的模型或规则,对新数据进行识别或预测。这两种方法都属于监督学习。与监督学习相对是无监督学习,无监督学习不指定目标值或预先无法知道目标值,它可以将把相似或相近的数据划分到相同的组里,聚类就是解决这一类问题的方法之一。
除了监督学习、无监督学习这两种最常见的方法外,还有半监督学习、强化学习等方法,这里我们就不展开了,图1-4展示了这些基本任务间的关系。

图1-4机器学习基本任务的关系

1.5如何选择合适算法

当我们接到一个数据分析或挖掘的任务或需求时,如果希望用机器学习来处理,首要任务是根据任务或需求选择合适算法,选择哪种算法较合适?分析的一般步骤为:

图1-5选择算法的一般步骤

充分了解数据及其特性,有助于我们更有效地选择机器学习算法。采用以上步骤在一定程度上可以缩小算法的选择范围,使我们少走些弯路,但在具体选择哪种算法方面,一般并不存在最好的算法或者可以给出最好结果的算法,在实际做项目的过程中,这个过程往往需要多次尝试,有时还要尝试不同算法。不过先用一种简单熟悉的方法,然后,在这个基础上不断优化,时常能收获意想不到的效果。

1.6 Spark在机器学习方面的优势

在大数据上进行机器学习,需要处理全量数据并进行大量的迭代计算,这要求机器学习平台具备强大的处理能力。Spark与Hadoop兼容,它立足于内存计算,天然的适应于迭代式计算,Spark是一个大数据计算平台,在这个平台上,有我们大家熟悉的SQL式操作组件Spark SQL;功能强大、性能优良的机器学习库Spark MLlib;还有图像处理的Spark Graphx及用于流式处理的Spark Streaming等,具体有以下优势:
 完整的大数据生态系统:我们大家熟悉的SQL式操作组件Spark SQL,还有功能强大、性能优良的机器学习库Spark MLlib、图像计算的SparkGraphx及用于流式处理的SparkStreaming等。
 高性能的大数据计算平台:因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存后续的频繁访问需求。基于内存运算,Spark可以比Hadoop快100倍,在磁盘中运算也比hadoop快10倍左右。
 与Hadoop、Hive、HBase等无缝连接:Spark可以直接访问Hadoop、Hive、Hbase等的数据,同时也可使用Hadoop的资源管理器。
 易用、通用、好用:Spark编程非常高效、简洁,支持多种语言的API,如Scala、Java、Python、R、SQL等,同时提供类似于shell的交互式开发环境REPL。

1.7小结

本章简单介绍了机器学习与大数据,与人工智能的关系。同时也介绍了机器学习的一些基本任务和如何选择合适算法等问题,在选择机器学习平台时,这里我们介绍了Spark这样一个大数据平台的集大成者,它有很多优势,而且也得到越来越多企业的青睐,当然也是我们这本书主题介绍的内容,下一章我们介绍如何构建一个Spark机器学习系统。

一、绪论:

R语言是什么?

R是一种适用于统计分析计算和图像处理的语言.受S语言和Scheme语言影响发展而来.早期R是基于S语言的一个GNU项目,所以也可以当作S语言的一种实现,通常用S语言编写的代码都可以不作任何修改的在R环境下运行。R的语法是来自Scheme.

R语言如何而来的?

R本来是由来自新西兰奥克兰大学的Ross Ihaka和Robert Gentleman

开发.[因两人名字都是以R 开头 所以也因此形象称为R]

二、基础知识:

1、数据结构(矩阵,数据框,向量,列表)

#定义一个向量并且使用class查看他的属性:

> d <-class(d)
[1] "numeric"

我们看到是数字型的。
#再定义一个向量:

> c<- class(c)
[1] "character"

由于4外面打了引号,所以向量属性是字符型。
我们可以把他转成数值型:

> class(c)<-"numeric"
> c
[1] 1 2 3 4

就变成数值了
试试下面的四则混合运算(疑问:2者长度不一样,怎么计算出来的?可以尝试观察下):
d+c
d-c
d*c
d/c
#定义一个矩阵

>M <-matrix(1:9,nrow=3,ncol=3)
>M
[,1] [,2] [,3]
[1,] 1 4 7
[2,] 2 5 8
[3,] 3 6 9

#按行填充

>N <-matrix(1:9,nrow=3,ncol=3,byrow=T)
>N
[,1] [,2] [,3]
[1,] 1 2 3
[2,] 4 5 6
[3,] 7 8 9

仔细观察下上面两者写法和结果的差别。
动手试试看下面的矩阵乘法:
M*N
M%*%N
#定义一个list

> D <-{}
> D[[1]]<-c(1,2,3)
> D[[2]]<-c(2,3,4)
> D[[3]]<-matrix(1:6,nrow=2)
> D
[[1]]
[1] 1 2 3

[[2]]
[1] 2 3 4

[[3]]
[,1] [,2] [,3]
[1,] 1 3 5
[2,] 2 4 6

#定义序列

> G1 <-seq(from=1,to=9,by=1 )
> G2 G3<-rep(c(1,2),2)
> G1
[1] 1 2 3 4 5 6 7 8 9
> G2
[1] 1 2 3 4 5 6 7 8 9
> G3
[1] 1 2 1 2

#读入数据
read.table("路径",header=FALSE,stringsAsFactors=TRUE,fill=TRUE)

2.Apply函数簇和长宽报表转换

#定义矩阵

> N<-matrix(1:9,nrow=3,ncol=3,byrow=T)
> N
[,1] [,2] [,3]
[1,] 1 2 3
[2,] 4 5 6
[3,] 7 8 9

动手试试看:
#按行求和
apply(N,1,sum)
#按列求均值
apply(N,2,mean)
#按列求标准差
apply(N,2,sd)
#按列排序
apply(N,2,sort)
#导入R里面的数据集iris
data(iris)
iris
#用apply函数簇,求每种花的花瓣总长度
tapply(iris$Sepal.Length,iris$Species,sum)
#求每种花瓣,花鄂的标准差
sapply(1:(ncol(iris)-1),function(x) sd(iris[,x]))
apply(iris,2,sd)

#长宽表转换(需要导入reshape和reshape2包)
#### source("http://bioconductor.org/biocLite.R")
#### biocLite()
####
library(reshape)
library(reshape2)
data_reshape 借款周期=c(7,10,14,8,10,12))
data2 data3 data3 colnames(data3) <-c("客户ID","借款金额","借款时间","借款周期")
结果如下:

1.TensorFlow简介
TensorFlow是谷歌基于DistBelief进行研发的第二代人工智能学习系统,采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即Tensor(张量),而Flow(流)意味着基于数据流图的计算,TensorFlow为张量从流图的一端流动到另一端计算过程。TensorFlow不只局限于神经网络,其数据流式图支持非常自由的算法表达,当然也可以轻松实现深度学习以外的机器学习算法。事实上,只要可以将计算表示成计算图的形式,就可以使用TensorFlow。TensorFlow可被用于语音识别或图像识别等多项机器深度学习领域,TensorFlow一大亮点是支持异构设备分布式计算,它能够在各个平台上自动运行模型,从手机、单个CPU / GPU到成百上千GPU卡组成的分布式系统。

2、TensorFlow安装
安装TensorFlow,因本环境的python3.6采用anaconda来安装,故这里采用conda管理工具来安装TensorFlow,目前conda缺省安装版本为TensorFlow1.2。

conda install tensorflow

验证安装是否成功,可以通过导入tensorflow来检验。
启动ipython(或python)

import tensorflow as tf

测试测试TensorFlow,Jupyter Notebook及matplotlib

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

##通知笔记本将matplotlib图表直接显示在浏览器上
%matplotlib inline

a=tf.random_normal([2,40]) ###随机生成一个2x40矩阵
sess=tf.Session() ####启动session,并赋给一个sess对象
out=sess.run(a) ###执行对象a,并将输出数组赋给out
x,y=out ###将out这个2x40矩阵划分为两个1x40的向量x,y
plt.scatter(x,y) ###利用scatter绘制散点图
plt.show()

3、TensorFlow的发展
2015年11月9日谷歌开源了人工智能系统TensorFlow,同时成为2015年最受关注的开源项目之一。TensorFlow的开源大大降低了深度学习在各个行业中的应用难度。TensorFlow的近期里程碑事件主要有:
2016年04月:发布了分布式TensorFlow的0.8版本,把DeepMind模型迁移到TensorFlow;
2016年06月:TensorFlow v0.9发布,改进了移动设备的支持;
2016年11月:TensorFlow开源一周年;
2017年2月:TensorFlow v1.0发布,增加了Java、Go的API,以及专用的编译器和调试工具,同时TensorFlow 1.0引入了一个高级API,包含f.layers,tf.metrics和tf.losses模块。还宣布增了一个新的tf.keras模块,它与另一个流行的高级神经网络库Keras完全兼容。
2017年4月:TensorFlow v1.1发布,为Windows 添加 Java API 支,添加tf.spectral 模块,Keras 2 API等;
2017年6月:TensorFlow v1.2发布,包括 API 的重要变化、contrib API的变化和Bug 修复及其它改变等。

4、TensorFlow的特点
高度的灵活性
TensorFlow 采用数据流图,用于数值计算的开源软件库。只要计算能表示为一个数据流图,你就可以使用Tensorflow。
真正的可移植性
Tensorflow 在CPU和GPU上运行,可以运行在台式机、服务器、云服务器、手机移动设备、Docker容器里等等。
将科研和产品联系在一起
过去如果要将科研中的机器学习想法用到产品中,需要大量的代码重写工作。Tensorflow将改变这一点。使用Tensorflow可以让应用型研究者将想法迅速运用到产品中,也可以 让学术性研究者更直接地彼此分享代码,产品团队则用Tensorflow来训练和使用计算模 型,并直接提供给在线用户,从而提高科研产出率。
自动求微分
基于梯度的机器学习算法会受益于Tensorflow自动求微分的能力。使用Tensorflow,只 需要定义预测模型的结构,将这个结构和目标函数(objective function)结合在一起,并添加数据,Tensorflow将自动为你计算相关的微分导数。
多语言支持
Tensorflow 有一个合理的c++使用界面,也有一个易用的python使用界面来构建和执行你的graphs。你可以直接写python/c++程序,也可以用交互式的Ipython界面来用Tensorflow尝试这些想法,也可以使用Go,Java,Lua,Javascript,或者是R等语言。
性能最优化
如果你有一个32个CPU内核、4个GPU显卡的工作站,想要将你工作站的计算潜能全发挥出来,由于Tensorflow 给予了线程、队列、异步操作等以最佳的支持,Tensorflow 让你可以将你手边硬件的计算潜能全部发挥出来。你可以自由地将Tensorflow图中的计 算元素分配到不同设备上,充分利用这些资源。下表为TensorFlow的一些主要技术特征:

5、TensorFlow的编程模式
TensorFlow是一个采用数据流图(Data Flow Graphs),节点(Nodes)在图中表示数学操作,图中的边(edges)则表示在节点间相互联系的任何维度的数据数组,即张量(tensor)。数据流图用结点(nodes)和边(edges)的有向无环图(DAG)来描述数学计算。节点一般用来表示施加的数学操作(tf.Operation),但也可以表示数据输入(feed in)的起点/输出(push out)的终点,或者是读取/写入持久变量(persistent variable)的终点。边表示节点之间的输入/输出关系。这些数据“边”可以输运“size可动态调整”的多维数据数组,即“张量”(tensor)。张量从图中流过的直观图像是这个工具取名为“Tensorflow”的原因。

Tensorflow中的计算可以表示为一个有向图(directed graph),或称计算图(computation graph),其中每一个运算操作将作为一个节点(node),节点与节点之间的连接成为边(edge),而在计算图的边中流动(flow)的数据被称为张量(tensor),所以形象的看整个操作就好像数据(tensor)在计算图(computation graphy)中沿着边(edge)流过(flow)一个个节点(node),这就是tensorflow名字的由来的。

saddle_point_evaluation_optimizers

计算图中的每个节点可以有任意多个输入和任意多个输出,每个节点描述了一种运算操作(operation, op),节点可以算作运算操作的实例化(instance)。计算图描述了数据的计算流程,它也负责维护和更新状态,用户可以对计算图的分支进行条件控制或循环操作。用户可以使用pyton、C++、Go、Java等语言设计计算图。tensorflow通过计算图将所有的运算操作全部运行在python外面,比如通过c++运行在cpu或通过cuda运行在gpu 上,所以实际上python只是一种接口,真正的核心计算过程还是在底层采用c++或cuda在cpu或gpu上运行。

一个 TensorFlow图描述了计算的过程. 为了进行计算, 图必须在会话(session)里被启动. 会话将图的op分发到诸如CPU或GPU之的备上, 同时提供执行op的方法. 这些方法执行后, 将产生的tensor返回. 在Python语言中, 返回的tensor是numpy ndarray对象; 在C和C++语言中, 返回的tensor是tensorflow::Tensor实例。

从上面的描述中我们可以看到,tensorflow的几个比较重要的概念:tensor, computation graphy, node, session。正如前面所说,整个操作就好像数据(tensor)在计算图(computation graphy)中沿着边(edge)流过(flow)一个个节点(node),然后通过会话(session)启动计算。所以简单来说,要完成这整个过程,我们需要的东西是要定义数据、计算图和计算图上的节点,以及启动计算的会话。所以在实际使用中我们要做的大部分工作应该就是定义这些内容了。

6、TensorFlow实例
TensorFlow如何工作?我们通过一个简单的实例进行说明,为计算x+y,你需要创建下图这张数据流图


以下构成上数据流图的详细步骤:
1)定义x= [1,3,5],y =[2,4,7],这个图和tf.Tensor一起工作来代表数据的单位,你需要创建恒定的张量:

import tensorflow as tf
x = tf.constant([1,3,5])
y = tf.constant([2,4,7])

2)定义操作

op = tf.add(x,y)

3)张量和操作都有了,接下来就是创建图

my_graph = tf.Graph()

这一步非必须,在创建回话时,系统将自动创建一个默认图。
4)为了运行这图你将需要创建一个回话(tf.Session),一个tf.Session对象封装了操作对象执行的环境,为了做到这一点,我们需要定义在会话中将要用到哪一张图:

with tf.Session(graph=my_graph) as sess:
x = tf.constant([1,3,5])
y = tf.constant([2,4,7])
op = tf.add(x,y)

5)想要执行这个操作,要用到tf.Session.run()这个方法:

import tensorflow as tf
my_graph = tf.Graph()
with tf.Session(graph=my_graph) as sess:
x = tf.constant([1,3,5])
y = tf.constant([2,4,7])
op = tf.add(x,y)
result = sess.run(fetches=op)
print(result)

6)运行结果:
[ 3 7 12]

7、几乎所有深度框架都是基于计算图,计算图可分为静态计算图和动态计算图。静态计算图,先定义再运行,一次定义多次运行;动态计算图在运行过程中被定义,在运行中构建,可以多次构建多次运行。Tensorflow属于静态图,先构造图形,然后创建sess,最后run。这里我们介绍深度学习另一种框架Pytorch,它属于动态图形,每步生成图的一部分,最后把图组合起来,形成一个完整图形,具体可参考下图。

saddle_point_evaluation_optimizers

以上动态图对应的代码为:

第一部分 Python基础
第1章、Numpy常用操作
第2章、Theano基础
第3章、Python知识图谱
第二部分 Python数据分析
第1章、Pandas基础
第2章、 操作数据库
第3章、数据处理
第4章、用Python分析股票数据
第5章、电信客户流失分析

第三部分 数据可视化

第四部分 爬虫
第1章、Scrapy基础
第2章、利用Python抓取并分析京东商品评论数据
第3章、Python 爬虫入门实例
第4章、 Scarpy 爬虫入门实例

第五部分 Web开发

第六部分 Python与机器学习

第1章、机器学习简介
第2章、机器学习一般流程
第3章、机器学习简单示例
第4章、Scikit-learn简介
第5章、数据探索和预处
第6章、模型评估与参数优化
第7章、集成学习
第8章、航空公司客户价值分析实例
第9章、自然语言处理----情感分析实例
第10章、聚类算法及实例
第11章、神奇的SVM及实例分析
第12章、化繁为简的高手----正则化方法
第13章、神经网络简介
第14章、由浅入深--轻松学会--用Python实现神经网络
第15章、K-means聚类
第16章、回归模型(分别用解方程、迭代、自动求导等方法求参数)
第17章、多种降维方法
第18章、Kaggle竞赛神器--集成学习
第19章、深度学习首先方法:Jupyter NoteBook
第20章、深度学习框架Pytorch快速入门
第21章、TensorFlow的又一利器TensorFlow.js(基于浏览器开发测试机器学习)
第22章、神经网络黑箱不黑
第23章、神经风格迁移
第24章、轻松掌握深度学习的核心--误差反向传播法
第25章、Pytorch如果高效使用GPU加速
第26章、用预训练模型清除图像中的雾霾

第七部分 Python与深度学习
###应用数学基础###
第1章 线性代数
第2章 概率与信息论
第3章 概率图模型
###深度学习理论与应用###
第4章 机器学习基础
4.1简介
4.2监督学习
4.3无监督学习
4.4神经网络
4.5随机梯度下降
4.6参数估计
4.7Sklearn简介

第5章 深度学习挑战及方案
5.1 梯度下降与最优化
第6章 安装TensorFlow
第7章 TensorFlow基础
第8章 TensorFlow图像处理
第9章 TensorFlow实现神经元函数
第10章 TensorFlow自编码
第11章 TensorFlow 实现word2vec
第12章 TensorFlow 卷积神经网络
第13章 TensorFlow 循环神经网络
第14章 TensorFlow 高级封装
###深度学习实战(使用TensorFlow实现)###
第15章 情感分析
第16章 自动生成文章摘要
第17章 聊天机器人
第18章 人脸识别
第19章 乳腺癌识别
第20章 信用卡欺诈检测
第21章 自然语言处理
第22章 强化学习
第23章 生成式对抗网络
第24章 语音识别基础
第25章 自动驾驶汽车--使汽车在两条白线之间行驶
第26章 自动驾驶汽车--交通标志识别

第八部分 爬虫+文件处理+科学计算+机器学习等

第1章 机器学习简介
1.1机器学习的定义
1.2大数据与机器学习
1.3 机器学习、人工智能及深度学习
1.4 机器学习的基本任务
1.5 如何选择合适算法
1.6 Spark在机器学习方面的优势
1.7 小结
第2章 构建Spark机器学习系统
2.1机器学习系统架构
2.2启动集群
2.3加载数据
2.4探索数据
2.4.1 数据统计信息
2.4.2 数据质量分析
2.4.3 数据特征分析
2.4.3.1 数据特征分析
2.4.3.2 特征分布及相关性分析
2.4.3.3 对比分析
2.4.4 数据的可视化
2.5数据预处理
2.5.1数据清理
2.5.2数据变换
2.5.3数据集成
2.5.4数据归约
2.6构建模型
2.7模型评估
2.8组装
2.9模型选择或调优
2.9.1 交叉验证(CrossValidator)
2.9.2训练-验证切分(TrainValidationSplit)
2.10保存模型
2.11小结
第3章 ML Pipelines
3.1 Pipeline简介
3.2DataFrame
3.3 Pipeline组件
3.4 Pipeline原理
3.5 Pipeline实例
3.5.1使用Estimator, Transformer, and Param实例
3.5.2 ML使用Pipeline实例
3.6 小结
第4章 特征提取、转换和选择
4.1 特征提取
4.1.1 词频-逆向文件频率(TF-IDF)
4.1.2 Word2Vec
4.1.3 计数向量器(Countvectorizer)
4.2 特征转换
4.2.1分词器(Tokenization)
4.2.2 移除停用词(StopWordsRemover)
4.2.3 n-gram
4.2.4 二值化
4.2.5 主成分分析(PCA)
4.2.6 多项式展开(PolynomialExpansion)
4.2.7 离散余弦变换(DCT)
4.2.8 字符串-索引变换(StringIndexer)
4.2.9 索引-字符串变换(IndexToString)
4.2.10 独热编码(OneHotEncoder)
4.2.11 向量-索引变换
4.2.12交互式(Interaction)
4.2.13 正则化(Normalizer)
4.2.14 规范化(StandardScaler)
4.2.15 最大值-最小值缩放(MinMaxScaler)
4.2.16 最大值-绝对值缩放(MaxAbsScaler)
4.2.17 离散化重组(Bucketizer)
4.2.18 元素乘积(ElementwiseProduct)
4.2.19 SQL转换器(SQLTransformer)
4.2.20 向量汇编(VectorAssembler)
4.2.21 分位数离散化(QuantileDiscretizer)
4.3 特征选择
4.3.1 向量机(VectorSlicer)
4.3.2 R公式(RFormula)
4.3.3 卡方特征选择(ChiSqSelector)
4.4 小结
第5章 模型选择和优化
5.1 模型选择
5.2交叉验证(cross-validation)
5.3训练验证拆分法(train validation split)
5.4自定义模型选择
5.5小结
第6章 Spark MLlib简介
6.1 Spark MLlib简介
6.2 Spark MLlib架构
6.3 数据类型
6.4 基础统计
6.4.1摘要统计(summary statistics)
6.4.2相关性(correlations)
6.4.3假设检验(hypothesis testing)
6.4.4随机数据生成(random data generation)
6.5 RDD、Dataframe和Dataset
6.5.1 RDD
6.5.2Dataset/DataFrame
6.5.3相互转换
6.6 小结
第7章 构建Spark ML推荐模型
7.1 推荐模型简介
7.2 数据加载
7.3 数据探索
7.4 训练模型
7.5 组装
7.6 评估模型
7.7 模型优化
7.8小结
第8章构建Spark ML 分类模型
8.1分类模型简介
8.1.1线性模型
8.1.2 决策树模型
8.1.3 朴素贝叶斯模型
8.2数据加载
8.3 数据探索
8.4数据预处理
8.5组装
8.6模型优化
8.7小结
第9章 构建Spark ML回归模型
9.1 回归模型简介
9.2 数据加载
9.3 探索特征分布
9.4 数据预处理
9.4.1 特征选择
9.4.2 特征转换
9.5 组装
9.6 模型优化
9.7 小结
第10章 构建Spark ML聚类模型
10.1 K-means模型简介
10.2 数据加载
10.3 探索特征的相关性
10.4 数据预处理
10.5 组装
10.6 模型优化
10.7 小结
第11章 PySpark 决策树模型
11.1 PySpark 简介
11.2 决策树简介
11.3数据加载
11.3.1 原数据集初探
11.3.2 PySpark 的启动
11.3.3 基本函数
11.4数据探索
11.5数据预处理
11.6创建决策树模型
11.7训练模型进行预测
11.8模型优化
11.8.1特征值的优化
11.8.2交叉验证和网格参数
11.9脚本方式运行
11.9.1 在脚本中添加配置信息
11.9.2运行脚本程序
11.10小结
第12章 Spark R 朴素贝叶斯模型
12.1. Spark R简介
12.2. 获取数据
12.2.1. SparkDataFrame数据结构说明
12.2.2. 创建Spark DataFrame
12.2.3. SparkDataFrame的常用操作
12.3. 朴素贝叶斯分类器
12.3.1数据探查
12.3.2对原始数据集进行转换
12.3.3查看不同船舱的生还率差异
12.3.4转换成Spark DataFrame格式的数据
12.3.4模型概要
12.3.5预测
12.3.6评估模型
12.4 小结
第13章 使用Spark Streaming构建在线学习模型
13.1 Spark Streaming简介
13.1.1Spark Streaming常用术语
13.1.2Spark Streaming处理流程
13.2 Dstream操作
13.2.1 Dstream输入
13.2.2 Dstream转换
13.2.3 Dstream修改
13.2 .4Dstream输出
13.3 Spark Streaming应用实例
13.4 Spark Streaming在线学习实例
13.5小结
第14章 TensorFlowOnSpark简介
14.1TensorFlow简介
14.1.1TensorFlow的安装
14.1.2TensorFlow的发展
14.1.3TensorFlow的特点
14.1.4TensorFlow编程模型
14.1.5TensorFlow常用函数
14.1.6TensorFlow的运行原理
14.1.7TensorFlow系统架构
14.2TensorFlow实现卷积神经网络
14.2.1卷积神经网络简介
14.2.2卷积神经网络的发展历程
14.2.3卷积神经网络的网络结构
14.2.4TensorFlow实现卷积神经网络
14.3TensorFlow实现循环神经网络
14.3.1循环神经网络简介
14.3.2 LSTM循环神经网络简介
14.3.3 LSTM循环神经网络分步说明
14.3.4TensorFlow实现循环神经网络
14.4分布式TensorFlow
14.4.1客户端、主节点和工作节点间的关系
14.4.2分布式模式
14.4.3在Pyspark集群环境运行TensorFlow
14.5TensorFlowOnSpark架构
14.6TensorFlowOnSpark安装
14.7TensorFlowOnSpark实例
14.7.1TensorFlowOnSpark单机模式实例
14.7.2TensorFlowOnSpark集群模式实例
14.8小结

第1章 为什么要自己动手做大数据系统(略)
第2章 项目背景及准备(略)

项目总体框架图,从数据获取->数据存储->数据处理->数据分析与展示
第3章 大数据环境搭建和配置
3.1 各组件功能说明
3.1.1 各种数据源的采集工具
3.1.2 企业大数据存储工具
3.1.3 企业大数据系统的数据仓库工具
3.1.4 企业大数据系统的分析计算工具
3.1.5 企业大数据系统的数据库工具
3.2 大数据系统各组件安装部署配置
3.2.1 安装的前期准备工作
3.2.2 Hadoop基础环境安装及配置
3.2.3 Hive安装及配置
3.2.4 Sqoop安装及配置
3.2.5 Spark安装及配置
3.2.6 Zookeeper安装及配置
3.2.7 HBase安装及配置
3.3 自动化安装及部署说明
3.3.1 自动化安装及部署整体架构设计
3.3.2 大数据系统自动化部署逻辑调用关系
3.4 本章小结
第4章 大数据的获取
4.1 使用爬虫获取互联网数据
4.2 Python和Scrapy 框架的安装
4.3 抓取和解析招聘职位信息
4.4 职位信息的落地
4.5 两个爬虫配合工作
4.6 让爬虫的架构设计更加合理
4.7 获取数据的其他方式
4.8 使用Sqoop同步论坛中帖子数据
4.9 本章小结
第5章 大数据的处理
5.1 Hive是什么
5.2 为什么使用Hive做数据仓库建模
5.3 飞谷项目中Hive建模步骤
5.3.1 逻辑模型的创建
5.3.2 物理模型的创建
5.3.3 将爬虫数据导入stg_job表
5.4 使用Hive进行数据清洗转换
5.5 数据清洗转换的必要性
5.6 使用HiveQL清洗数据、提取维度信息
5.6.1 使用HQL清洗数据
5.6.2 提取维度信息
5.7 定义Hive UDF封装处理逻辑
5.7.1 Hive UDF的开发、部署和调用
5.7.2 Python版本的UDF
5.8 使用左外连接构造聚合表rpt_job
5.9 让数据处理自动调度
5.9.1 HQL的几种执行方式
5.9.2 Hive Thrift服务
5.9.3 使用JDBC连接Hive
5.9.4 Python调用HiveServer服务
5.9.5 用crontab实现的任务调度
5.10 本章小结
第6章 大数据的存储
6.1 NoSQL及HBase简介
6.2 HBase中的主要概念
6.3 HBase客户端及JavaAPI
6.4 Hive数据导入HBase的两种方案
6.4.1 利用既有的JAR包实现整合
6.4.2 手动编写MapReduce程序
6.5 使用Java API查询HBase中的职位信息
6.5.1 为什么是HBase而非Hive
6.5.2 多条件组合查询HBase中的职位信息
6.6 如何显示职位表中的某条具体信息
6.7 本章小结
第7章 大数据的展示
7.1 概述
7.2 数据分析的一般步骤
7.3 用R来做数据分析展示
7.3.1 在Ubuntu上安装R
7.3.2 R的基本使用方式
7.4 用Hive充当R的数据来源
7.4.1 RHive组件
7.4.2 把R图表整合到Web页面中
7.5 本章小结
第8章 大数据的分析挖掘
8.1 基于Spark的数据挖掘技术
8.2 Spark和Hadoop的关系
8.3 在Ubuntu上安装Spark集群
8.3.1 JDK和Hadoop的安装
8.3.2 安装Scala
8.3.3 安装Spark
8.4 Spark的运行方式
8.5 使用Spark替代Hadoop Yarn引擎
8.5.1 使用spark-sql查看Hive表
8.5.2 在beeline客户端使用Spark引擎
8.5.3 在Java代码中引用Spark的ThriftServer
8.6 对招聘公司名称做全文检索
8.6.1 从HDFS数据源构造JavaRDD
8.6.2 使用Spark SQL操作RDD
8.6.3 把RDD运行结果展现在前端
8.7 如何把Spark用得更好
8.8 SparkR组件的使用
8.8.1 SparkR的安装及启动
8.8.2 运行自带的Sample例子
8.8.3 利用SparkR生成职位统计饼图
8.9 本章小结