年度归档:2017年

Theano是Python的一个库,为开源项目,在2008年,由Yoshua Bengio领导的加拿大蒙特利尔理工学院LISA实验室开发。对于解决大量数据的问题,使用Theano可能获得与手工用C实现差不多的性能。另外通过利用GPU,它能获得比CPU上的快很多数量级的性能。
Theano把计算机代数系统(CAS)和优化的编译器结合在一起。 它也可以对许多数学操作生成自定义的C代码。这种CAS和优化编译的组合对于有复杂数学表达式重复的被求值并且求值速度很关键的问题是非常有用的。对于许多不同的表达式只求值一次的场景,Theano也能最小化编译/分析的次数,但是仍然可以提供诸如自动差分这样的符号计算的特性。
Theano主要特点:
第一个特点:
使用“符号计算图”来描述模型表达式的开源架构,当前很多优秀的开源工具库或深度学习框架,如TensorFlow、Keras等,都借鉴了Theano的设计风格及其底层设计,因此,了解Theano的特点,尤其是符合图的机制对我们学好其它开源工具非常有帮助。
第二个特点:Theano针对多维数组(或张量),能够高效实现、编译和评估数学表达式,它同时还可以使得代码在GPU上执行。它没有专门提供深度学习相关的API,因此,用户构建模型时需要从最基本的网络层开始构建。
1、符号变量
Theano的变量类型称为符号变量,用TensorVariable表示,又称为张量,它是Theano表达式和运算操作的基本单位。创建符号变量的方式有如下几种:
 使用内置的变量类型创建
目前Theano支持7种内置的变量类型,分别是标量(scalar)、向量(vector)、行(row)、列(col)、矩阵(matrix)、tensor3、tensor4等。其中标量是0阶张量,向量为1阶张量,矩阵为二阶张量等,以下为创建内置变量的实例:

import theano
from theano import tensor as T
data=T.vector(name='data',dtype='float64')

其中,
name指定变量名字
dtype指变量的数据类型。
以下我们通过Theano的tensor模块中scalar来计算一维数据样本点x的输入z,权重假设为w,偏移量为b,即它们间的关系为:
z=w*x+b
实现这个表达式的代码如下:

import theano
from theano import tensor as T
#初始化
x=T.scalar()
w=T.scalar()
b=T.scalar()

z=w*x+b
#编译
net_input=theano.function(inputs=[w,x,b],outputs=z)
#执行
print('net input: %.2f' %net_input(3.0,2.0,0.1))

运算结果为:net input: 6.10
 自定义变量类型
内置的变量类型只能处理4维及以下的变量,如果需要处理更高维的数据时,我们可以使用Theano的自定义变量类型,具体通过TensorType方法来实现:

import theano
from theano import tensor as T

mytype=T.TensorType('float64',broadcastable=(),name=None,sparse_grad=False)

其中broadcastable是True或False的布尔类型元组,元组的大小等于变量的维度,如果为True,表示变量在对应维度上的数据可以进行广播,否则数据不能广播。
广播机制(broadcast)是一种重要机制,有了这种机制,就可以方便对不同维的张量进行运算,否则,就要手工把低维数据变成高维,利用广播机制系统自动利用复制等方法把低维数据补齐,numpy也有这种机制。以下我们通过一个实例来说明广播机制原理:

上图矩阵与向量相加的具体代码如下:

import theano
import numpy as np
import theano.tensor as T
r = T.row()
r.broadcastable
# (True, False)

mtr = T.matrix()
mtr.broadcastable
# (False, False)

f_row = theano.function([r, mtr], [r + mtr])
R = np.arange(1,3).reshape(1,2)
R
#array([[1, 2]])

M = numpy.arange(1,7).reshape(3, 2)
M
#array([[1, 2],
# [3, 4],
# [5, 6]])

f_row(R, M)
#[array([[ 2., 4.],
# [ 4., 6.],
# [ 6., 8.]])]

 将Python类型变量或者Numpy类型变量转化为Theano共享变量
共享变量是Theano实现变量更新的重要机制,后面我们会详细讲解。要创建一个共享变量,只要把一个Python对象或Numpy对象传递给shared函数即可。

import theano
import numpy as np
import theano.tensor as T

data=np.array([[1,2],[3,4]])
shared_data=theano.shared(data)
type(shared_data)

2.符号计算图模型
要定义一个符号表达式,首先创建表达式所需的变量,然后通过操作符(op)把这些变量结合在一起。
Theano处理符号表达式时通过把符号表达式转换为一个计算图(graph)来处理(TensorFlow也使用了这种方法,等到我们介绍TensorFlow时,大家可对比一下),符号计算图的节点有:variable、type、apply和op
variable节点:即符号的变量节点,符号变量是符号表达式存放信息的数据结构,可以分为输入符号和输出符号。
type节点:当定义了一种具体的变量类型以及变量的数据类型时,Theano为其指定数据存储的限制条件。
apply节点:把某一种类型的符号操作符应用到具体的符号变量中,与variable不同,apply节点无须由用户指定,一个apply节点包括3个字段:op、inputs、outputs。
op节点:即操作符节点,定义了一种符号变量间的运算,如+、-、sum()、tanh()等。
Theano是将符号表达式的计算表示成graphs。这些graphs是由将Apply 和 Variable节点内连接而组成的,它们是分别与函数的应用和数据相连接的。 操作是由 Op 实例表示,而数据类型是由 Type 实例表示。下面有一段代码和一个图表,该图表用来说明由这些代码所构建的结构。借助这个图或许有助于您理解如何将这些片拟合到一起:

import theano
import numpy as np
import theano.tensor as T

x = T.dmatrix('x')
y = T.dmatrix('y')
z = x + y

图中箭头表示指向python对象的引用。这里的蓝色盒子是一个 Apply 节点。红色盒子是 Variable 节点。绿色圆圈是Ops。紫色盒子是 Types。
在创建 Variables 之后,对它们应用 Apply Ops 从而得到更多的变量,并得到一个二分、有向、无环图。变量指向 Apply 节点的过程是用来表示函数通过它们的owner 域来生成它们 。这些Apply节点是通过它们的inputs和outputs域来得到它们的输入和输出变量的。
x 和 y 的owner 域的指向都是None是因为它们不是另一个计算的结果。如果它们中的一个是另一个计算的结果,那么owner域将会指向另一个的蓝色盒。
3.函数
函数是Theano的一个核心设计模块,我们了解了Theano如何把一个符号表达式转化为符号计算图,函数的功能则是提供一个接口,把函数计算图编译为可调用的函数对象。
3.1 Theano的编程风格
在theano中,我们一般是先声明自变量x(不需要赋值),然后编写函数方程结束后;最后在为自变量赋值,计算出函数的输出值y。如下示例:

import theano

#声明一个int类型的变量x
x=theano.tensor.iscalar('x')
#定义y=x^3
y=theano.tensor.pow(x,3)
#定义函数的自变量为x(输入),因变量为y(输出)
f=theano.function([x],y)
#计算当x=2的时候,函数f(x)的值
print (f(2))
#计算当x=4时,函数f(x)=x^3的值
print (f(4))
8
64

一旦我们定义了f(x)=x^3,这个时候,我们就可以输入我们想要的x值,然后计算出x的三次方。

3.2 函数的定义
Theano函数定义语法为:
theano.function(inputs, outputs, mode=None, updates=None, givens=None, no_default_updates=False, accept_inplace=False, name=None,rebuild_strict=True, allow_input_downcast=None, profile=None, on_unused_input='raise')。
参数看起来很多,但常用的一般只用到三个,inputs表示自变量、outputs表示函数的因变量(也就是函数的返回值),还有另外一个比较常用的是updates这个参数,这个一般用于神经网络共享变量参数更新,通常以字典或元组列表的形式指定;givens是一个字典或元组列表,记为[(var1,var2)],表示在每一次函数调用时,在符号计算图中,把符号变量var1节点替换为var2节点,该参数常用来指定训练数据集的batch大小。
我们看一个有多个自变量、同时又有多个因变量的函数定义例子:

import theano
x, y =theano.tensor.fscalars('x', 'y')
z1= x + y
z2=x*y
f =theano.function([x,y],[z1,z2])#定义x、y为自变量,z1、z2为函数返回值(因变量)
print(f(2,3))#返回当x=2,y=3的时候,函数f的因变量z1,z2的值
[array(5.0, dtype=float32), array(6.0, dtype=float32)]

3.3 重要函数
Theano有个很好用的函数,就是求函数的偏导数theano.grad(),比如下面S函数:

我们要求当x=3的时候,s函数的导数,代码如下:

import theano
x =theano.tensor.fscalar('x')#定义一个float类型的变量x
y= 1 / (1 + theano.tensor.exp(-x))#定义变量y
dx=theano.grad(y,x)#偏导数函数
f= theano.function([x],dx)#定义函数f,输入为x,输出为s函数的偏导数
print(f(3))#计算当x=3的时候,函数y的偏导数
0.045176658779382706

3.4 更新共享变量参数
在theano.function函数中,有个非常重要的参数updates,updates是一个包含两个元素的列表或tuple,updates=[old_w,new_w],当函数被调用的时候,这个会用new_w替换old_w,具体看下面这个例子

import theano
w= theano.shared(1)#定义一个共享变量w,其初始值为1
x=theano.tensor.iscalar('x')
f=theano.function([x], w, updates=[[w, w+x]])#定义函数自变量为x,因变量为w,当函数执行完毕后,更新参数w=w+x
print(f(3))#函数输出为w
print(w.get_value())#这个时候可以看到w=w+x为4
1
4

在求梯度下降的时候,经常用到updates这个参数。比如updates=[w,w-α*(dT/dw)],其中dT/dw就是我们梯度下降的时候,损失函数对参数w的偏导数,α是学习率。为便于大家的更全面的了解Theano函数一些使用,下面我们通过一个逻辑回归的完整实例来说明:

import numpy as np
import theano
import theano.tensor as T
rng = np.random

# 我们为了测试,自己生成10个样本,每个样本是3维的向量,然后用于训练
N = 10
feats = 3
D = (rng.randn(N, feats).astype(np.float32), rng.randint(size=N, low=0, high=2).astype(np.float32))

# 声明自变量x、以及每个样本对应的标签y(训练标签)
x = T.matrix("x")
y = T.vector("y")

#随机初始化参数w、b=0,为共享变量
w = theano.shared(rng.randn(feats), name="w")
b = theano.shared(0., name="b")

#构造损失函数
p_1 = 1 / (1 + T.exp(-T.dot(x, w) - b)) # s激活函数
xent = -y * T.log(p_1) - (1-y) * T.log(1-p_1) # 交叉商损失函数
cost = xent.mean() + 0.01 * (w ** 2).sum()# 损失函数的平均值+L2正则项以防过拟合,其中权重衰减系数为0.01
gw, gb = T.grad(cost, [w, b]) #对总损失函数求参数的偏导数

prediction = p_1 > 0.5 # 大于0.5预测值为1,否则为0.

train = theano.function(inputs=[x,y],outputs=[prediction, xent],updates=((w, w - 0.1 * gw), (b, b - 0.1 * gb)))#训练所需函数
predict = theano.function(inputs=[x], outputs=prediction)#测试阶段函数

#训练
training_steps = 1000
for i in range(training_steps):
pred, err = train(D[0], D[1])
print (err.mean())#查看损失函数下降变化过程

4、共享变量
共享变量是实现机器学习算法参数更新的重要机制。对于一般的符号变量,没有赋予初始值。在编写深度学习程序时,需要对权重参数进行初始化,此时需要带有初始值的符号变量,这种带有初始值的符号变量称为共享变量,共享变量的初始值由Numpy数据指定。
创建共享变量有两种模型:深拷贝和浅拷贝。深拷贝复制Numpy数组,浅拷贝只是复制Numpy的指针,后续对Numpy的修改将影响浅拷贝的共享变量值。这两种模式通过borrow参数来设置,默认borrow=False(即深拷贝),下面我们通过一个实例来说明:

import numpy as np
import theano
np_array=np.ones(2,dtype='float32')
s_default=theano.shared(np_array)
s_f=theano.shared(np_array,borrow=False)
s_t=theano.shared(np_array,borrow=True)
np_array的值为:array([ 1., 1.], dtype=float32)

下面我们修改np_array的值,查看对不同模式的共享变量影响。

np_array+=2
np_array
# array([ 3., 3.], dtype=float32)
s_default.get_value()
# array([ 1., 1.], dtype=float32)
s_f. get_value()
# array([ 1., 1.], dtype=float32)
s_t. get_value()
# array([ 3., 3.], dtype=float32)

通过以上代码运行结果可以看到,修改numpy的np_array值,对深拷贝的共享变量s_default,s_f变量没有影响,对s_t共享变量有影响。
5、配置Theano
现在maeOS、Linux或windows,大都使用64位内存寻址方式。Numpy和Theano默认情况下,也都使用双精度浮点格式(float64),但是,如果想要使用GPU加速计算,一般依赖32位的内存寻址方式,这是目前Theano唯一支持的计算框架。CPU在32位和64位都可以。
修改Theano的配置,可以通过以下两种方法,优先级从高到底:
(1)通过设置THEANO_FLAGS环境变量;
(2)通过.theanorc文件来设置。
5.1通过THEANO_FLAGS配置
通过设置THEANO_FLAGS环境变量来修改Theano的配置,这种方式可以是全局的,也可以是针对某个脚本文件。THEANO_FLAGS以字符串的形式显示,字符串以逗号分隔,如下所示:
THEANO_FLAGS='floatX=float32,device=gpu0'
具体修改可以直接修改环境变量或在命令行下修改或在脚本中修改。在脚本中修改时,需要在导入theano之前修改,否则可能导致修改无效。如下示例:

import os
os.environ["THEANO_FLAGS"]= 'floatX=float32,device=cpu'
import theano
print(theano.config.floatX)
float64

5.2通过.theanorc文件配置
.theanorc文件一般在$HOME目录下,编辑这个文件添加如下内容即可:

[global]
device=cpu
floatX=float64

1、 数据可视化
无论是大数据、还是小数据、也不管通过统计还是挖掘或机器学习,人们最终想看到的数据,越直观越好,所以这个就涉及到一个数据的可视化问题,而python或pandas的数据可视化功能很强大,可画的种类多,也非常便捷,这是一般数据库软件和开发工具目前所欠缺的。以下我们通过两个实例来说明利用python的matplotlib或pandas实现数据的可视化。
下例利用matplotlib实现数据的可视化

In [1]: %paste
# -*- coding: utf-8 -*-
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif']=['SimHei'] ###显示中文
plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上的-号变为方块
x = np.linspace(0, 10, 100)
y = np.sin(x)
y1 = np.cos(x)
##绘制一个图,长为10,宽为6(默认值是每个单位80像素)
plt.figure(figsize=(10,6))
###在图列中自动显示$间内容
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
plt.plot(x,y1,"b--",label="$cos(x^2)$") ###b(blue),--线形
plt.xlabel(u"X值") ##X坐标名称,u表示unicode编码
plt.ylabel(u"Y值")
plt.title(u"三角函数图像") ##t图名称
plt.ylim(-1.2,1.2) ##y上的max、min值
plt.legend() ##显示图例
plt.savefig('fig01.png') ##保持到当前目录
plt.show()

以下是运行结果:


(图 1-1 matplotlib数据可视化)

2、数据地图

下例通过matplotlib及mpl_toolkits.basemap实现地图数据的可视化。

# -*- coding: utf-8 -*-

from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt
import numpy as np

plt.rcParams['font.sans-serif']=['SimHei']
#============================================# read data
names = []
pops = []
lats = []
lons = []
countries = []
for line in file("/home/hadoop/data/bigdata_map/china_city_jobs_stat.csv"):
info = line.split(',')
names.append(info[0])
pops.append(float(info[1]))
lat = float(info[2][:-1])
if info[2][-1] == 'S': lat = -lat
lats.append(lat)
lon = float(info[3][:-1])
if info[3][-1] == 'W': lon = -lon + 360.0
lons.append(lon)
country = info[4]
countries.append(country)

