怎样利用Spark Streaming和Hadoop实现近实时的会话连接

这个 Spark Streaming 样例是怎样将近实时会话带到到Hadoop中的一个很好的例子。

Spark Streaming 是Apache Spark 中最有趣的组件之一。利用Spark Streaming,你可以通过使用与处理批量加载数据相同的API来创建数据管道,并通过数据管道处理流式数据。此外,Spark Steaming的“micro-batching”方式提供相当好的弹性来应对某些原因造成的任务失败。

在这篇文章中,我将通过演示网站事件的近实时会话使你熟悉一些常用的以及高级的Spark Streaming功能,然后在Apache HBase中加载事件的有关统计数据,最后选择你喜欢的BI工具进行绘图分析。 (Sessionization指的是捕获单一访问者的网站会话时间范围内所有的点击流事件。)你可以在这里找到了演示的代码。

像这样的系统对于了解访问者的行为(不管是人还是机器)是超级有用的。通过一些额外的工作,它也可以被设计用来包含windowing模式,以此来用异步的方式发现可能的欺骗。

Spark Streaming 代码

我们例子中的main class是:

让我们来看看这段代码段(忽略1-59行,它包含了imports 和其他无聊的东西)。

60到112行:设置Spark Streaming
这些代码是Spark Streaming最基本的开始,并可以选择从HDFS或socket接收数据流。如果你是Spark Streaming新手,我已经添加了一些详细的注释帮助理解代码。 (我不打算在这里详谈,因为此时我们仍要讨论代码。)

114到124行: 字符串解析

这里是Spark Streaming开始的地方. 请看一下几行::

上面第一行代码是对“lines” DStream对象做了一个map函数,并解析原始事件分离出IP地址,时间戳和事件体。对于那些新进入Spark Streaming的人而言,一个DSTREAM保存着要处理的一批记录。这些记录由以前所定义的receiver对象填充,并且此map函数在这个micro-batch内产生另一个DSTREAM存储变换后的记录来进行额外的处理。

当看像上面的Spark Streaming示意图时,有一些事情要注意:

  • 每个micro-batch在你构造StreamingContext时设定的那一秒时会被销毁
  • Receiver总是被下一个micro-batch中的即将到来的RDDS填充
  • 之前micro batch中旧的RDDs将被清理丢弃

126到135行:产生Sessions

现在,我们有从网络日志中获得的IP地址和时间,是时候建立sessions了。下面的代码是通过micro-batch内的第一聚集事件建立session,然后在有状态的DStream中减少这些会话的块。

这里有一个关于records如何在micro-batch中被reduce的例子:

会话范围内的 micro-batch 内加入,我们可以用超酷的updateStateByKey功能,它可以利用先前活跃的micro-batch中的一个DStream做join/reduce-like操作。下图详细说明了,就DStreams而言,随着时间变化这个处理过程是怎样的。

现在,让我们深入到定义在文件的底部的updateStatbyOfSessions函数。该代码(注意详细注释)很神奇,能使会话流程以连续的微批处理模式发生。

这段代码通过很多方式做了很多事,这是整个工作中最复杂的部分。总之,它会跟踪活动的会话,所以你会知道你是继续现有的会话还是启动一个新的。

126到207行:计数和HBase

这部分做了大量的计数工作。在这里有很多是重复的,所以让我们只看一个count的例子,然后我们再一步步地把在相同记录中产生的counts存储在HBase中。

总之,上面的代码是过滤了除活动会话之外的其他所有会话,对他们进行计数,并把最终的计数记录存储到一个的HashMap实例中。它使用HashMap作为容器,所以在所有的count做完后,我们可以调用下面的reduce函数把他们都合并为一个单独的记录。 (我敢肯定还有更好的方法来实现这一点,但这种方法也做的不错。)

接下来,下面的代码处理所有的HashMap,并把他们所有的值存储在在一个HashMap中。

Spark Streaming与HBase利用HBaseContext进行交互非常简单。所有你需要做的就是用HashMap和函数将其转换为一个put对象提供给DSTREAM。

现在,HBase的这些信息可以用Apache Hive table打包,然后通过你喜欢的BI工具执行一个查询来获取像下面这样的图,每一次micro-batch该图都将刷新。

209到215行:写入HDFS
最后的任务是把事件数据加入到活动的会话信息中,然后把事件以会话的开始时间保存到HDFS。

结论

我希望你跳出这个例子,能感受到只用少量的代码就完成了大量的工作,因为它就是这样的。想象一下你还可以用这种模式和Spark Streaming与HBase HDFS很容易交互的能力去做什么东西。

Ted Malaska 是 Cloudera 的一个解决方案架构师,Apache Spark的一个贡献者,O’Reilly的书 Hadoop Applications Architecture 的合著者。

3 收藏 评论

关于作者:mtunique

微博:@孟涛_hustGithub:mtunique个人网站:mtunique.com 个人主页 · 我的文章 · 12

相关文章

可能感兴趣的话题



直接登录
跳到底部
返回顶部