文章目录
第13章 使用Spark Streaming构建在线学习模型
前面我们介绍的这些算法,一般基于一个或几个相对固定的文件,以这样的数据为模型处理的源数据是固定的,这样的数据或许很大,很多。训练或测试都是建立在这些固定数据之上,当然,测试时,可能取这个数据源之外的数据,如新数据或其他数据等。训练模型的数据一般是相对固定的,这样的机器学习的场景是很普遍的。
但实际环境中,还有其他一些场景,如源数据是经常变换,就像流水一样,时刻在变换着,如很多在线数据、很多日志数据等等。面对这些数据的学习我们该如何处理呢?
这个问题实际上属于流水计算问题,目前解决这类问题有Spark Streaming、Storm、Samza等。这章我们主要介绍Spark Streaming。
本章主要包括以下内容:
介绍Spark Streaming主要内容
Spark Streaming入门实例
在线学习实例
13.1 Spark Streaming简介
Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。
13.1.1Spark Streaming常用术语
在简介Spark Streaming前,我们先简单介绍Streaming的一些常用术语。
13.1.2Spark Streaming处理流程
Spark Streaming处理的数据流图如图13-1所示。
图13-1 Spark Streaming计算过程
13.2 Dstream操作
RDD有很多操作和转换,与RDD类似,DStream也提供了自己的一系列操作方法,本节主要介绍如何操作DStream,包括输入、转换、修改状态及输出等。
13.2.1 Dstream输入
在Spark Streaming中所有的操作都是基于流的,而输入源是一切操作的起点。
Spark Streaming 提供两种类型的流式输入数据源:
基础输入源:能直接应用于StreamingContext API输入源。例如:文件系统、Socket(套接字)连接和 Akka actors;
高级输入源:能应用于特定工具类的输入源,如 Kafka、Flume、Kinesis、Twitter 等,使用这些输入源需要导入一些额外依赖包。
13.2.2 Dstream转换
DStream转换操作是在一个或多个DStream上创建新的DStream。
13.2.3 Dstream修改
Spark Streaming除提供一些基本操作,还提供一些状态操作。
13.2 .4Dstream输出
Spark Streaming允许DStream的数据输出到外部系统,如数据库、文件系统等。
13.3 Spark Streaming应用实例
先启动nc,端口为9999
1 |
nc -lk 9999 |
然后,以本地方式启动spark shell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
//导入类或包 import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import spark.implicits._ // 创建一个间隔时间为3秒的context val ssc = new StreamingContext(sc, Seconds(3)) // 创建一个socket stream ,基于master:9999 val lines = ssc.socketTextStream("master",9999) val words = lines.flatMap(_.split(" ")) //为便于使用SQL进行统计,把DStream的RDD转换为DataFrame。 // 把RDD[String] 转换为RDD[case class] ,最后转换为DataFrame case class Record(word: String) words.foreachRDD { (rdd:RDD[String], time:Time) => val wordsDataFrame = rdd.map(w => Record(w)).toDF() // 创建一个临时视图 wordsDataFrame.createOrReplaceTempView("words") //使用SQL进行统计 val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") println(s"========= $time =========") wordCountsDataFrame.show() } ssc.start() ssc.awaitTermination() |
在启动了nc的界面输入:
1 |
ok ok m p py py py |
在spark shell界面,可以看到如下输出:
1 2 3 4 5 6 7 8 9 |
========= 1494714360000 ms ========= +----+-----+ |word|total| +----+-----+ | m| 1| | ok| 2| | p| 1| | py| 3| +----+-----+ |
13.4 Spark Streaming在线学习实例
前面我们简单介绍一个利用nc产生文本数据,Spark Streaming实时统计词频的一个实例,通过这个例子,我们对Streaming有个大致了解,它的源数据可以是实时产生、实时变化的,基于这个数据流,Spark Streaming能实时进行统计词频信息,并输出到界面。
除了统计词频,实际上Spark Streaming 还可以做在线机器学习工作,目前Spark Streaming支持Streaming Linear Regression, Streaming KMeans等,这节我们模拟一个在线学习线性回归的算法,源数据为多个文件,首先在一个文件中训练模型,然后在新数据上进行调整模型,对新数据进行预测等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
//导入需要的类 import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.feature.StandardScaler import breeze.linalg.DenseVector import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ //交互式编程 val ssc = new StreamingContext(sc, Seconds(10)) val stream = ssc.textFileStream("file:///home/hadoop/data/streaming/traindir") val NumFeatures = 11 val zeroVector = DenseVector.zeros[Double](NumFeatures) val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(zeroVector.data)) .setNumIterations(20) .setRegParam(0.8) .setStepSize(0.01) //创建一个含标签的数据流 val labeledStream = stream.map { line => val split = line.split(";") val y = split(11).toDouble val features=split.slice(0,11).map(_.toDouble) LabeledPoint(label = y, features = Vectors.dense(features)) } //在数据流上训练测试模型。 model.trainOn(labeledStream) model.predictOnValues(labeledStream.map(lp => (lp.label, lp.features))) .print() //启动Spark Streaming ssc.start() ssc.awaitTermination() |
13.5小结
前几章主要介绍了Spark ML对批量数据或离线数据的分析和处理,本章主要介绍Spark Streamin对在线数据或流式数据的处理及分析,首先对Spark Streaming的一些概念、输入源、Dstream的一些转换、修改、输出作了简单介绍,然后,通过两个实例把这些内容结合在一起,进一步说明Spark Streaming在线统计、在线学习的具体使用。