#============================================
#lat0 = 35;lon0 = 120;change = 25;
lat0 = 30;lon0 = 120;change = 26;
lllat=lat0-change; urlat=lat0+change;
lllon=lon0-change; urlon=lon0+change;

map = Basemap(ax=None,projection='stere',lon_0=(urlon + lllon) / 2,lat_0=(urlat + lllat) / 2,llcrnrlat=lllat, urcrnrlat=urlat,llcrnrlon=lllon,urcrnrlon=urlon,resolution='f')
# draw coastlines, country boundaries, fill continents.
map.drawcoastlines(linewidth=0.25)
map.drawcountries(linewidth=0.25)
# draw the edge of the map projection region (the projection limb)
map.drawmapboundary(fill_color='#689CD2')
# draw lat/lon grid lines every 30 degrees.
#map.drawmeridians(np.arange(0,360,30))
#map.drawparallels(np.arange(-90,90,30))
# Fill continent wit a different color
map.fillcontinents(color='green',lake_color='#689CD2',zorder=0)
# compute native map projection coordinates of lat/lon grid.
shapefilepath = '/home/hadoop/data/bigdata_map/map/map'
map.readshapefile(shapefilepath,'city') #添加街道数据
x, y = map(lons, lats)

max_pop = max(pops)
# Plot each city in a loop.
# Set some parameters
size_factor = 80.0
y_offset = 15.0
rotation = 30
for i,j,k,name in zip(x,y,pops,names):
size = size_factor*k/max_pop
cs = map.scatter(i,j,s=size,marker='o',color='yellow')
plt.text(i,j+y_offset,name,rotation=rotation,fontsize=1)

plt.title(u'中国大数据主要城市需求分布图(2017-03-17)')
plt.show()

运行结果如下图:


(图1-2中国大数据需求分别图)

【几点说明】
为了在图形中显示中文,做了如下处理:
(1)、把simhei.ttf文件上传到如下目录:
~/ anaconda2/lib/python2.7/site-packages/matplotlib/mpl-data/fonts/ttf
(2)、删除~/.cache/matplotlib下的缓存文件 ##删除matplotlib使用缺省字体
(3)、然后配置:

plt.rcParams['font.sans-serif']=['SimHei']
plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上负号(-)变为小正方形。

3、下例为使用pandas的plot函数画图非常简便。

df=DataFrame(np.random.randn(10,4).cumsum(0),columns=['A','B','C','D'], index=np.arange(0,100,10))
df.plot()

(图1-3使用pandas的plot函数画图)

第1章 pandas基础

1.1 pandas简介

“Pandas经过几个版本的更新,目前已经成为数据清洗、处理和分析的不二选择。”
前面我们介绍了NumPy,它提供了数据处理功能,但需要写很多命令,是否有更加便捷、直观、有效的方法?Pandas就是为此而诞生的。Pandas提供了众多更高级、更直观的数据处理功能,尤其是它的DataFrame数据结构,将给你全新的体验,可以用处理数据库表或电子表格的方式来处理分析数据。
Pandas基于NumPy构建的,它提供的结构或工具,让以NumPy为中心的数据处理、数据分析变得更加简单和高效。
Pandas中两个最常用的对象是Series和DataFrame。使用pandas前,需导入以下内容:

In [1]: import numpy as np
In [2]: from pandas import Series,DataFrame
In [3]: import pandas as pd

1.2 pandas数据结构

Pandas主要采用Series和DataFrame两种数据结构。Series是一种类似一维数据的数据结构,由数据(values)及索引(indexs)组成,而DataFrame是一个表格型的数据结构,它有一组有序列,每列的数据可以为不同类型(NumPy数据组中数据要求为相同类型),它既有行索引,也有列索引。

a1=np.array([1,2,3,4])
a2=np.array([5,6,7,8])
a3=np.array(['a','b','c','d'])
df=pd.DataFrame({'a':a1,'b':a2,'c':a3})
print df

1.3 Series

上章节我们介绍了多维数组(ndarray),当然,它也包括一维数组,Series类似一维数组,为啥还要介绍Series呢?或Series有哪些特点?
Series一个最大特点就是可以使用标签索引,序列及ndarray也有索引,但都是位置索引或整数索引,这种索引有很多局限性,如根据某个有意义标签找对应值?切片时采用类似[2:3]的方法,只能取索引为2这个元素等等,无法精确定位。
Series的标签索引(它位置索引自然保留)使用起来就方便多了,而且定位也更精确,不会产生歧义。举例说明。

In [1]: import numpy as np
In [2]: from pandas import Series,DataFrame
In [3]: import pandas as pd

In [4]: s1=Series([1,3,6,-1,2,8])

In [5]: s1
Out[5]:
0 1
1 3
2 6
3 -1
4 2
5 8
dtype: int64

In [6]: s1.values
Out[6]: array([ 1, 3, 6, -1, 2, 8])

In [7]: s1.index
Out[7]: RangeIndex(start=0, stop=6, step=1)
###创建Series时,自定义索引或称为标签索引
In [8]: s2=Series([1,3,6,-1,2,8],index=['a','c','d','e','b','g'])

In [9]: s2
Out[9]:
a 1
c 3
d 6
e -1
b 2
g 8
dtype: int64

In [10]: s2['a'] ###根据标签索引找对应值
Out[10]: 1
In [11]: s2[['a','e']] ###根据标签索引找对应值
Out[11]:
a 1
e -1
dtype: int64

当然,Series除了标签索引外,还有其它很多优点,如运算的简洁:

In [15]: s2[s2>1]
Out[15]:
c 3
d 6
b 2
g 8
dtype: int64

In [16]: s2*10
Out[16]:
a 10
c 30
d 60
e -10
b 20
g 80
dtype: int64

1.4 DataFrame

DataFrame除了索引有位置索引也有标签索引,而且其数据组织方式与MySQL的表极为相似,除了形式相似,很多操作也类似,这就给我们操作DataFrame带来极大方便。这些是DataFrame特色的一小部分,它还有比数据库表更强大的功能,如强大统计、可视化等等。
DataFrame几要素:index、columns、values等,columns就像数据库表的列表,index是索引,当然values就是值了。

In [18]:
####自动生成一个3行4列的DataFrame,并定义其索引(如果不指定,缺省为整数索引)####及列名
d1=DataFrame(np.arange(12).reshape((3,4)),index=['a','b','c'],columns=['a1','a2','a3','a4'])

In [19]: d1
Out[19]:
a1 a2 a3 a4
a 0 1 2 3
b 4 5 6 7
c 8 9 10 11

In [20]: d1.index ##显示索引
Out[20]: Index([u'a', u'b', u'c'], dtype='object')

In [21]: d1.columns ##显示列名
Out[21]: Index([u'a1', u'a2', u'a3', u'a4'], dtype='object')

In [22]: d1.values ##显示值
Out[22]:
array([[ 0, 1, 2, 3],
[ 4, 5, 6, 7],
[ 8, 9, 10, 11]])

1.4.1 生成DataFrame

生成DataFrame有很多,比较常用的有导入等长列表、字典、numpy数组、数据文件等。

In [33]: data={'name':['zhanghua','liuting','gaofei','hedong'],'age':[40,45,50,46],'addr':['jianxi','pudong','beijing','xian']}

In [34]: d2=DataFrame(data)

In [35]: d2
Out[35]:
addr age name
0 jianxi 40 zhanghua
1 pudong 45 liuting
2 beijing 50 gaofei
3 xian 46 hedong

In [36]: d3=DataFrame(data,columns=['name','age','addr'],index=['a','b','c','d'])

In [37]: d3
Out[37]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian

1.4.2 获取数据

获取DataFrame结构中数据可以采用obj[]操作、查询、obj.ix[]等命令。

In [8]: data={'name':['zhanghua','liuting','gaofei','hedong'],'age':[40,45,50,46],'addr':['jianxi','pudong','beijing','xian']}
###把字典数据转换为DataFrame,并指定索引
In [9]: d3=DataFrame(data,columns=['name','age','addr'],index=['a','b','c','d'])

In [10]: d3
Out[10]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian

In [11]: d3[['name','age']] ##选择列
Out[11]:
name age
a zhanghua 40
b liuting 45
c gaofei 50
d hedong 46

In [12]: d3['a':'c'] ##选择行
Out[12]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing

In [13]: d3[1:3] ##选择行(利用位置索引)
Out[13]:
name age addr
b liuting 45 pudong
c gaofei 50 beijing
In [14]: d3[d3['age']>40] ###使用过滤条件
Out[14]:
name age addr
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian
obj.ix[indexs,[columns]]可以根据列或索引同时进行过滤,具体请看下例:
In [16]: d3.ix[['a','c'],['name','age']]
Out[16]:
name age
a zhanghua 40
c gaofei 50

In [17]: d3.ix['a':'c',['name','age']]
Out[17]:
name age
a zhanghua 40
b liuting 45
c gaofei 50

In [18]: d3.ix[0:3,['name','age']]
Out[18]:
name age
a zhanghua 40
b liuting 45
c gaofei 50

1.4.3 修改数据

我们可以像操作数据库表一样操作DataFrame,删除数据,插入数据、修改字段名、索引名、修改数据等,以下通过一些实例来说明。

In [9]: data={'name':['zhanghua','liuting','gaofei','hedong'],'age':[40,45,50,46],'addr':['jianxi','pudong','beijing','xian']}

In [10]: d3=DataFrame(data,columns=['name','age','addr'],index=['a','b','c','d'])

In [11]: d3
Out[11]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian

In [12]: d3.drop('d',axis=0) ###删除行,如果欲删除列,使axis=1即可
Out[12]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
In [13]: d3 ###从副本中删除,原数据没有被删除
Out[13]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian
###添加一行,注意需要ignore_index=True,否则会报错
In [14]: d3.append({'name':'wangkuan','age':38,'addr':'henan'},ignore_index=True)
Out[14]:
name age addr
0 zhanghua 40 jianxi
1 liuting 45 pudong
2 gaofei 50 beijing
3 hedong 46 xian
4 wangkuan 38 henan

In [15]: d3 ###原数据未变
Out[15]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian
###添加一行,并创建一个新DataFrame
In [16]: d4=d3.append({'name':'wangkuan','age':38,'addr':'henan'},ignore_index=True)

In [17]: d4
Out[17]:
name age addr
0 zhanghua 40 jianxi
1 liuting 45 pudong
2 gaofei 50 beijing
3 hedong 46 xian
4 wangkuan 38 henan

In [18]: d4.index=['a','b','c','d','e'] ###修改d4的索引

In [19]: d4
Out[19]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian
e wangkuan 38 henan
In [20]: d4.ix['e','age']=39 ###修改索引为e列名为age的值

In [21]: d4
Out[21]:
name age addr
a zhanghua 40 jianxi
b liuting 45 pudong
c gaofei 50 beijing
d hedong 46 xian
e wangkuan 39 henan

1.4.4 汇总统计

Pandas有一组常用的统计方法,可以根据不同轴方向进行统计,当然也可按不同的列或行进行统计,非常方便。
常用的统计方法有:


(表1-1 Pandas统计方法)
统计方法 说明
count 统计非NA的数量
describe 统计列的汇总信息
min、max 计算最小值和最大值
sum 求总和
mean 求平均数
var 样本的方差
std 样本的标准差
以下通过实例来说明这些方法的使用

from pandas import DataFrame
import numpy as np
import pandas as pd
inputfile = '/home/hadoop/data/stud_score.csv'
data = pd.read_csv(inputfile)
#其他参数,
###header=None 表示无标题,此时缺省列名为整数;如果设为0,表示第0行为标题
###names,encoding,skiprows等
#读取excel文件,可用 read_excel
In [7]: df=DataFrame(data)

In [8]: df.head(3) ###显示前3行
Out[8]:
stud_code sub_code sub_nmae sub_tech sub_score stat_date
0 2.015101e+09 10101.0 数学分析 NaN 90.0 NaN
1 2.015101e+09 10102.0 高等代数 NaN 88.0 NaN
2 2.015101e+09 10103.0 大学物理 NaN 67.0 NaN

In [9]: df.count()
Out[9]:
stud_code 121
sub_code 121
sub_nmae 121
sub_tech 0
sub_score 121
stat_date 0
dtype: int64

In [10]: df['sub_score'].describe() ##汇总学生各科成绩
Out[10]:
count 121.000000
mean 78.561983
std 12.338215
min 48.000000
25% 69.000000
50% 80.000000
75% 89.000000
max 98.000000
Name: sub_score, dtype: float64

In [11]: df['sub_score'].std() ##求学生成绩的标准差
Out[11]: 12.338214729032906

注:DataFrame数据结构的函数或方法有很多,大家可以通过df.[Tab键]方式查看,具体命令的使用方法,如df.count(),可以在Ipython命令行下输入:?df.count() 查看具体使用,退出帮助界面,按q即可。

1.4.5 应用函数及映射

我们知道数据库中有很多函数可用作用于表中元素,DataFrame也可将函数(内置或自定义)应用到各列或行上,而且非常方便和简洁,具体可用通过DataFrame的apply,使或applymap或map,也可以作用到元素级。以下通过实例说明具体使用。

In [23]: d1=DataFrame(np.arange(12).reshape((3,4)),index=['a','b','c'],columns=['a1','a2','a3','a4'])

In [24]: d1
Out[24]:
a1 a2 a3 a4
a 0 1 2 3
b 4 5 6 7
c 8 9 10 11

In [25]: d1.apply(lambda x:x.max()-x.min(),axis=0) ###列级处理
Out[25]:
a1 8
a2 8
a3 8
a4 8
dtype: int64

In [26]: d1.applymap(lambda x:x*2) ###处理每个元素
Out[26]:
a1 a2 a3 a4
a 0 2 4 6
b 8 10 12 14
c 16 18 20 22
In [27]: d1.ix[1]
Out[27]:
a1 4
a2 5
a3 6
a4 7
Name: b, dtype: int64
In [28]: d1.ix[1].map(lambda x:x*2) ###处理每行数据
Out[28]:
a1 8
a2 10
a3 12
a4 14
Name: b, dtype: int64

1.4.6 时间序列

pandas最基本的时间序列类型就是以时间戳(时间点)(通常以python字符串或datetime对象表示)为索引的Series:

dates = ['2017-06-20','2017-06-21','2017-06-22','2017-06-23','2017-06-24']
ts = pd.Series(np.random.randn(5),index = pd.to_datetime(dates))
ts
####ts结果为
2017-06-20 -1.360504
2017-06-21 -0.966608
2017-06-22 0.754748
2017-06-23 0.832451
2017-06-24 -0.307611
dtype: float64
索引为日期的DataFrame数据的索引、选取以及子集构造
ts.index
###显示结果
DatetimeIndex(['2017-06-20', '2017-06-21', '2017-06-22', '2017-06-23',
'2017-06-24'],
dtype='datetime64[ns]', freq=None)

#传入可以被解析成日期的字符串
ts['2017-06-21']
###显示结果
-0.96660788809762366
#传入年或年月
ts['2017-06']
###显示结果
2017-06-20 -1.360504
2017-06-21 -0.966608
2017-06-22 0.754748
2017-06-23 0.832451
2017-06-24 -0.307611
dtype: float64

#时间范围进行切片
ts['2017-06-20':'2017-06-22']
###显示结果
2017-06-20 -1.360504
2017-06-21 -0.966608
2017-06-22 0.754748
dtype: float64

numpy简介

Python中用列表(list)可以用来当作数组使用,不过由于列表的元素可以是任何对象,因此列表中所保存的是对象的指针。这样为了保存一个简单的[1,2,3],需要有3个指针和三个整数对象。对于数值运算来说这种结构显然比较浪费内存和CPU计算时间。
此外python还提供了一个array模块,array对象和列表不同,它直接保存数值,和C语言的一维数组比较类似。但是由于它不支持多维,也没有各种运算函数,因此也不适合做数值运算。

NumPy的诞生弥补了这些不足,NumPy提供了两种基本的对象:ndarray(N-dimensional array object)和 ufunc(universal function object)。ndarray是存储单一数据类型的多维数组,而ufunc则是能够对数组进行处理的函数。Numpy内部运算是通过C语言实现的,所以性能方面也很不错。

1、生成numpy的ndarray的几种方式

numpy提供了ndarray和matrix两种类型的数据,numpy的二维数组能够很好地实现矩阵的各种功能,而且比matrix要灵活,速度也更快。
numpy有ndarray和matix,性能方面ndarray优于matix,所以一般使用ndarray。
我们可以通过给array函数传递Python的序列对象创建数组,如果传递的是多层嵌套的序列,将创建多维数组。
(1)创建numpy的array几种方法

import numpy as np
a = np.array([1, 2, 3, 4])
b = np.array((5, 6, 7, 8))
##b的结果如下
array([5, 6, 7, 8])
c = np.array([[1, 2, 3, 4],[4, 5, 6, 7], [7, 8, 9, 10]])
##c的结果如下
array([[ 1, 2, 3, 4],
[ 4, 5, 6, 7],
[ 7, 8, 9, 10]])

(2)利用numpy内置函数创建

zeros = np.zeros((2,2)) ###生成一个全为0的数组或矩阵
结果为:
array([[ 0., 0.],
[ 0., 0.]])

e = np.eye(2) ###生成一个单位矩阵
##结果如下:
array([[ 1., 0.],
[ 0., 1.]])

(3) 数组的大小可以通过其shape属性获得

c.ndim ##获取数组或矩阵维数
c.shape ##获取矩阵各维长度
c.shape[0] ##获取矩阵行数,shape[1]获取矩阵的列数
c.size ##获取矩阵的元素个数
c.dtype ##查看数组数据类型

(4)修改其形状
数组c的shape有两个元素,因此它是二维数组,其中第0轴的长度为3,第1轴的长度为4。还可以通过修改数组的shape属性,在保持数组元素个数不变的情况下,改变数组每个轴的长度。

c.reshape(4,3) ##把原来3x4矩阵,转换为4x3矩阵
##结果为:array([[ 1, 2, 3],
[ 4, 4, 5],
[ 6, 7, 7],
[ 8, 9, 10]])
c.reshape(2,-1) ###当某个轴的元素为-1时,将根据数组元素的个数自动计算 ###此轴的长度

(5)arange函数类似于python的range函数,通过指定开始值、终值和步长来创建一维数组,注意数组不包括终值:
np.arange(0,1,0.1)
##结果为:array([ 0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])

2、存取元素

(1)数组元素的存取方法和Python的标准方法相同

a = np.arange(10)
a
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
a[5] # 用整数作为下标可以获取数组中的某个元素
5
a[3:5] # 用范围作为下标获取数组的一个切片,包括a[3]不包括a[5]
##结果为:array([3, 4])
a[:-1] # 下标可以使用负数,表示从数组后往前数
结果为:array([0, 1, 2, 3, 4, 5, 6, 7, 8])
a[2:4] = 100,101 # 下标还可以用来修改元素的值
a
#结果为:array([ 0, 1, 40, 41, 4, 5, 6, 7, 8, 9])
a[1:-1:2] # 范围中的第三个参数表示步长,2表示隔一个元素取一个元素
#结果为:array([ 1, 41, 5, 7])

(2) 和Python的列表序列不同,通过下标范围获取的新的数组是原始数组的一个视图。它与原始数组共享同一块数据空间。

b = a[3:7] # 通过下标范围产生一个新的数组b,b和a共享同一块数据空间
b[2] = -10 # 将b的第2个元素修改为-10
#a结果如下
array([ 0, 1, 40, 41, 4, -10, 6, 7, 8, 9])
#b的结果如下
array([ 41, 4, -10, 6])

3、多维数组

创建一个6x6的多维数组或矩阵

a = np.arange(0, 60, 10).reshape(-1, 1) + np.arange(0, 6)
#运行结果如下:
array([[ 0, 1, 2, 3, 4, 5],
[10, 11, 12, 13, 14, 15],
[20, 21, 22, 23, 24, 25],
[30, 31, 32, 33, 34, 35],
[40, 41, 42, 43, 44, 45],
[50, 51, 52, 53, 54, 55]])

a[3:, [0, 2, 5]]
##下标中的第0轴是一个范围,它选取第3行之后的所有行; ##第1轴是整数序列,它选取第0, 2, 5三列
#运行结果为:
array([[30, 32, 35],
[40, 42, 45],
[50, 52, 55]])

a[2::2,::2]
##第0轴,从第2行开始,步长为2;第1轴,从第0行开始,步长为2
##运行结果为:
array([[20, 22, 24],
[40, 42, 44]])

4、矩阵操作

(1)以下介绍矩阵操作,包括两个矩阵间操作如矩阵乘法等。

A = np.array([[1, 2], [-1, 4]])
B = np.array([[2, 0], [3, 4]])
###对应元素相乘
A*B
##结果如下:
array([[ 2, 0],
[-3, 16]])

####矩阵乘法
np.dot(A, B) # 或者 A.dot(B)
##运行结果如下
array([[ 8, 8],
[10, 16]])

(2)线性代数运算
对矩阵进行线性代数运行,如求转置、特征向量等。求A的转置

##求A的转置
A.transpose() ##或A.T
##运行结果
array([[ 1, -1],
[ 2, 4]])

## 求A的逆矩阵
linalg.inv(A)
##运行结果
array([[ 0.66666667, -0.33333333],
[ 0.16666667, 0.16666667]])

# 求A的特征值和特征向量
eigenvalues, eigenvectors = linalg.eig(A)
##其中eigenvalues为 特征值,eigenvectors为特征向量

(3)调整坐标顺序
transpose的参数为坐标,正常顺序为(0, 1, 2, ... , n - 1),
现在传入的为(1, 0)代表C[x][y]转换为C[y][x],第0个和第1个坐标互换。

C.transpose((1,0)) ###第0个和第1个坐标互换
##结果如下:
array([[ 0.68752896, -0.11705268, 0.49078462, -0.48826679, -1.26943352,
-0.97029925],
[ 1.01686837, -1.55073073, -1.40240593, -0.98632156, 0.80378005,
0.33703986],
[ 0.95644284, -0.19360605, 1.82482162, -0.45383782, 0.26197213,
0.9131711 ]])

(4)在矩阵或数组上运用数学和统计方法

import numpy as np
import numpy.random as np_random

print('求和,求平均')
arr = np.random.randn(5, 4)
print(arr)
print(arr.mean())
print(arr.sum())
print(arr.mean(axis = 1)) # 对每一行的元素求平均
print(arr.sum(0)) # 对每一列元素求和,axis可以省略。

(5)向量或矩阵运算与循环运算性能比较

import time as tm  
import numpy as np
 
dim = 100000#数据长度(包含的元素个数)  
x1 = np.ones(dim)  
x2 = np.ones(dim)  
yFor = np.ones(dim)  

tStart = tm.clock()#开始计时  
#for循环解算x1*x2(对应元素相乘)  
for i in range(dim):  
    yFor[i] = x1[i]*x2[i]  
tEnd=tm.clock()#停止计时  
tFor = tEnd-tStart#计算用时  

tStart = tm.clock()#开始计时  
#向量计算x1*x2(对应元素相乘)  
yVector = x1*x2  
tEnd = tm.clock()#停止计时  
tVector = tEnd-tStart#计算用时  

print ('for循环用时tFor=',tFor)  
print ('向量运算用时tVector=',tVector)

5、 数据合拼与拆分

import numpy as np
import numpy.random as np_random

print('连接两个二维数组')
arr1 = np.array([[1, 2, 3], [4, 5, 6]])
arr2 = np.array([[7, 8, 9], [10, 11, 12]])
print(np.concatenate([arr1, arr2], axis = 0)) # 按行连接
print(np.concatenate([arr1, arr2], axis = 1)) # 按列连接

##或通过以下命令进行连接,效果一样
print('垂直stack与水平stack')
print(np.vstack((arr1, arr2))) # 垂直堆叠
print(np.hstack((arr1, arr2))) # 水平堆叠

6、numpy上的通用函数(ufunc)

ufunc是universal function的缩写,它是一种能对数组的每个元素进行操作的函数。NumPy内置的许多ufunc函数都是在c语言级别实现的,因此它们的计算速度非常快。让我们来看一个例子:

import time
import math
import numpy as np

x = [i * 0.001 for i in np.arange(1000000)]
start = time.clock()
for i, t in enumerate(x):
x[i] = math.sin(t)
print ("math.sin:", time.clock() - start )

x = [i * 0.001 for i in np.arange(1000000)]
x = np.array(x)
start = time.clock()
np.sin(x)
print ("numpy.sin:", time.clock() - start )

运行结果如下:
math.sin: 0.5793389999999974
numpy.sin: 0.06916299999999964
说明,numpy.sin比math.sin快10倍多。这得利于numpy.sin在C语言级别的循环计算。

2.1 内容简介
本章主要介绍如何利用Python抓取京东商城商品评论信息,并对这些评论信息进行分析和可视化。下面是要抓取的商品信息,一款女士文胸。这个商品共有红色,黑色和肤色等颜色, 70B到90D共18个尺寸,以及超过500条的购买评论。
2.2 获取页面源码信息
京东商品评论信息是由JS动态加载的,所以直接抓取商品详情页的URL并不能获得商品评论的信息。因此我们需要先找到存放商品评论信息的文件。这里我们使用Chrome浏览器里的开发者工具进行查找。
具体方法是在商品详情页(请参考图2-1)点击鼠标右键,选择检查(请参考图2-2),在弹出的开发者工具界面(可参考图16-3)中选择Network,设置为禁用缓存(Disable cache)和只查看JS文件。然后刷新页面。页面加载完成后向下滚动鼠标找到商品评价部分,等商品评价信息显示出来后,在下面Network界面的左侧筛选框中输入productPageComments,这时下面的加载记录中只有一条信息,这里包含的就是商品详情页的商品评论信息。点击这条信息,在右侧的Preview界面中可以看到其中包含了当前页面中的评论信息。

图2-1 商品详情页

图2-2 检查详情页面

图2-3 开发者工具页面

复制这条信息,并把URL地址放在浏览器中打开,里面包含了当前页的商品评论信息。这就是我们要抓取的URL地址。https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98vv8&productId=10809260839&score=0&sortType=5&page=0&pageSize=10&isShadowSku=0&fold=1
仔细观察这条URL地址可以发现,其中productId=10809260839是当前商品的商品ID。与商品详情页URL中的ID一致。而page=0是页码。如果我们要获取这个商品的所有评论,只需要更改page后面的数字即可。
在获得了商品评论的真实地址以及URL地址的规律后,我们开始使用python抓取这件商品的500+条评论信息。并对这些信息进行处理和分析。
2.3 抓取信息前的准备工作
设置完请求的头文件和Cookie信息后,我们开始抓取京东商品评论的信息。在URL中包含两个重要的信息,一个是商品ID,另一个是页码。这里我们只抓取一个商品的评论信息,因此商品ID不需要更改。但这个商品的评论有500+条,也就是有近40页需要抓取,因此页码不是一个固定值,需要在0-40之间变化。这里我们将URL分成两部分,通过随机生成页码然后拼接URL的方式进行抓取。
导入必要的库

#为显示中文图标
%matplotlib inline
import matplotlib.font_manager as fm
myfont = fm.FontProperties(fname='/home/hadoop/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/simhei.ttf')
#导入requests库(请求和页面抓取)
import requests
#导入time库(设置抓取Sleep时间)
import time
#导入random库(生成乱序随机数)
import random
#导入正则库(从页面代码中提取信息)
import re
#导入数值计算库(常规计算)
import numpy as np
#导入科学计算库(拼表及各种分析汇总)
import pandas as pd
#导入绘制图表库(数据可视化)
import matplotlib.pyplot as plt
#导入结巴分词库(分词)
import jieba as jb
#导入结巴分词(关键词提取)
import jieba.analyse

2.4将爬虫伪装成浏览器
导入完库文件后,还不能直接进行抓取,因为这样很容易被封。我们还需要对爬虫进行伪装,是爬虫看起来更像是来自浏览器的访问。这里主要的两个工作是设置请求中的头文件信息以及设置Cookie的内容。
头文件信息很容易找到,在Chrome的开发者工具中选择Network,刷新页面后选择Headers就可以看到本次访问的头文件信息,里面包含了一些浏览器的技术参数和引荐来源信息。将这些信息直接添加到代码中就可以,这里我们将头部信息保存在headers中。

#设置请求中头文件的信息
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
'Accept':'text/html;q=0.9,*/*;q=0.8',
'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
'Connection':'close',
'Referer':'https://item.jd.com/10809260821.html'
}

在查看头文件信息的旁边还有一个Cookies标签(如图16-3),点击进去就是本次访问的Cookies信息。这里的Cookies信息与前面头文件中的Cookie信息一致,不过这里更加清晰。把Request Cookies信息复制到代码中即可,这里我们将Request Cookies信息保存在Cookie中。

#设置Cookie的内容
cookie={'TrackID':'1mJoJegxxximdOIuMj1L78NM9IEUYBloQE8lNf5Kr0SN4bLXqWbNQGsuWLT7VSxXgBrnuOwGj9xdFUbqz1sLwrpxkzkjTA-HSsgVP9iJhv-g',
'__jda':'122270672.413567069.1502329865.1505359716.1505377343.17',
'__jdb':'122270672.4.413567069|17.1505377343',
'__jdc':'122270672',
'__jdu':'413567069',
'__jdv':'122270672|p.egou.com|t_36378_879430_c|tuiguang|5191ffe38de346c28750ae3309faf11a|1505288084897',
'areaId':'2',
'cn':'17',
'ipLoc-djd':'2-2830-51811-0.137557061',
'ipLocation':'%u4E0A%u6D77',
'mx':'0_X',
'rkv':'V0800',
'user-key':'acce01e8-1533-4aaa-bcea-34ac9b6d6a53',
'xtest':'4657.553.d9798cdf31c02d86b8b81cc119d94836.b7a782741f667201b54880c925faec4b'}

2.5抓取商品评论信息
设置完请求的头文件和Cookie信息后,我们开始抓取京东商品评论的信息。前面分析URL的时候说过,URL中包含两个重要的信息,一个是商品ID,另一个是页码。这里我们只抓取一个商品的评论信息,因此商品ID不需要更改。但这个商品的评论有500+条,也就是有近40页需要抓取,因此页码不是一个固定值,需要在0-40之间变化。这里我们将URL分成两部分,通过随机生成页码然后拼接URL的方式进行抓取。

#设置URL的第一部分
url1='https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98vv0&productId=10809260821&score=0&sortType=5&page='
#设置URL的第二部分
url2='&pageSize=10&isShadowSku=0&fold=1'
#乱序输出0-40的唯一随机数
ran_num=random.sample(range(40), 40)

为了使抓取过程看起来更加随机,我们没有从第1页一直抓取到第40页。而是使用random生成0-40的唯一随机数,也就是要抓取的页码编号。然后再将页码编号与两部分URL进行拼接。这里我们只知道商品有500+的评论,但并不知道具体数字,所以抓取范围定位从0-40页。
下面是具体的抓取过程,使用for循环每次从0-40的随机数中找一个生成页码编号,与两部分的URL进行拼接。生成要抓取的URL地址并与前面设置好的头文件信息和Cookie信息一起发送请求获取页面信息。将获取到的页面信息进行汇总。每次请求间休息5秒针,避免过于频繁的请求导致返回空值。

#拼接URL并乱序循环抓取页面
for i in ran_num:
a = ran_num[0]
if i == a:
i=str(i)
url=(url1+i+url2)
r=requests.get(url=url,headers=headers,cookies=cookie)
html=r.content
else:
i=str(i)
url=(url1+i+url2)
r=requests.get(url=url,headers=headers,cookies=cookie)
html2=r.content
html = html + html2
time.sleep(5)
print("当前抓取页面:",url,"状态:",r)

在抓取的过程中输出每一步抓取的页面URL以及状态。通过下面的截图可以看到,在page参数后面的页码是随机生成的并不连续。抓取完40个页面后,我们还需要对页面进行编码。完成编码后就可以看到其中所包含的中文评论信息了。后面大部分工作就是要对这些评论信息进行不断提取和反复的清洗。

#对抓取的页面进行编码
html=str(html, encoding = "GBK")

这里建议将抓取完的数据存储在本地,后续工作可以直接从本地打开文件进行清洗和分析工作。避免每次都要重新抓取数据。这里我们将数据保存在桌面的jd_page.txt文件中。

#将编码后的页面输出为txt文本存储
file = open("./jd_page.txt", "w")
file.write(html)
file.close()
#读取存储的txt文本文件
html = open('./jd_page.txt', 'r').read()

2.6提取信息并进行数据清洗
京东的商品评论中包含了很多有用的信息,我们需要将这些信息从页面代码中提取出来,
整理成数据表以便进行后续的分析工作。这里应该就是整个过程中最苦逼的数据提取和清洗工作了。
我们使用正则对每个字段进行提取。对于特殊的字段在通过替换等方式进行提取和清洗。
下面是提取的第一个字段userClient,也就是用户发布评论时所使用的设备类型,这类的字段提取还比较简单,
一行代码搞定。查看一下提取出来的字段还比较干净。使用同样的方法我们分别提取了以下这些字段的内容。

#使用正则提取userClient字段信息
userClient=re.findall(r',"usefulVoteCount".*?,"userClientShow":(.*?),',html)
#使用正则提取userLevel字段信息
userLevel=re.findall(r'"referenceImage".*?,"userLevelName":(.*?),',html)
#使用正则提取productColor字段信息
productColor=re.findall(r'"creationTime".*?,"productColor":(.*?),',html)
#使用正则提取recommend字段信息
recommend=re.findall(r'"creationTime".*?,"recommend":(.*?),',html)
#使用正则提取nickname字段信息
nickname=re.findall(r'"creationTime".*?,"nickname":(.*?),',html)
#使用正则提取userProvince字段信息
userProvince=re.findall(r'"referenceImage".*?,"userProvince":(.*?),',html)
#使用正则提取usefulVoteCount字段信息
usefulVoteCount=re.findall(r'"referenceImage".*?,"usefulVoteCount":(.*?),',html)
#使用正则提取days字段信息
days=re.findall(r'"usefulVoteCount".*?,"days":(.*?)}',html)
#使用正则提取score字段信息
score=re.findall(r'"referenceImage".*?,"score":(.*?),',html)
#使用正则提取isMobile字段信息
isMobile=re.findall(r'"usefulVoteCount".*?,"isMobile":(.*?),',html)

使用for循环将字段中所有的}替换为空。替换完成后字段看起来干净多了。

#替换掉最后的}
mobile=[]
for m in isMobile:
n=m.replace('}','')
mobile.append(n)

productSize字段中包含了胸围和杯罩两类信息,为了获得独立的杯罩信息需要进行二次提取,将杯罩信息单独保存出来。

#使用正则提取productSize字段信息
productSize=re.findall(r'"creationTime".*?,"productSize":(.*?),',html)
#使用for循环将productSize中的第三个字符杯罩信息提取出来,并保持在cup字段中。
#提取杯罩信息
cup=[]
for s in productSize:
s1=s[3]
cup.append(s1)
##提取天数
days1=[]
for d in table['days']:
s1=d[0][0]
s1=int(s1)
days1.append(s1)

创建评论的日期信息仅依靠正则提取出来的信息还是比较乱,无法直接使用。因此也需要进行二次提取。下面是使用正则提取出的结果。

#使用正则提取时间字段信息
creationTime1=re.findall(r'"creationTime":(.*?),"referenceName',html)

日期和时间信息处于前20个字符,在二次提取中根据这个规律直接提起每个条目的前20个字符即可。将日期和时间单独保存为creationTime。

#提取日期和时间
creationTime=[]
for d in creationTime1:
date=d[1:20]
creationTime.append(date)

在上一步日期和时间的基础上,我们再进一步提取出单独的小时信息,方法与前面类似,提取日期时间中的第11和12个字符,就是小时的信息。提取完保存在hour字段以便后续的分析和汇总工作。

#提取小时信息
hour=[]
for h in creationTime:
date=h[10:13]
hour.append(date)

最后要提取的是评论内容信息,页面代码中包含图片的评论信息是重复的,因此在使用正则提取完后还需要对评论信息进行去重。

#使用正则提取评论信息
content=re.findall(r'"guid".*?,"content":(.*?),',html)
#使用if进行判断,排除掉所有包含图片的评论信息,已达到评论去重的目的。

#对提取的评论信息进行去重
content_1=[]
for i in content:
if not "img" in i:
content_1.append(i)

完成所有字段信息的提取和清洗后,将这些字段组合在一起生成京东商品评论数据汇总表。下面是创建数据表的代码。数据表生成后还不能马上使用,需要对字段进行格式设置,例如时间和日期字段和一些包含数值的字段。具体的字段和格式设置依据后续的分析过程和目的。这里我们将creationTime设置为时间格式,并设置为数据表的索引列。将days字段设置为数值格式。

#将前面提取的各字段信息汇总为table数据表,以便后面分析
table=pd.DataFrame({'creationTime':creationTime,'hour':hour,'nickname':nickname,'productColor':productColor,'productSize':productSize,'cup':cup,'recommend':recommend,'mobile':mobile,'userClient':userClient,'userLevel':userLevel,'userProvince':userProvince,'usefulVoteCount':usefulVoteCount,'content_1':content_1,'days':days1,'score':score})
#将creationTime字段更改为时间格式
table['creationTime']=pd.to_datetime(table['creationTime'])
#设置creationTime字段为索引列
table = table.set_index('creationTime')
#设置days字段为数值格式
#table['days']=table['days'].astype(np.int64)

查看通过数据清理后的数据

#查看整理完的数据表
table.head()


保存清洗和预处理完的数据表。我们这里将数据表保存为csv格式。到了这一步可以选择在Excel
中完成后续的数据分析和可视化过程,也可以继续在python中完成。我们这里选择继续在python中完成后续的数据分析和可视化工作。

#保存table数据表
table.to_csv('./jd_table.csv')

2.7 数据分析及可视化
2.7.1分月评论数据变化趋势
首先查看京东商品评论的时间变化趋势情况,大部分用户在购买商品后会在10天以内进行评论,因此我们可以近似的认为在一个月的时间维度中评论时间的变化趋势代表了用户购买商品的变化趋势。按月的维度对数据表进行汇总,并提取每个月的nickname的数量。下面是具体的代码和分月数据。

#对数据表按月进行汇总并生成新的月度汇总数据表
table_month=table.resample('M',how=len)
#提取按月汇总的nickname
month=table_month['nickname']

数据范围从2016年06月到2017年08月。使用柱状图对分月数据进行可视化。从图表中可以看到2016年10月是评论的高峰,也可以近似的认为这个时间段是用户购买该商品的高峰(10月18日是京东促销活动)。排除2016年6月和11月数据,整齐趋势中夏、冬季评论量较高,夏季较底。这是由于该商品的季节属性导致的。

#绘制分月评论数量变化趋势图
plt.rc('font', family='SimHei', size=9)
a=np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
plt.bar([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],month,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('月份',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('分月评论数量变化趋势',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.xticks(a,('16-06','07','08','09','10','11','12','17-01','02','03','04','05','06','07','08'))
plt.show()


通过筛选将数据表分为使用移动设备和未使用移动设备两个表格,再分别查看和对比评论变化趋势。

#在table表中筛选使用移动设备的条目并创建新表
mobile_t=table.loc[table["mobile"] == "true"]
#在table中筛选没有使用移动设备的条目并创建新表
mobile_f=table.loc[table["mobile"] == "false"]
#按月汇总使用移动设备的数据
mobile_t_m=mobile_t.resample('M',how=len)
#按月汇总不使用移动设备的数据
mobile_f_m=mobile_f.resample('M',how=len)
#提取使用移动设备的按月汇总nickname
mobile_y=mobile_t_m['nickname']
#提取没有使用移动设备的按月汇总nickname
mobile_n=mobile_f_m['nickname']

从结果中可以看出使用PC设备进行评论的用户在所有的时间段中都要稍高于使用移动设别的用户。

plt.subplot(2, 1, 1)
plt.plot(mobile_y,'go',mobile_y,'g-',color='#99CC01',linewidth=3,markeredgewidth=3,markeredgecolor='#99CC01',alpha=0.8)
plt.ylabel('移动设备评论数量',fontproperties=myfont,size=12)
plt.title('PC与移动设备评论数量变化趋势',fontproperties=myfont,size=12)
plt.subplot(2, 1, 2)
plt.plot(mobile_n,'go',mobile_n,'g-',color='#99CC01',linewidth=3,markeredgewidth=3,markeredgecolor='#99CC01',alpha=0.8)
plt.xlabel('月份',fontproperties=myfont,size=12)
plt.ylabel('PC评论数量',fontproperties=myfont,size=12)
plt.show()


2.7.2 24小时评论数量变化趋势
按小时维度对评论数据进行汇总,查看用户在24小时中的评论变化趋势。这里需要说明的是24小时趋势只能反映用户登录京东商城的趋势,并不能近似推断用户购买商品的时间趋势。

#按24小时分别对table表中的nickname进行计数
hour_group=table.groupby('hour')['nickname'].agg(len)

从24小时评论趋势图来看,发布商品评论的趋势与作息时间一致,并且每日的闲暇时间是发布评论的高峰。如早上的9点,15点和晚上的21点,是一天24小时中的三个评论高峰点。

#汇总24小时评论数量变化趋势图
plt.rc('font', family='STXihei', size=9)
a=np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21])
plt.bar([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21],hour_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('24小时',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('24小时评论数量变化趋势',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.xticks(a,('0','1','2','5','7','8','9','10','11','12''13','14','15','16','17','18','19','20','21','22','23'))
plt.show()


将24小时的评论数量分为移动设备和未使用移动设备,查看并对比这两者的变化趋势情况。

#在使用移动设备的表中按24小时对nickname进行计数
mobile_y_h=mobile_t.groupby('hour')['nickname'].agg(len)
#在没有使用移动设备的表中按24小时对nickname进行计算
mobile_n_h=mobile_f.groupby('hour')['nickname'].agg(len)
移动设备的评论数量在24小时中的各个时间段与PC的评论数量相当,并且在9点及晚间8点左右比较活跃。
#汇总PC与移动设备24小时评论数量变化趋势
plt.subplot(2, 1, 1)
plt.plot(mobile_y_h,'go',mobile_y_h,'g-',color='#99CC01',linewidth=3,markeredgewidth=3,markeredgecolor='#99CC01',alpha=0.8)
plt.ylabel('移动设备评论数量',fontproperties=myfont,size=12)
plt.title('PC与移动设备24小时评论数量变化趋势',fontproperties=myfont,size=12)
plt.subplot(2, 1, 2)
plt.plot(mobile_n_h,'go',mobile_n_h,'g-',color='#99CC01',linewidth=3,markeredgewidth=3,markeredgecolor='#99CC01',alpha=0.8)
plt.xlabel('24小时',fontproperties=myfont,size=12)
plt.ylabel('PC评论数量',fontproperties=myfont,size=12)
plt.show()


2.7.3 用户客户端分布情况
前面的分析中,我们看到使用移动设备进行评论的用户要远高于PC端的用户,下面我们对用户所使用的设备分布情况进行统计。首先在数据表中按用户设备(userClient)对nickname字段进行计数汇总。

#在table表中按userClient对数据进行汇总
userClient_group=table.groupby('userClient')['nickname'].agg(len)

从用户客户端分布情况来看,移动端的设备占大多数,其中使用iphone的用户要底于Android用户。由于微信购物和QQ购物单独被分了出来,无法确定设备,因此单独进行对比。使用微信购物渠道的用户要底于QQ购物。

#汇总用户客户端分布情况
plt.rc('font', family='STXihei', size=9)
a=np.array([1,2,3,4,5])
plt.bar([1,2,3,4,5],userClient_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('客户端分布',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('用户客户端分布情况',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.ylim(0,300)
plt.xticks(a,('other','Android','iPhone','微信购物','QQ购物'),fontproperties=myfont,size=12)
plt.show()


2.7.4 购买后评论天数分布
在购买后评论天数方面,我们将用户发布评论与购买的时间间隔分为7组,分别为购买后1-5天内,5-10天内,10-15天内,
15-20天内,20-25天内,25-30天内,以及大于30天。然后统计并对比用户在不同时间区间内发布评论的数量情况。

#设置分组条件,并对table表中的days字段进行分组
bins = [0, 5, 10, 15, 20, 25, 30, 92]
day_group = ['5天', '10天', '15天', '20天', '25天','30天','大于30天']
table['day_group'] = pd.cut(table['days'], bins, labels=day_group)
#按新设置的分组对数据进行汇总
days_group=table.groupby('day_group')['nickname'].agg(len)

从图表中看出,购买后5天到10天以内是用户发布评论的高峰,也就我们之前推测评论时间趋势近似于购买时间的依据。随着时间的增加评论数量逐渐下降。

#绘制用户购买后评论天数分布图
plt.rc('font', family='STXihei', size=9)
a=np.array([1,2,3,4,5,6,7])
plt.bar([1,2,3,4,5,6,7],days_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('购买后天数',fontproperties=myfont,size=12)
plt.ylabel('发布评论数量',fontproperties=myfont,size=12)
plt.title('购买后评论天数分布',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.ylim(0,300)
plt.xticks(a,('5天','10天','15天','20天','25天','30天','大于30天'),fontproperties=myfont,size=12)
plt.show()


2.7.5商品评分分布情况
京东商城对商品按5星评分划分为好评,中评和差评三个等级。我们这里来看下用户5星评分的分布情况。
在数据表中score字段中的值表示了用户对胸罩产品的打分情况。我们按打分情况对数据进行汇总。
商品评分分布情况
京东商城对商品按5星评分划分为好评,中评和差评三个等级。我们这里来看下用户5星评分的分布情况。在数据表中score字段中的值表示了用户对胸罩产品的打分情况。我们按打分情况对数据进行汇总。

#在table表中按score对数据进行汇总
score_group=table.groupby('score')['nickname'].agg(len)
#绘制用户评分分布情况图
plt.rc('font', family='STXihei', size=9)
a=np.array([1,3,4,5])
plt.bar([1,3,4,5],score_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('评分分布',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('用户评分分布情况',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='best',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.ylim(0,500)
plt.xticks(a,('1星','3星','4星','5星'),fontproperties=myfont,size=12)
plt.show()


从图表中可以看出,大部分用户对商品的评分是5星。4星以下的几乎没有。但从另一个维度来看,
在用户对最有用评论的投票(usefulVoteCount)中得票最多的是一个1星的评论。

2.7.6 用户胸罩尺码分布情况
在胸罩的尺寸方面包含两个信息,一个是胸围尺寸,另一个是罩杯。我们在前面的清洗过程中对杯罩创建了单独的字段。下面只对这个字段进行汇总统计。

#在table 表中按cup对数据进行汇总
cup_group=table.groupby('cup')['nickname'].agg(len)

从图表中可以看出,评论用户中最多的是B杯罩,其次为A杯罩,C的用户数量较少。

#绘制用户胸罩尺码分布图
plt.rc('font', family='STXihei', size=9)
a=np.array([1,2,3])
plt.bar([1,2,3],cup_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('尺码',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('用户胸罩尺码分布情况',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.ylim(0,350)
plt.xticks(a,('A','B','C'))
plt.show()


2.7.7胸罩颜色偏好分布
这款胸罩共分为三个颜色,红色,肤色和黑色。我们按颜色对评论数据进行汇总,查看用户对不同胸罩颜色的偏好情况。

#在table表中按productColor对数据进行汇总
color_group=table.groupby('productColor')['nickname'].agg(len)
从不同颜色的评论数量上来看,大部分用户购买的是红色。
#绘制用户颜色选择分布图
plt.rc('font', family='STXihei', size=9)
a=np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14])
plt.bar([1,2,3,4,5,6,7,8,9,10,11,12,13,14],color_group,color='#99CC01',alpha=0.8,align='center',edgecolor='white')
plt.xlabel('颜色分布',fontproperties=myfont,size=12)
plt.ylabel('评论数量',fontproperties=myfont,size=12)
plt.title('用户颜色选择分布',fontproperties=myfont,size=12)
plt.legend(['评论数量'], loc='upper right',prop=myfont)
plt.grid(color='#95a5a6',linestyle='--', linewidth=1,axis='y',alpha=0.4)
plt.ylim(0,600)
plt.xticks(a,('宝蓝色','宝蓝色单件','杏色单件','浅紫色','红色','肤色','西瓜红','酒红色','酒红色单件','银灰色','香槟色','黄色','黑色','黑色' ),rotation=30,fontproperties=myfont,size=12)
plt.show()


2.7.8 胸罩评论内容语义分析
前面我们分别对数据表中的字段进行了统计和分析,文章最后我们对商品的评论内容进行语义分析,看看大家在这700+条评论中都在说些什么。
好好先生购买比例
在人工查看了一些评论内容后,我们发现一些有意思的信息。有一部分评论是老公或男朋友发的,这说明一些好好先生会帮老婆或女友购买胸罩。那么这部分用户的比例有多少呢?
我们把评论中包含有关键词“老婆”和“女朋友”的评论单独保存在出来。

#筛选包含”老婆”和”女朋友”的评论
content_2=[]
for i in content_1:
if "老婆"in i or "女朋友"in i:
content_2.append(i)

查看这些包含关键词的评论内容,确实是老公和男朋友来购买胸罩并且发布的评论。

#查看评论内容
content_2
['"老婆说还不错……"',
'"物流很快。内衣老婆很喜欢,款式很好看,穿起来很挺。"',
'"老婆穿起来很好看,穿上之后就特别性感了,手感特别好,好享受好舒服,太棒了。"',
'"已经是老顾客了 第二次购买 效果确实很棒 老婆很喜欢 价格实惠 五分好评"']

经过计算,在这款胸罩产品的评论中,由老公或男朋友购买的比例仅为2.0%

#计算老公或男朋友购买胸罩的比例
len(content_2)/len(content_1)*100

2.7.9 商品评论关键词分析
回归到商品评论分析,我们使用结巴分词对所有胸罩的评论信息进行了分词,并提取了权重最高的关键词列表。

#文本数据格式转换
word_str = ''.join(content_1)
#提取文字关键词
word_rank=jieba.analyse.extract_tags(word_str, topK=20, withWeight=True, allowPOS=())
#转化为数据表
word_rank = pd.DataFrame(word_rank,columns=['word','rank'])
#查看关键词及权重
word_rank.sort('rank',ascending=False)


从高权重关键词列表来看,用户评论以正面信息为主,”不错”,”舒服”,”喜欢”等主观感受的正面评论权重较高。
2.8 结语
本章我们从商品评论信息的抓取,清洗到分析和数据可视化实现了一个完整的闭环。整个过程中数据的清洗和预处理是最为复杂也是耗时最多的工作。由于抓取的数据量较少,只有500+条数据。因此里面的一些结论可能没有代表性,结论也未必准确,仅供参考。

第1章 Scrapy爬虫基础

1.1 Scrapy运行所依赖的组件
Scrapy框架是一个快速高效地抓取并解析网页内容的爬虫工具。它支持批量方式、多线程获取网页内容,采用简单易用地提取元素规则,并且还支持把抓取结果输出到多种结果集中。
目前最新版本为1.2,本课件中演示使用的版本是0.14。Scrapy可以运行在windows和Linux平台上,且都需要Python支持,可以使用Python 2.7以上版本来运行本课件中的例子。

scrapy模块组成图及运行过程

Scrapy在Linux平台上运行需要依赖很多包,以下列出在Ubuntu上安装Scrapy所需要事先安装的包,这些包之间的安装顺序存在依赖关系。
依次安装以下模块:
1. setuptools-0.6c11
2. zope.interface-4.0.1
3. 重新安装Python2.7开发版
4. Twisted-12.1.0
5. w3lib-1.2
6. libxml2和 libxml2-dev
7. libxslt1-dev和libxslt-dev
8. python2.7-mysqldb
9. Scrapy 0.14.4

1.2 手动创建一个Scrapy爬虫项目,并抓取P2P网贷信息
安装Scrapy后,可以通过命令行来测试是否安装成功,如下图:

手动创建并运行一个爬虫项目,通常需要以下几个步骤:
① 创建一个空的scrapy项目
② 定义要抓取的网址,并解析网页中特定标签中的内容
③ 定义item类来规范抓取后的内容
④ 编写pipeline类实现item信息落地
1.3创建一个空的scrapy项目
Scrapy中的爬虫是属于某个项目的。首先可以通过在命令行输入“scrapystartproject”命令来创建一个空的项目。比如现在要创建一个名为mydemo的项目,操作如下:

该命令会在硬盘上创建文件夹mydemo,并自动生成子文件夹mydemo和一些文件,其中mydemo/spiders/目录是用来存放具体爬虫文件的目录。爬虫文件需要用户手动创建,一个目录下可以同时存在多个爬虫文件

1.4 定义要抓取的网址,并解析网页中特定标签中的内容
在mydemo/spiders/下创建Python文件mydemo_page.py,内容如下:

# encoding: utf-8
#!/usr/bin/python
fromscrapy.spider import BaseSpider
classMyDemoSpider(BaseSpider):
name = "p2p_page"
start_urls = ["http://www.pingledai.com/td/tz/jkinfo?jkid=364&sh=1"]
def parse(self, response):
print(response.body)

• name属性:用来给爬虫起名字。它也是运行“scrapy crawl”命令时所要提供的参数值。
• start_urls属性:指明了爬虫要抓取的URL地址,类型是字符串数组,可以给爬虫指定一个抓取列表。
• parse方法:是对抓取得到的内容进行解析的方法。参数中的response是抓取后获得的HTML页面内容

创建mydemo_spider.py文件后,我们就可以启动爬虫了。只需要输入命令行
scrapy crawl p2p_page <回车>,就可以看到日志信息,爬虫已经启动,并且最后打印出目标页面对应的HTML源码来。

HTML源码中有我们需要的网贷信息,如何提取出这些信息,就需要在parse方法中继续编码了。Scrapy使用一种XPath Selector机制来解析HTML中的数据,它是基于XPath表达式语法的(有关XPath的详细内容,可参见http://www.w3.org/TR/xpath/)。
XPath Selector在Scrapy中内置了两种创建方式,分别是HtmlXPathSelector和XmlPathSelector用来解析Html和Xml代码。在parse方法中可以对HTML利用XPath配合css代码来提取所需的内容。XPath选择器有三个方法:
1. select(xpath): 返回一个相对于当前选中节点的选择器列表(一个XPath可能选到多个节点)
2. extract(): 返回选择器(列表)对应的节点的字符串(列表)
3. re(regex): 返回正则表达式匹配的字符串(分组匹配)列表

parse方法里面使用Xpath方式来逐个解析需要提取出来的要素信息

def parse(self, response):
hxs = HtmlXPathSelector(response)
#标题
title=hxs.select('//span[@id="tzjkcap"]/text()').extract()[0].encode('utf-8')
print(title)
data = hxs.select('//div[@class="pull-left"]/text()').extract()
#借款金额
amount = data[0].encode('utf-8')
print(amount)
#利率
interest = data[1].encode('utf-8')
print(interest)
#借款进度
progress = data[5].encode('utf-8')
progress = progress.replace(' ','')
progress = progress.replace('\n','')
print(progress)
#还款方式
payway = data[6].encode('utf-8')
print(payway)
#期限
term = data[8].encode('utf-8')
print(term)

解析出P2P网贷信息后,同样使用启动爬虫的命令
scrapy crawl p2p_page <回车>,就可以在控制台看到打印出来的内容了。

1.5 定义item类来规范抓取后的内容
页面上要抓取的内容,通常是一条完整的记录,譬如一个帖子、一条职位信息或是一条QQ动态。在Scrapy中可使用item类来封装这条记录,item.py文件在spiders的同级目录下。

fromscrapy.item import Item, Field
classMydemoItem(Item):
# define the fields for your item here like:
# name = Field()
pass

Item和Field类都是Scrapy框架提供的。Field类使用Python内置的字典(Python dict)对象,用来存放不同类型的属性值。MydemoItem类继承自Item类,pass空语句代表本程序不执行相应的动作。

在MydemoItem类中定义p2p招标信息的每个属性,类型均为Field类型,并增加一个打印所有属性的方法

# encoding: utf-8
fromscrapy.item import Item, Field
classMydemoItem(Item):
# define the fields for your item here like:
# name = Field()
#标题
title=Field()
#期限
term=Field()
#利率
interest=Field()
#还款方式
payway=Field()
#借款金额
amount=Field()
#借款进度
progress=Field()
#打印出所有属性值
defprintProps(self):
#return self['progress']
return "[标题] %s, [期限] %s, [利率] %s, [还款方式] %s, [借款进度] %s, [借款金额] %s " %(self['title'], self['term'], self['interest'], self['payway'], self['progress'], self['amount'])

在spiders/mydemo_page.py中的parse方法里面创建MydemoItem实例,把HTML中抓取的内容赋值给实例中的每个属性。

def parse(self, response):
hxs = HtmlXPathSelector(response)
#标题
title=hxs.select('//span[@id="tzjkcap"]/text()').extract()[0].encode('utf-8')
#print(title)
... ...
... ...
p2pitem = items.MydemoItem()
p2pitem['title'] = title
p2pitem['term'] = term
p2pitem['interest'] = interest
p2pitem['payway'] = payway
p2pitem['amount'] = amount
p2pitem['progress'] = progress
print(p2pitem.printProps())

最后运行scrapy crawl p2p_page命令,查看运行结果

1.6 如何把抓取后信息放入文件中
1. 定义一个管道类来实现抓取内容落地
parse方法返回的MyDemoItem对象,可以在pipelines.py文件的process_item方法中得到继续处理,把MyDemoItem中的属性写入文件。示例代码如下,把抓取下来的p2p网贷信息项写入p2poutput.dat文件中。FilePipelines.py文件可以从pipelines.py复制得到。

# encoding: utf-8
importos
classMydemoPipeline(object):
defprocess_item(self, item, spider):
#把解析后的内容放入文件中
fname = "p2poutput.dat"
f_handle = open(fname,'w')
f_handle.write(item.printProps())
f_handle.close()
return item

2. 配置管道类的执行顺序
除了定义FilePipelines文件之外,还需要让scrapy知道有这个文件存在,所以需要在mydemo/settings.py文件中增加配置项,如下红色的代码行,在里面增加“目录名.文件名.类名”这样的配置内容。

BOT_NAME = 'mydemo'
BOT_VERSION = '1.0'
SPIDER_MODULES = ['mydemo.spiders']
NEWSPIDER_MODULE = 'mydemo.spiders'
USER_AGENT = '%s/%s' % (BOT_NAME, BOT_VERSION)
ITEM_PIPELINES = {'mydemo.FilePipelines.MydemoPipeline':1}

3. 执行p2p_page爬虫
再次运行scrapy crawl p2p_page命令,查看运行结果。会发现在mydemo目录下多出来一个p2poutput.dat文件,就是管道文件生成的爬取结果文件。

以上完成了一个最基本的爬虫抓取数据的功能,后续课程还会陆续介绍如何把抓取的内容写入到数据库?如何让两个爬虫配合工作及如何抓取图片文件。

第2章 分析京东客户评价数据

第1章、机器学习基础
第2章、机器学习流程
第3章、机器学习实例
第4章、SKlearn简介
第5章、预处理方法
第6章、模型评估与参数优化
第7章、集成学习
第8章、客户价值分析实例
第9章、情感分析实例
第10章、聚类分析实例
第11章、神经网络基础
第12章、神经网络人脸识别实例


本章数据集下载

第4章 特征提取、转换和选择

在实际机器学习项目中,我们获取的数据往往是不规范、不一致、有很多缺失数据,甚至不少错误数据,这些数据有时又称为脏数据或噪音,在模型训练前,务必对这些脏数据进行处理,否则,再好的模型,也只能脏数据进,脏数据出。
这章我们主要介绍对数据处理涉及的一些操作,主要包括:
特征提取
特征转换
特征选择

4.1 特征提取

特征提取一般指从原始数据中抽取特征。

4.1.1 词频-逆向文件频率(TF-IDF)

词频-逆向文件频率(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。
在下面的代码段中,我们以一组句子开始。首先使用分解器Tokenizer把句子划分为单个词语。对每一个句子(词袋),我们使用HashingTF将句子转换为特征向量,最后使用IDF重新调整特征向量。这种转换通常可以提高使用文本特征的性能。

4.1.2 Word2Vec

Word2vec是一个Estimator,它采用一系列代表文档的词语来训练word2vecmodel。 在下面的代码段中,我们首先用一组文档,其中每一个文档代表一个词语序列。对于每一个文档,我们将其转换为一个特征向量。此特征向量可以被传递到一个学习算法。

4.1.3 计数向量器

计数向量器(Countvectorizer)和计数向量器模型(Countvectorizermodel)旨在通过计数来将一个文档转换为向量。
以下用实例来说明计数向量器的使用。
假设有以下列id和texts构成的DataFrame:

每行text都是Array [String]类型的文档。调用fit,CountVectorizer产生CountVectorizerModel含词汇(a,b,c)。转换后的输出列“向量”包含:
调用的CountVectorizer产生词汇(a,b,c)的CountVectorizerModel,转换后的输出向量如下:

每个向量代表文档的词汇表中每个词语出现的次数。

4.2 特征转换

在机器学习中,数据处理是一件比较繁琐的事情,需要对原有特征做多种处理,如类型转换、标准化特征、新增衍生特征等等,需要耗费大量的时间和精力编写处理程序,不过,自从Spark推出ML后,情况大有改观,Spark ML包中提供了很多现成转换器,例如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。

4.2.1分词器

分词器(Tokenization)将文本划分为独立个体(通常为单词)。

4.2.2 移除停用词

停用词为在文档中频繁出现,但未承载太多意义的词语,它们不应该被包含在算法输入中,所以会用到移除停用词(StopWordsRemover)。
示例:
假设我们有如下DataFrame,有id和raw两列

通过对raw列调用StopWordsRemover,我们可以得到筛选出的结果列如下

其中,“I”, “the”, “had”以及“a”被移除。
实现以上功能的详细代码:

4.2.3 n-gram

一个n-gram是一个长度为整数n的字序列。NGram可以用来将输入转换为n-gram。

4.2.4 二值化

二值化,通过设置阀值,将连续型的特征转化为两个值。大于阀值为1,否则为0。
注:以下规范化操作一般是针对一个特征向量(dataFrame中的一个colum)来操作的。

4.2.5 主成分分析

主成分分析被广泛应用在各种统计学、机器学习问题中,是最常见的降维方法之一。
PCA在Spark2.0用法比较简单,只需要设置:
.setInputCol(“features”)//保证输入是特征值向量
.setOutputCol(“pcaFeatures”)//输出
.setK(3)//主成分个数
注意:PCA前一定要对特征向量进行规范化(标准化)!!!

4.2.6 多项式展开

多项式展开(PolynomialExpansion)即通过产生n维组合将原始特征将特征扩展到多项式空间。下面的示例会介绍如何将你的特征集拓展到3维多项式空间。

4.2.7 离散余弦变换

离散余弦变换(DCT)是与傅里叶变换相关的一种变换,它类似于离散傅立叶变换,但是只使用实数。

4.2.8 字符串-索引变换

字符串—索引变换(StringIndexer)是将字符串列编码为标签索引列。示例数据为一个含有id和category两列的DataFrame
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c

category是有3种取值的字符串列(a、b、c),使用StringIndexer进行转换后我们可以得到如下输出,其中category作为输入列,categoryIndex作为输出列:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
a获得索引0,因为它是最频繁的,随后是具有索引1的c和具有索引2的b。
如果测试数据集中比训练数据集多了一个d类:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
如果您没有设置StringIndexer如何处理未看见的标签(默认值)或将其设置为“错误”,则会抛出异常。 但是,如果您调用了setHandleInvalid(“skip”),d类将不出现,结果为以下数据集:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
以下是使用StringIndexer的一个示例:

4.2.9 索引-字符串变换

与StringIndexer对应,索引—字符串变换(IndexToString)是将指标标签映射回原始字符串标签。
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
应用IndexToString,将categoryIndex作为输入列,将originalCategory作为输出列,我们可以检索我们的原始标签(它们将从列的元数据中推断):
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
以下是以上整个过程的一个实例:

4.2.10 独热编码

独热编码(OneHotEncoder)将标签指标映射为二值向量,其中最多一个单值。

【说明】
1、OneHotEncoder缺省状态下将删除最后一个分类或把最后一个分类作为0.
//示例

显示结果如下:
+----+---+-----+
| x| c|c_idx|
+----+---+-----+
| 1.0| a| 0.0|
| 1.5| a| 0.0|
|10.0| b| 2.0|
| 3.2| c| 1.0|
| 3.8| c| 1.0|
+----+---+-----+
最后一个分类为b,通过OneHotEncoder变为向量后,已被删除。

显示结果如下:
+----+---+-----+-------------+
| x| c|c_idx| c_idx_vec|
+----+---+-----+-------------+
| 1.0| a| 0.0|(2,[0],[1.0])|
| 1.5| a| 0.0|(2,[0],[1.0])|
|10.0| b| 2.0| (2,[],[])|
| 3.2| c| 1.0|(2,[1],[1.0])|
| 3.8| c| 1.0|(2,[1],[1.0])|
+----+---+-----+-------------+
与其他特征组合为特征向量后,将置为0,请看下例

显示结果如下:
+----+---+-----+-------------+------------------+
|x |c |c_idx|c_idx_vec |features |
+----+---+-----+-------------+------------------+
|1.0 |a |0.0 |(2,[0],[1.0])|[1.0,0.0,1.0,0.0] |
|1.5 |a |0.0 |(2,[0],[1.0])|[1.5,0.0,1.0,0.0] |
|10.0|b |2.0 |(2,[],[]) |[10.0,2.0,0.0,0.0]|
|3.2 |c |1.0 |(2,[1],[1.0])|[3.2,1.0,0.0,1.0] |
|3.8 |c |1.0 |(2,[1],[1.0])|[3.8,1.0,0.0,1.0] |
+----+---+-----+-------------+------------------+
如果想不删除最后一个分类,可添加setDropLast(False)。

显示结果如下:
+----+---+-----+-------------+
| x| c|c_idx| c_idx_vec|
+----+---+-----+-------------+
| 1.0| a| 0.0|(3,[0],[1.0])|
| 1.5| a| 0.0|(3,[0],[1.0])|
|10.0| b| 2.0|(3,[2],[1.0])|
| 3.2| c| 1.0|(3,[1],[1.0])|
| 3.8| c| 1.0|(3,[1],[1.0])|
+----+---+-----+-------------+
与其他特征向量结合后,情况如下:

显示结果如下:
+----+---+-----+-------------+----------------------+
|x |c |c_idx|c_idx_vec |features |
+----+---+-----+-------------+----------------------+
|1.0 |a |0.0 |(3,[0],[1.0])|(5,[0,2],[1.0,1.0]) |
|1.5 |a |0.0 |(3,[0],[1.0])|(5,[0,2],[1.5,1.0]) |
|10.0|b |2.0 |(3,[2],[1.0])|[10.0,2.0,0.0,0.0,1.0]|
|3.2 |c |1.0 |(3,[1],[1.0])|[3.2,1.0,0.0,1.0,0.0] |
|3.8 |c |1.0 |(3,[1],[1.0])|[3.8,1.0,0.0,1.0,0.0] |
+----+---+-----+-------------+----------------------+
2、如果分类中出现空字符,需要进行处理,如设置为"None",否则会报错。

4.2.11 向量-索引变换

在下面的例子中,我们读取一个标记点的数据集,然后使用VectorIndexer来决定哪些特征应该被视为分类。我们将分类特征值转换为它们的索引。这个变换的数据然后可以被传递到诸如DecisionTreeRegressor的处理分类特征的算法。

4.2.12交互式

例子,假设我们有以下DataFrame的列“id1”,“vec1”和“vec2”
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
应用与这些输入列的交互,然后interactionedCol作为输出列包含:
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
以下是实现以上转换的具体代码:

4.2.13正则化

以下示例演示如何加载libsvm格式的数据集,然后将每行标准化为具有单位L1范数和单位L∞范数。

4.2.14规范化(StandardScaler)

以下示例演示如何以libsvm格式加载数据集,然后规范化每个要素的单位标准偏差。

4.2.15最大值-最小值缩放

下面的示例展示如果读入一个libsvm形式的数据以及调整其特征值到[0,1]之间。
调用示例:

显示结果如下:
Features scaled to range: [0.0, 1.0]
+--------------+--------------+
| features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+

4.2.16最大值-绝对值缩放

以下示例演示如何加载libsvm格式的数据集,然后将每个特征重新缩放到[-1,1]。

运行结果如下:
+--------------+----------------+
| features| scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]| [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]| [1.0,1.0,1.0]|
+--------------+----------------+

4.2.17离散化重组

以下示例演示如何将双列列存储到另一个索引列的列中。

运行结果如下:
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
| -999.9| 0.0|
| -0.5| 1.0|
| -0.3| 1.0|
| 0.0| 2.0|
| 0.2| 2.0|
| 999.9| 3.0|
+--------+----------------+

4.2.18元素乘积

下面的示例演示了如何使用变换向量值来变换向量

运行结果如下:
+---+-------------+-----------------+
| id| vector|transformedVector|
+---+-------------+-----------------+
| a|[1.0,2.0,3.0]| [0.0,2.0,6.0]|
| b|[4.0,5.0,6.0]| [0.0,5.0,12.0]|
+---+-------------+-----------------+

4.2.19 SQL转换器

假设我们有以下DataFrame和列id,v1和v2
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
这是SQLTransformer "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__":语句的输出。
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
以下是实现以上结果的具体代码:

4.2.20向量汇编

例子
假设我们有一个带有id,hour,mobile,userFeatures和clicked列的DataFrame:
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures是一个包含三个用户特征的向量列。我们希望将hour,mobile和userFeatures合并成一个称为特征的单一特征向量,并使用它来预测是否被点击。如果我们将VectorAssembler的输入列设置为hour,mobile和userFeatures,并将列输出到特征,则在转换后,我们应该得到以下DataFrame:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
以下是实现上述功能的代码:

4.2.21分位数离散化

示例:
假设我们有如下DataFrame包含id,hour:
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour是Double类型的连续特征。我们希望将连续特征变成一个分级特征。给定numBuckets = 3,我们可得到以下DataFrame:
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
实现以上功能的scala代码如下:

4.3 特征选择

特征选择(Feature Selection)是从特征向量中选择那些更有效的特征,组成新的、更简单有效的特征向量的过程。它在数据分析中常用使用,尤其在高维数据分析中,可以剔除冗余或影响不大的特征,提升模型的性能。

4.3.1 向量机

假设我们有一个DataFrame与列userFeatures:
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures是一个包含三个用户特征的向量列。假设userFeature的第一列全部为零,因此我们要删除它并仅选择最后两列。 VectorSlicer使用setIndices(1,2)选择最后两个元素,然后生成一个名为features的新向量列:
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
假设userFeatures有输入属性,如[“f1”,“f2”,“f3”],那么我们可以使用setNames(“f2”,“f3”)来选择它们。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
以下是实现向量选择的一个scala代码示例

运行结果:
+--------------------+-------------+
|userFeatures |features |
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(2,[0],[2.3])|
|[-2.0,2.3,0.0] |[2.3,0.0] |
+--------------------+-------------+

4.3.2 R公式

示例:
假设我们有一个DataFrame含有id,country, hour和clicked四列:
id | country |hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
如果我们使用RFormula公式clicked ~ country+ hour,则表明我们希望基于country和hour预测clicked,通过转换我们可以得到如下DataFrame:
id | country |hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
以下是实现上述功能的scala代码:

4.3.3 卡方特征选择

示例:
假设我们有一个DataFrame含有id,features和clicked三列,其中clicked为需要预测的目标:
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
如果我们使用ChiSqSelector并设置numTopFeatures为1,根据标签clicked,features中最后一列将会是最有用特征:
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
使用ChiSqSelector的scala代码示例:

结果显示:
+---+------------------+-------+----------------+
| id| features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
| 7|[0.0,0.0,18.0,1.0]| 1.0| [18.0]|
| 8|[0.0,1.0,12.0,0.0]| 0.0| [12.0]|
| 9|[1.0,0.0,15.0,0.1]| 0.0| [15.0]|
+---+------------------+-------+----------------+

4.4 小结

本章主要介绍了对数据特征或变量的一些常用操作,包括特征提取,特征转换以及特征选择等方法,这些任务在实际项目中往往花费大量时间和精力,尤其要自己编写这方面的代码或函数,更是如此,Spark ML目前提供了很多现成函数,有效使用这些函数将有助于提供我们开发效率,同时使我们有更多时间优化或提升模型性能。下一章我们将介绍优化或提升模型性能一些方法。

第3章 ML Pipelines原理与实战

Spark MLlib 是Spark的重要组成部分,也是最早推出库之一, 其基于RDD的API,算法比较丰富,也比较稳定,也比较好用。但是如果目标数据集结构复杂需要多次处理,或者是对新数据需要结合多个已经训练好的单个模型进行综合计算时,使用 MLlib 将会让程序结构复杂,甚至难于理解和实现。为改变这一局限性,从Spark 1.2 版本之后引入的 ML Pipeline,经过多个版本的发展,目前Spark ML功能齐全、性能稳定,Spark ML克服了MLlib在处理复杂机器学习问题的一些不足(如工作比较复杂,流程不清晰等),向用户提供基于DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用,使整个机器学习过程变得更加易用、简洁、规范和高效。Spark的Pipeline与Scikit中Pipeline功能相近、理念相同。本章主要介绍Spark ML中Pipelines的有关内容。
本章主要介绍ML Pipeline相关内容,包括:
 Pipeline简介
 DataFrame
 构成Pipeline的一些组件
 介绍pipeline的一般原理
 使用pipeline的几个实例

3.1Pipeline简介

3.2DataFrame

以下通过一个实例来说明DataFrame的创建、操作等内容:

3.3 Pipeline组件

Pipeline组件主要包括Transformer和Estimator。
1. Transformer
2.Estimator

3.4 Pipeline原理

要构建一个Pipeline,首先需要定义Pipeline中的各个Stage,如指标提取和转换模型训练等。有了这些处理特定问题的Transformer和Estimator,我们就可以按照具体的处理逻辑来有序的组织Stages并创建一个Pipeline。

图3-1 pipeline在训练数据上的流程

整个流水线是一个估计器。所以当流水线的fit()方法运行后,会产生一个流水线模型,流水线模型是转换器。流水线模型会在测试时被调用,下面的图示说明用法。

图3-2 pipeline在测试数据上的流程

上面的图示中,流水线模型和原始流水线有同样数目的阶段,然而原始流水线中的估计器此时变为了转换器。当流水线模型的transform()方法被调用于测试数据集时,数据依次经过流水线的各个阶段。每个阶段的transform()方法更新数据集,并将之传到下个阶段。
流水线和流水线模型有助于确认训练数据和测试数据经过同样的特征处理流程。
以上两图如果合并为一图,可用如下图形表达:

 

图3-3 Spark pipeline 流程图

其中Pipeline及LogisticRegression都Estimator,Tokenizer,HashingTF,LogisticRegression Model为Transformer。

3.5Pipeline实例

3.5.1使用Estimator, Transformer, and Param实例

机器学习整个过程中,特征转换、特征选择、派生特征等工作,一般需要占据大部分时间,现在ML提供了很多Transformer,如OneHotEncoder、StringIndexer、PCA、Bucketizer、Word2vec等,利用这些函数可极大提高工作效率。
以下通过实例说明如何使用ML库中Estimaor、Transformer和Param等。

3.5.2ML使用Pipeline实例

使用Pipeline的实例。

3.6小结

本章主要介绍了流水线(pipeline)的基本概念,流水线的两个组件:Transformer和Estimator,它们是构成Pipeline的Stage,把这些stages按照一定次序组装到Pipeline上,就构成一个流水线,这些stages包括特征转换、特征选择、模型训练等任务,通过几个实例具体说明Pipeline的创建及使用。流水线是一项重要内容,章节还有很多实际使用实例,下一章主要介绍构成Pipeline的一些stages。熟练使用这些Stages有助于提升我们开发效率。


第2章构建Spark机器学习系统
构建机器学习系统,根据业务需求和使用工具的不同,可能会有些区别。不过主要流程应该差别不大,基本包括数据抽取、数据探索、数据处理、建立模型、训练模型、评估模型、优化模型、部署模型等阶段,在构建系统前,我们需要考虑系统的扩展性、与其他系统的整合、系统升级及处理方式等。这章我们主要介绍针对基于Spark机器学习的架构设计或系统构建的一般步骤、需要注意的一些问题。
本章主要介绍构建Spark机器学习系统的一般步骤:
介绍系统架构
启动集群
加载数据
探索数据
数据预处理
构建模型
模型评估
模型优化
模型保存

2.1机器学习系统架构

Spark发展非常快,到我们着手编写本书时,Spark已升级为2.1版,这2.0以后,Spark大大增强了数据流水线的内容,数据流水线的思路与SKLearn非常相似,我想这种思路或许是未来的一个趋势,使机器学习的流程标准化、规范化、流程化,很多原来需要自己编写代码都有现成的模块或函数,模型评估、调优这些任务也可实现了更高的封装,这大大降低机器学习门槛。

图2-1 Spark机器学习系统的架构图
其中数据处理、建模训练,我们可以进行组装成流水线方式,对模型评估及优化可以采用自动化方式。

2.2启动集群

Spark集群的安装配置,这里不做详细介绍,我们提供了本书可操作云平台,对Spark集群的安装配置感兴趣的读者,可参考由我们编写的《自己动手做大数据系统》。
Spark运行方式有本地模式、集群模式,本地模式所有的处理都运行在同一个JVM中,而后者,可以运行在不同节点上。具体运行方式主要有:
表2-1 Spark运行模式

本书主要以Spark Standalone(独立模式)为例,如果想以其他模式运行,只要改动对应参数即可。
Spark支持Scala或Python的REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式程序编写,交互式编程,输入的代码执行后立即能看到结果,非常友好和方便。
在2.0之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。SparkContext与驱动程序(Driver Program)和集群管理器(Cluster Manager)间的关系如图2-2所示:

图2-2 SparkContext与驱动程序、集群管理器间的关系图

从图中可以看到SparkContext起中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,Driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext。
不过在Spark 2.0中引入SparkSession对象(spark),运行Spark shell则自动创建一个SparkSession对象,在输入spark时就会发现它已经存在了(参考图2.图2-3),SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,它为用户提供了一个统一的切入点,同时也提供了各种DataFrame和Dataset的API,大大降低了学习Spark的难度。

图2-3 启动Spark shell界面

图2-3是启动Spark的集群的界面,编程语言是Scala,如果希望使用Python为编辑语句,该如何启动呢?运行pyspark即可。

图2-4 启动PySpark的客户端

2.3加载数据

这里以MovieLens 100k(http://files.grouplens.org/datasets/movielens/ml-100k.zip)数据集中的用户数据(u.data)为例,首先在本地查看数据的基本信息,然后把本地文件复制到HDFS上,Spark或PySpark读取读取hdfs上的数据。
查看u.user文件的基本信息,数据样例,总记录数等信息。

$ head -3 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
$ cat u.user |wc -l
943

u.user用户数据每列的含义为:user id | age | gender | occupation | zip code,即用户ID,用户年龄,用户性别、用户职位、所在地邮编等信息,列间的分隔符为竖线(|),共有943条记录。
如何把用户信息复制到HDFS上?首先,查看当前HDFS的目录信息。

$ hadoop fs -ls /u01/bigdata/
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2017-02-07 03:20 /u01/bigdata/data
drwxr-xr-x - hadoop supergroup 0 2016-07-20 09:16 /u01/bigdata/hive

由此可知在HDFS已有/u01/bigdata/data目录(如果没有目录可以通过hadoop fs -mkdire /u01/bigdata/data命令创建。),通过以下命令,把本地文件u.user复制到HDFS上。

$ hadoop fs -put u.user /u01/bigdata/data
//查看HDFS上的文件
$ hadoop fs -ls /u01/bigdata/data
-rw-r--r-- 1 hadoop supergroup 22628 2017-03-18 13:37 /u01/bigdata/data/u.user

把电影评级数据(u.data)、电影数据(u.item)等复制到HDFS方法相同,把本地数据复制到HDFS后,Spark如何读取加载HDFS上的文件?我们可以通过Spark的textFile方法读取。这里我们以PySpark为例,启动PySpark客户端,导入需要是的包,然后通过textFile方法读取HDFS上的数据,具体请看以下示例:

###以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的包
from pyspark.sql import SparkSession
from pyspark.sql import Row
##初始化sparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
###加载数据,并处理分割符数据
sc = spark.sparkContext
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
###利用反射机制推断模式(Schema),把dataframe注册为一个table
df = userrdd.map(lambda fields: Row(userid=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")

2.4探索数据

生产环境中数据往往包含很多脏数据,如缺失数据、不一致、不规范、奇异数据等等,所以数据加载后,数据建模前,需要对数据进行分析或探索,尤其面对大数据,了解数据的统计信息、数据质量、数据特征等,为数据处理、数据建模提供重要依据,在进行这些数据分析时,如果能实现数据的可视化,当然更利于我们理解数据。
2.4.1数据统计信息
加载数据后,首先关注的数据的统计信息,有了数据统计信息,我们对数据就有了一个大致了解,如数据特征的最大值、最小值、平均值、分位数、方差等。这些信息有助于我们理解数据质量、数据构成,为数据预处理提供重要依据。
#查看用户各字段的统计信息

schemauser.describe("userid","age", "gender","occupation","zip").show()
+-------+-----------------+-----------------+------+-------------+------------------+
|summary| userid| age|gender| occupation| zip|
+-------+-----------------+-----------------+------+-------------+------------------+
| count| 943| 943| 943| 943| 943|
| mean| 472.0|34.05196182396607| null| null| 50868.78810810811|
| stddev|272.3649512449549|12.19273973305903| null| null|30891.373254138158|
| min| 1| 7| F|administrator| 00000|
| max| 99| 73| M| writer| Y1A6B|
+-------+-----------------+-----------------+------+-------------+------------------+

从以上统计可以看出,用户表总记录数为943条,年龄最小为9岁,最大为73岁,平均年龄为34岁。
2.4.2数据质量分析
数据质量分析是数据探索阶段重要一环,数据不是完美的,大多数据大多包含缺少数据、不一致数据、异常数据、噪音数据等。没有可信的数据,再好的模型性能都太可能好,正所谓“垃圾进,垃圾出”。
数据质量方面的分析,主要包括以下几个方面:
1)缺失值;
2)异常值;
3)不一致的值
4)错误数据
数据集下载
本节以一份某酒店的销售额的数据为例,来说明在数据探索中,对数据质量的一般分析方法,主要涉及缺少值、异常值、不一致数据等。

##以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的库
import pandas as pd
import matplotlib.pyplot as plt
###加载数据,使用标题行
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##查看df的统计信息
df.count() ##统计非空值记录数
sale_date 200
sale_amt 198 ###说明sale_amt有两个空值
df.describe() ###获取df的统计信息
sale_amt
count 198.000000
mean 2765.545152
std 709.557639
min 22.000000
25% 2452.725000
50% 2655.850000
75% 3023.500000
max 9106.440000

#建立图像
plt.figure()
#画箱线图
bp = df.boxplot()
# flies为异常值的标签
x = bp['fliers'][0].get_xdata()
y = bp['fliers'][0].get_ydata()
y.sort()

#用annotate添加注释
for i in range(len(x)):
plt.annotate(y[i], xy = (x[i],y[i]), xytext=(x[i]+0.1-0.8/(y[i]-y[i-1]),y[i]))

plt.show()

图2-5 销售额箱型图检测异常值

从以上分析,可知,销售额列存在两个空值、6个可能的异常值,其中865.0,1060.0有可能属于正常值,当然也需要和也相关业务员沟通,对其他异常值,需要进一步分析异常值产生的原因,然后,确定数据的去留。
2.4.3数据特征分析
对数据质量有基本了解后,接下来就可就数据的特征进行分析,数据特征分析一般包括以下一些内容:
特征分布分析
对比分析
统计量分析
特征一般指用于模型训练的变量,原始数据中特征,有些是数值,有些是字符或其他格式信息,但在进行机器学习前,都需要转换为数值。根据实际情况,有时需要根据已有特征生成或衍生出新特征,如根据用户年龄衍生出表示老、中、青的新特征;有时需要对一些特征进行规范化、标准化等转换,尤其对回归类模型。
2.4.3.11.数据特征分析
特征的分布分析有助于发现相关数据的分布特征、分布类型、分布是否对称等,可以使用数据可视化方法,易直观发现特征的异常值等。以用户信息数据为例,分析用户的年龄特征、职业特征等。

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

sc = spark.sparkContext
# 加载textfile文件并转换为行式
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
#利用反射机制把RDD转换为DataFrame
df = userrdd.map(lambda fields: Row(name=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))

# 把dataframe注册为一个table.
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")

# 在table上运行SQL.
age = spark.sql("SELECT age FROM user")
#把运行结果转换为RDD
ages = age.rdd.map(lambda p: p.age).collect()
hist(ages, bins=20, color='lightblue', normed=True)

图2-6 用户年龄特征分布图

从以上图形可以看出,最小年龄在10岁左右,最大年龄超过70岁,大部分是20岁到40岁之间。
我们还可以进一步分析用户职业分布特征。

# 选取用户职业数据.
count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user Group by occupation order by cnt")
#查看前5行数据
count_occp.show(5)
+----------+---+
|occupation|cnt|
+----------+---+
| homemaker| 7|
| doctor| 7|
| none| 9|
| lawyer| 12|
| salesman| 12|
+----------+---+

#获取职业名称及职业数,以便画出各职业对应总数图形
#把运行结果转换为RDD
x_axis = count_occp.rdd.map(lambda p: p.occupation).collect()
y_axis = count_occp.rdd.map(lambda p: p.cnt).collect()

pos = np.arange(len(x_axis))
width = 1.0
###隐式新增一个figure,或为当前figure新增一个axes
ax = plt.axes()
ax.set_xticks(pos + (width / 2)) ###设置x轴刻度
ax.set_xticklabels(x_axis) ####在对应刻度打上标签

plt.bar(pos, y_axis, width, color='orange')
plt.xticks(rotation=30) ####x轴上的标签旋转30度
fig = matplotlib.pyplot.gcf() ###获取当前figure的应用
fig.set_size_inches(16, 10) ###设置当前figure大小

图2-7 用户职业分布图

从以上用户职业分布图,可以看出,学生占绝大多数,其次其他职业、教育工作者、管理者、工程师等。医生、家庭主妇或许平时较忙,故数量比较少。
2.4.3.22.特征分布及相关性分析
在数据探索阶段,分析特征分布,特征间的相关性等,对应后续的特征选择、特征提取将提供重要依据,以下是对类似共享单车数据的特征分析,详细内容可参考第9章的9.3节

###探索特征间分布、相关性等
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

data1=pd.read_csv('/home/hadoop/data/bike/hour.csv',header=0)
data_pd=data1.toPandas()
sns.set(style='whitegrid',context='notebook')
cols=['temp','atemp','label']
sns.pairplot(data_pd[cols],size=2.5)
plt.show()

图2-8 hours数据集特征分布及相关性示例图
2.4.3.33.对比分析

###导入需要的库
import pandas as pd
###把日期列作为索引,并转换为日期格式
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)
###把空值置为0
df1=df.fillna(0)
###根据年月求和
df_ym=df1.resample('M',how='sum')
##取年月
df2=df_ym.to_period('M')
##数据可视化
df2.plot(kind='bar',rot=30)

图2-9销售月份对比图
2.4.4数据的可视化
数据的可视化是数据探索、数据分析中重要任务,通过可视化可帮助我们发现数据的异常值、特征的分布情况等,为数据预处理提供重要支持。Spark目前对数据的可视化功能还很弱或还没有,不过,没关系,我们可以借助Python或R等可视化功能,Python和R在数据可视化方面功能很强大,这里以Python的数据可视化为例。Python的数据表现能力很强,可以2D或3D等方式展示,视化可以使用matplotlib或plot等方法。matplotlib是一种比较低级但强大的绘图工具,可以进行很多定制化,但往往需要较大代码来实现;Plot是一种非常简洁的绘图工具,它主要基于pandas基础之上,以下我们通过两个示例来具体说明:
下例是通过matplotlib可视化sin(x)和cos(x)函数的图形。

# -*- coding: utf-8 -*-
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif']=['SimHei'] ###显示中文
plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上的-号变为方块
x = np.linspace(0, 10, 100)
y = np.sin(x)
y1 = np.cos(x)
##绘制一个图,长为10,宽为6(默认值是每个单位80像素)
plt.figure(figsize=(10,6))
###在图列中自动显示$间内容
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
plt.plot(x,y1,"b--",label="$cos(x^2)$") ###b(blue),--线形
plt.xlabel(u"X值") ##X坐标名称,u表示unicode编码
plt.ylabel(u"Y值")
plt.title(u"三角函数图像") ##t图名称
plt.ylim(-1.2,1.2) ##y上的max、min值
plt.legend() ##显示图例
plt.savefig('fig01.png') ##保持到当前目录
plt.show()

运行结果如下:

图2-10 matplot数据可视化
同样的这些数据,如果我们对这些数据使用plot来进行可视化,代码可以非常简洁,但定制化方面可能要弱一些。

from pandas import DataFrame
import pandas as pd
import numpy as np

x = np.linspace(0, 10, 100)
df=DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)
df.plot()

显示图形如图2-11所示下:

图2-11 plot数据可视化

从以上实现代码可以看出,如果使用plot则非常简单,虽然定制化要比matplotlib少些,但其可定制的项也不少,如kind,rot,title,legend等等。

2.5数据预处理

前面我们介绍了探索数据的一些方法,通过对数据的探索,可以帮助我们发现一些奇异值、缺失值、一些特征的类别及其分布情况等信息。而这些信息正是对数据预处理的重要依据。在数据分析、机器学习中,数据的预处理是一个非常关键、尤其是涉及大数据的处理,往往是比较费时、费神的一个过程,有时,还需要往返多次。当然,如果数据预处理得好,除提高数据质量外,更能极大提高模型的性能,反之,对模型的影响也是很大,甚至可能垃圾进,垃圾出。
数据的预处理一般包括数据清理、数据转换、数据集成、数据归约等。这些预处理主要内容可以通过以下图形2-12来表示:

 

图2-12数据预处理示意图
2.5.1数据清理
数据清理主要任务是填补缺失值、光滑噪声数据、处理奇异数据、纠正错误数据、删除重复数据、删除唯一性属性、去除不相关字段或特征、处理不一致数据等。噪声数据的处理方法:分箱、聚类等。以下分别以处理缺失数据、异常数据为例,说明在spark中如何处理。
1. 处理缺失值

import pandas as pd
##读取HDFS上的数据
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##定位数据集中的空值
df[df.isnull().values==True]
##显示结果如下,说明有2个空值
sale_date sale_amt
13 2015/2/14 NaN
32 2015/1/26 NaN
###以0填补空值
df.fillna(0)
##或该列的平均值填补空值
df['sale_amt'].fillna(df['sale_amt'].count())
##或用该列前一行值填补空值
df.fillna(method='pad')

2. 处理奇异值
在数据探索阶段,我们发现销售数据文件catering_sale.csv中有6个可能的奇异值,假设与相关人员核实后,只有22为奇异值或错误数据,对错误数据我们一般采用删除或替换的方法,这里我们采用Spark SQL来处理奇异数据。
首先把数据复制到HDFS,用Spark读取数据,如果启动pyspark,则可以通过spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)读取;如果启动spark-shell启动,则可以采用 spark.read.option("header","true").csv("hdfs://192.168.1.112:9000/home
/hadoop/data/catering_sale.csv")的方式读取。

#读取CSV文件,保留文件标题,并创建spark 的一张derby数据库的表
df=spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)
##转换数据类型
df1=df.select(df['sale_date'],df['sale_amt'].cast("Double"))
###假设把22.0奇异值替换为200.0
df1.replace(22.0,200.0,'sale_amt')

这里我们使用了DataFrame的select、replace等方法,实际上df还有很多可利用的方法或函数,可以通过df.+Tab键查看:

这些方法或函数的具体使用,可以通过df.方法名?的方式查看,下例为查看df.filter的详细用法:

此外,我们还可以使用大量spark.sql.functions或pyspark.sql.functions,以下是使用去除字段左右空格、截取字段长度等内置函数示例:

from pyspark.sql.functions import *
###去空格
df.select(trim(df.sale_date)).show()
###去年份
df.select(substring(df.sale_date,1,4).alias('year'),df.sale_amt).show()

2.5.2数据变换
数据变换是数据预处理中一项重要内容,如对数据进行数据的规范化、离散化、衍生指标、类别特征数值化、平滑数据等都属于数据变换。数据变换Spark ML有很多现成的算法,利用这些算法可极大提高整个数据处理的效率,下表2-2只是为一个概况,更多更详细信息请可参考第4章。
表2-2 Spark ML自带的数据变换算法

这里我们以卡方检验为例,如何根据特征的贡献率来选择特征。假设我们很多特征,如:表示时间的特征:季节(season)、年月(yr)、月份(mnth)、是否节假日(holiday)、是否周末(weekday);表示天气的特征weathersit,temp等等,为了使用卡方检验来选择这些特征,首先需要把各特征组合一个特征向量,然后,把整合后特征向量、及选择特征个数等代入卡方模型中,详细代码如下:

//定义特征向量
featuresArray =["season","yr","mnth","hr","holiday","weekday","workingday",\
"weathersit","temp","atemp","hum","windspeed"]

###把各特征组合成特征向量features
assembler = VectorAssembler(inputCols=featuresArray,outputCol="features")
###选择贡献度较大的前5个特征
selectorfeature = ChiSqSelector(numTopFeatures=5, featuresCol="features",outputCol="selectedFeatures", labelCol="label")

2.5.3数据集成
数据集成是数据预处理的重要内容之一,将多文件或者多数据库中的数据进行合并,然后存放在一个一致的数据存储中。数据集成一般通过join或union、merge等关键字把两个(或多个)数据集连接在一起,Spark SQL(包括DataFrame)有join方法,Pandas下有merge方法。数据集成往往需要耗费很多资源,尤其是大数据间的集成涉及到shuffle过程,有时需要牵涉到多个节点,数据集成除了数据一致性外,性能问题常常不请自来,需要我们特别留心。
传统数据库一般是单机上采用hash join方法,如果在分布式环境中,采用join时,可以考虑充分利用分布式资源进行平行化,当然,在进行join之前,对数据过滤或归约也是常用的优化方法。
Spark SQL中有三种join方法:
broadcast hash join:
如果join的表中有一张大表和一张较少的表,可以考虑把这张小表广播分发到另一张 大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。
shuffle hash join:
如果两张表都不小,对数据量较大的表进行广播分发就不太适合。这种情况下,可以根 据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区, 这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。
sort merge join:
如果两张表都比较大,可以考虑使用sort merge join方法,先将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,然后,对单个分区节点的两表数据,分别进行排序,最后,对排好序的两张分区表数据执行join操作。
当然,如果两表都不大,可以直接使用hash join。
DataFrame中join有(或merge):内连接、左连接、右连接等。
2.5.4数据归约
大数据是机器学习的基础,但大数据往往数据量非常大,有时我们可以通过数据归约技术,删除或减少冗余属性(或维)、精简数据集等,使归约后数据比原数据小或小得多,但仍然接近于保持原数据的完整性,并结果与归约前结果相同或几乎相同。
表2-3 Spark ML 自带的数据选择算法


选择特征或降维是机器学习中重要的处理方法,我们可以使用这些方法在减少特征个数、消除噪声等问题的同时,维持原始数据的内在结构或主要特征。尤其是降维,在大数据、机器学习中发挥中重要作用,以下通过两个实例说明SVD、PCA具体使用。目前Spark MLlib支持SVD及PCA。

import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.SingularValueDecomposition import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) //保留前3个奇异值,需要获得U成员 val svd = mat.computeSVD(3, computeU = true) //通过访问svd对象的V、s、U成员分别拿到进行SVD分解后的 //右奇异矩阵、奇异值向量和左奇异矩阵: val U: RowMatrix = svd.U //左奇异矩阵 val s: Vector = svd.s //从大到小的奇异值向量 [30.88197557931219,10.848035248251415,8.201924156089822] val V: Matrix = svd.V //右奇异矩阵 -0.33309047675110115 0.6307611082680837 0.10881297540284612 -0.252559026169606 -0.13320654554805747 0.4862541277385016 -0.3913180354223819 0.3985110846022322 0.20656596253983592 -0.33266751598925126 0.25621153877501424 -0.3575093420454635 -0.35120996186827147 -0.24679309180949208 0.16775460006130793 -0.1811460330545444 0.03808707142157401 -0.46853660508460787 -0.35275045425261 -0.19100365291846758 -0.26646095393100677 -0.2938422406906167 -0.30376401501983874 -0.4274842789454556 -0.44105410502598985 -0.4108875465911952 0.2825275707788212

同样这个矩阵data,以下我们用PCA进行分解,看一下效果及与SVD的异同,SVD分解后右奇异矩阵V与PCA降维后的矩阵pc很相似。

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) val pc: Matrix = mat.computePrincipalComponents(3) -0.3948204553820511 -0.3255749878678745 0.1057375753926894 0.1967741975874508 0.12066915005125914 0.4698636365472036 -0.09206257474269655 -0.407047128194367 0.3210095555021759 0.12315980051885281 -0.6783914405694824 -0.10049065563002131 0.43871546256175087 -0.12704705411702932 0.2775911848440697 -0.05209780173017968 0.10583033338605327 -0.6473697692806737 0.422474587406277 -0.27600606797384 -0.13909137208338707 0.46536643478632944 -0.172268807944553 -0.349731653791416 0.4376262507870099 0.3469015236606571 0.13076351966313637

使用PCA降维,利用pyspark的画图功能,可以新生成的特征的方差贡献度进行可视化,下图为对hour.csv数据,通过PCA处理后,重要特征的排序情况:

图2-13 hour.csv数据的PCA分析图

2.6构建模型

前面我们介绍了准备阶段,包括加载数据、探索数据、预处理数据等,数据准备阶段往往是最费时间和精力的,常常这个问题解决了,又会出现新问题,经常需要返回多次。一般而言数据准备阶段从时间上来说可能要占据60%左右,有时更多。 数据准备后以后,接下来就是构建模型,模型是机器学习、数据挖掘等的核心,构建模型涉及确定模型或算法、设置参数、运算模型等,其大致流程如图2.11-14所示。

图2.14构建模型流程

选择算法主要依据业务需求、数据特征等,Spark目前支持分类、回归、推荐等这些常用而且重要的算法,具体可参考表2.3 所示。一种类型往往有几种算法,如分类可以逻辑、决策树等,如何选择算法,需要考虑业务需求、数据特征、算法适应性、个人经验等,当然,也可选择几种方法,然后进行比较,或采用集成学习的方式,复合多种算法也是选项之一,如先采用聚类方法对数据进行聚类,然后对不同类别的数据进行预测或推荐,有时会得到更好的结果。如果你觉得选择比较难或还不好确定,可以先从简单或熟悉的方法开始,然后,不断完善和优化。

表2-3 Spark ML 目前支持的算法

确定算法后,一般还需要设置一些参数,如训练决策树时需要选择迭代次数、纯度计算方法、树的最大高度等,此外,对准备好的数据需要进行划分,一般划分为训练数据和测试数据,有的会把训练数据进一步划分为训练数据集、验证数据集。Spark 提供多种随机划分数据的方法,如randomSplit、CrossValidator等。这些方法的具体使用在2.8节模型调优中将会具体说明。 训练数据用于训练模型,测试数据用于验证模型,因这个环节的验证是在模型训练过程中验证,所以它一般也认为隶属于模型建立过程。这种验证方法一般称为交叉验证(CrossValidator,CV),有些交叉验证把数据分成K组,如K折交叉验证(K-fold Cross Validator,K-CV ),在K折-交叉验证中,采用不重复地随机将数据集划分为K对,如果K=3,则将产生3个(训练,测试)数据集对,每个数据集使用2/3的数据进行训练,1/3进行测试。,这样会得到3个模型,用这3个模型的平均数作为最终模型的性能指标。K-CV可以有效的避免欠学习状态的发生,其结果也比较具有说服性。

2.7模型评估

模型构建以后,接下来就需要对该模型的性能、与目标的切合度等进行一些评估,模型评估是模型开发过程的不可或缺的一部分。在构建模型的过程中,会产生一些评估指标,如精确度、ROC、RMSE等等,这这些指标是重要而且基础的,但应该不是唯一和最终指标,除了这些指标外,我们还应该评估模型对业务的提示或商业目标的达成等方面贡献。一个好的模型不但要有好的技术指标,更要为解决实际问题提供帮助,有时后者显得更为重要。 Spark中常用的几个评估算法有: 均方差(MSE,Mean Squared Error): (∑(prec-act)**2)/n(prec为预测值,act为实际值,n为总样本数) 均方根差(RMSE,Root Mean Squared Error): 就是MSE开根号 平均绝对值误差(MAE,Mean Absolute Error): (∑|prec-act|)/n 在了解正确率、准确率之前,我们先看一个所谓的混淆矩阵(confusion matrix):

 

2.15混淆矩阵

混淆矩阵是一个简单矩阵,用于展示一个二分类器的预测结果,其中,T-True、F-False、N-Negative、P-Postitive。 真正(TP)被模型预测为正的正样本数;可以称作判断为真的正确率; 真负(TN) 被模型预测为负的负样本数;可以称作判断为假的正确率; 假正(FP) 被模型预测为正的负样本数;可以称作误报率; 假负(FN) 被模型预测为负的正样本数;可以称作漏报率. 正确率(Accuracy): A = (TP + TN)/(P+N) = (TP + TN)/(TP + FN + FP + TN) 反映了分类器统对整个样本的判定能力——能将正的判定为正,负的判定为负。 错误率(Error): E= (FP + FN)/(P+N) = (FP + FN)/(TP + FN + FP + TN) 准确率(Precision) P = TP/(TP+FP) ; 反映了被分类器判定的正例中真正的正例样本的比重 召回率(Recall): R = TP/(TP+FN) = 1 - FN/T; 反映了被正确判定的正例占总的正例的比重 F1-Measure: F1=2P*R/(P+R) 真阳性率(TPR): TPR= TP/(TP+FN),代表分类器预测的正类中实际正实例占所有正实例的比例。 假阳性率(FPR): FPR= FP/(FP+TN),代表分类器预测的正类中实际负实例占所有负实例的比例。 以上这些都属于静态的指标,当正负样本不平衡时它会存在着严重的问题。极端情况下比如正负样本比例为1:99(有些领域并不少见),那么一个分类器只要把所有样本都判为负,它就拥有了99%的精确度,但这时的评价指标是不具有参考价值的。另外,很多分类器都不是简单地给出一个正或负(0或1)的分类判定,而是给出一个分类的倾向程度,比如贝叶斯分类器输出的分类概率。对于这些分类器,当你取不同阈值,就可以得到不同的分类结果及分类器评价指标,依此人们又发明出来ROC曲线以及AUC(ROC曲线包围面积)指标来衡量分类器的总体可信度。ROC曲线将FPR和TPR定义为x和y轴,这样就描述了真阳性和假阳性不同决策阈值下之间的关系。AUC越大说明模型性能越好,ROC曲线如下图:

图2-16 ROC曲线示意图 下面通过一个实例说明Spark一些评估指标的使用:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.evaluation.RegressionMetrics val path="file:///u01/bigdata/spark/data/mllib/sample_libsvm_data.txt" val data=spark.read.format("libsvm").load(path) val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L) //参数说明 // threshold变量用来控制分类的阈值,默认值为0.5 val lr = new LogisticRegression() .setThreshold(0.6).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) val lrModel = lr.fit(trainingData) val predictions = lrModel.transform(testData) predictions.show() //计算MSE、MAE、 RMSE等 val evaluator = new BinaryClassificationEvaluator() .setLabelCol("label") val accuracy = evaluator.evaluate(predictions) val rm2 = new RegressionMetrics(predictions.select("prediction", "label").rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
println("MSE: " + rm2.meanSquaredError)
println("MAE: " + rm2.meanAbsoluteError)
println("RMSE Squared: " + rm2.rootMeanSquaredError)

//将其作为多分类结果进行评估,可计算F1、准确率、召回率、正确率
val multiclassClassificationEvaluator = new MulticlassClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + multiclassClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}

printlnMetric("f1")//f1 = 0.9646258503401359
printlnMetric("weightedPrecision")//weightedPrecision = 0.9675324675324675
printlnMetric("weightedRecall")//weightedRecall = 0.9642857142857142
printlnMetric("accuracy")//accuracy = 0.9642857142857143

//将其作为二分类结果进行评估,可计算areaUnderROC、areaUnderPR
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + binaryClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}

printlnMetric("areaUnderROC") //结果为areaUnderROC = 0.9944444444444444
printlnMetric("areaUnderPR")//结果为areaUnderPR = 0.9969948018193632
//分类正确且分类为1的样本数量 TP 是17
predictions.filter($"label" === $"prediction").filter($"label"===1).count
//分类正确且分类为0的样本数量 TN 是10
predictions.filter($"label" === $"prediction").filter($"label"===0).count
//分类错误且分类为0的样本数量 FN是1
predictions.filter($"label" !== $"prediction").filter($"prediction"===0).count
//分类错误且分类为1的样本数量 FP是0
predictions.filter($"label" !== $"prediction").filter($"prediction"===1).count

准确率:TP/(TP+FP)=17/(17+0)=1
召回率:TP/(TP+FN) = 17/(17+1)=0.944444

2.8组装

我们对数据集进行了探索,之后进行大量的数据清理、转换等工作,对数据预处理后,构建模型、评估模型。评估模型前我们需要对数据集随机划分为训练集和测试集。假如数据有变化,如新增数据,如何保证训练集和测试集上的操作保持一致?如果数据清理、数据转换等有很多步骤,如何保证这些步骤依次执行?
采用Spark pipeline能很好解决这些问题。我们只要把这些任务,作为pipeline的stage,按照其本身的执行次序把这些stages组装到一个pipeline上。(当然如果任务比较复杂,我们也可以采用多个pipeline,然后把这些作为pipeline的stage,组装到一个新的pipeline。)
组装的步骤一般是:
1、创建pipeline,并各个stages依次组装在一起,如:

val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

2、在训练集上拟合这个pipeline

val model = pipeline.fit(training)

3、在测试集上,做预测。

model.transform(test).select("label", "prediction")

通过这种方式,既可保证stages有序执行,也可保证在训练集和测试集上所做逻辑操作的一致性,这里只是举了一个简单例子,下一章将详细介绍有关Pipeline的内容,第7章后,还有详细的使用实例。

2.9模型选择或调优

在ML中一个重要的任务就是模型选择,或者使用给定的数据为给定的任务寻找最适合的模型或参数。这个过程也叫做调优。调优可以是对单个的Estimator,比如LogisticRegression,或者是包含多个算法、特征工程和其他步骤的工作流(Pipeline)中完成。用户可以一次性对整个Pipline进行调优,而不必对Pipline中的每一个元素进行单独的调优。
MLlib支持使用像CrossValidator和TrainValidationSplit这样的工具进行模型选择。这些工具需要以下的组件:
Estimator:用户调优的算法或Pipline。
ParamMap集合:提供参数选择,有时也叫作用户查找的参数网格(parameter grid),参数网格可以使用ParamGridBuilder来构建。
Evaluator:衡量模型在测试数据上的拟合程度。
模型选择工具工作原理如下:
1.将输入数据划分为训练数据和测试数据。
2. 对于每个(训练,测试)对,遍历一组ParamMaps。用每一个ParamMap参数来拟合估计器,得到训练后的模型,再使用评估器来评估模型表现。
3.选择性能表现最优模型对应参数表。
2.9.1 交叉验证 (CrossValidator)
交叉验证(CrossValidator)会从将数据集切分成K折数据集合,分别用于训练和测试,。例如,K=3折时,CrossValidator会生成3个(训练数据,测试数据)对,每一个数据对的训练数据占2/3,测试数据占1/3。为了评估一个ParamMap,CrossValidator 会计算这三个不同的(训练,测试)数据集对在Estimator拟合出的模型上的平均评估指标。
在找出最好的ParamMap后,CrossValidator 会利用此ParamMap在整个训练集上可以训练(fit)出一个泛化能力强,误差相对小的的最佳模型,整个过程处于流程化管理之中,其工作流程图如下:

 

图2-17Spark CrossValidator流程图

虽然利用CrossValidator来训练模型,可以提升泛化能力,但其的代价也比较高,如选择k=3,regParam=(0.1,0.01),numIters=(10,20)这样就需要对模型训练3*2*2=12次。然而,对比启发式的手动调优,这是选择参数的行之有效的方法。
2.9.2训练-验证切分(TrainValidationSplit)
交叉验证的代价比较高昂,为此Spark也为超参数调优提供了训练-验证切分(TrainValidationSplit)。TrainValidationSplit创建单一的(训练,测试)数据集对。它使用trainRatio参数将数据集切分成两部分。例如,当设置trainRatio=0.8时,TrainValidationSplit将会将数据切分80%作为数据集,20%作为验证集,来生成训练、测试集对,并最终使用最好的ParamMap和完整的数据集来拟合评估器。
相对于CrossValidator对每一个参数进行k次评估,TrainValidationSplit只对每个参数组合评估1次。因此它的评估代价没有这那么高,但是当训练数据集不够大的时候其结果相对不够可信。

图2-18 Spark TrainValidationSplit流程图

2.10保存模型

训练、优化模型后,我们需要保存模型,然后把模型移植或部署到其他环境中。
这节主要介绍如何保存模型,如何部署模型等内容,以下是具体示例代码。
1)保存拟合后流水线(pipeline)到磁盘

model.write.overwrite().save("/tmp/spark-logistic-regression-model")

2)保存未拟合的流水线(pipeline)到磁盘

pipeline.write.overwrite().save("/tmp/spark-logistic-regression-model")

3)把拟合后流水线部署到其他环境中。

val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

2.11小结

这一章主要介绍了如何构建Spark学习系统、构建的一般步骤等。,实际上,构建Spark学习系统与我们构建其他平台的学习系统基本相同或相似,一般都包括数据加载、数据探索、数据预测、建模、训练模型、评估模型、优化模型等步骤,但这里我们特别增加一个利用pipeline组装各个任务(stages),这也是Spark ML中基于DataFrame数据集的重要内容,下一章我们将详细介绍有关pipeline的内容。