Skip to content

Latest commit

 

History

History
150 lines (125 loc) · 11.5 KB

1.2.md

File metadata and controls

150 lines (125 loc) · 11.5 KB

#1.2 快速入门指南: Reactive Tweets 一个典型的流处理用例是在正在消费的实时流或者数据中提取汇总一些其他数据。在这个例子中我们打算消费一个tweets流,并提取其中关于akka的信息。我们也考虑固有的所有无阻塞流媒体解决方案的问题:“订阅者消耗数据缓慢的实时流如何工作”,传统上的解决方法是经常缓存元素,但这种将会造成最终缓冲区的溢出和系统的不稳定。相反,akka stream依赖于内部的backpressure信号来控制着这种状态下被允许的情况。以下是我们整个快速入门中的数据模型

final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
val akka = Hashtag("#akka")

注意:如果你想要获得一个使用词汇的概要而不是冒失的深入一个实际的例子,你可以看看核心概念、定义和运行streams部分的文档,然后再回到这里连贯的看这个简单的示例应用程序。


##1.2.1 转换和消费简单的流(Transforming and consuming simple streams) 在这个例子中我们将关注从一个简单的Twitter源的流中提取某些信息,例如在所有推文的句柄中关于#akka的用户。通过创建一个ActorSystem和一个ActorMaterlizer来准备我们的环境,这将负责物化和运行我们将要创建的流:

implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()

ActorMaterializer可以任意选取将采取那些将被定义物化性质的ActorMaterializerSettings,如默认缓冲区的大小(参考Buffers in Akka Streams),通过管道等的调度。这些能在FlowSourceSinkGraph中被覆盖。 假如我们有一应俱全的一个tweets流,在akka这将被表示为Source[Out,M]:

val tweets: Source[Tweet, Unit]

流(Streams)总是从Source[Out,M1]开始,可以通过Flow[In,Out,M2]元素或者更高级的(Graph)元素,最终由Sink[In,M3]消耗(现在忽略M1、M2、M3的参数类型,他们是不相关的类型生产/消耗--他们是“物化类型”,我们将在下面讨论) 这些操作对于那些使用scala集合库的人应该很熟悉,只不过这里是作用在流上而不是集合上(这是一个非常重要的区别,因为某些操作在流媒体上才有意义,反之亦然)

val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)

最后,为了物化并运行流计算,我们需要将Flow连接到Sink,这样将使得flow运行。最简单的方法是在Source上调用runWith(sink)方法。为了方便起见,若干普通Sink通过Sink的伴生对象的方法将会是预定义的和可采集的。现在我们只需简单的打印作者:

authors.runWith(Sink.foreach(println))

或者使用更简单的版本(这仅仅正对最流行的sinks如Sink.fold或者Sink.foreach):

authors.runForeach(println)

物化和运行一个流总是需要Materializer是隐式定义的(或者通过明确定义,类似:run(materializer))。 完整的代码是这样的:

implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
authors.runWith(Sink.foreach(println))

##1.2.2 流中的扁平化序列(Flattening sequences in streams) 在上一节,我们是处理最常见的一对一关系的元素,但是有时我们需要从一个元素映射到多个元素,并获取“扁平化”流,也是和scala集合中的flatMap类似,为了从我们的推文流中获取hashtags(#标签),w我们可以使用mapConcat组合子:

val hashtags: Source[Hashtag, Unit] = tweets.mapConcat(_.hashtags.toList)

注意:flatMap这个名字由于和for-comprehensions以及一元monadic接近而被有意识的回避。这是有问题的,原因有两点,第一,由于死锁的风险(与合并作为首选策略),采用连接操作的扁平化是有界的而不希望以流处理,二是monad将不允许我们采取flatMap的定律(由于活跃性的问题)。请注意:mapConcat所提供功能以返回一个严格的集合 (f:Out=>immutable:Seq[T]),而flatMap必须对数据流采取统一的操作


##1.2.3 广播流(Broadcasting a stream) 现在,我们要从这个实时流中获取所有的hashtags以及所有的作者,例如,我们希望所有的作者写到一个文件,而hashtags写到另一个文件,这意味着我们需要将源数据分成两个流,它将处理不同写入到不同的文件。能被用于像"fan-out"(或者"fan-in")结构的元素在akka streams中被成为节点("junctions")。我们将在这个例子中的其中一个称为广播,它只是从他的输入端口发射元素到它所有的输出端口。akka Stream有意的从非线性结构分离线性结构(流(Flows)),以为这两种情况提供最方便的API,图(Graphs)可以描述在非读情况下任意复杂情况下的集合转化的流设置,图用GraphDSL构成:

val writeAuthors: Sink[Author, Unit] = ???
val writeHashtags: Sink[Hashtag, Unit] = ???
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Tweet](2))
tweets ~> bcast.in
bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
ClosedShape
})
g.run()

正如你所看到,在GraphDSL中,我们使用一个隐式的图生成器b通过~>(边操作者)(也读作 “connect” 或者 “via” 或者 “to”)无常的构造图,操作者是通过导入GraphDSL.Implicits._隐含提供。 GraphDSL.create返回一个图形,在这个例子中的Graph[ClosedShape,Unit]中的ClosedShape意味着它是一个完全连通图或者关闭-没有任何未连接的输入或者输出。因为它是封闭的,额可以使用RunnableGraph.fromGraph将上述图转化为RunnableGraph。可运行的图能调用run()来实现输出。 GraphRunnableGraph都是不变的,线程安全的,自由共享的。 一个图还可以具有其他形状,与一个或多个尚未连接端口。拥有尚未连接的端口表示这个图是局部图。大型系统构成和嵌套图将被在模块化、组成和层次上详细的解释说明。另外,也可以从局部流作为FlowsSinks或者Sources包装复杂的计算图形也将在详细的在构建SourcesSinksFlows中解释。

##1.2.4 Back-pressure 实战(Back-pressure in action) akka Stream的主要优点之一是它们总是从stream Sinks(订阅者)Sources(生产者)传递back-pressure信息。这不是一个可选的特征,并在任何时候都被启用。详细的了解在akka Stream和其他所有的兼容Reactive Stream中执行的back-pressure协议请阅读Back-pressure explained.。 一个典型问题的程序(不使用akka stream)是这样的:他们暂时或者设计上无法处理足够快的输入数据,并开始缓存传入的数据,直到没有更多空间来缓存,导致无论是OutOfMemoryErros或者服务的响应能力严重的退化。Akka Stream可以而且必须显式的处理缓冲。例如,如果我们只对最近的十条推文感兴趣,用一个缓存10个元素的缓冲器,这可以通过buffer来表示元素:

tweets
.buffer(10, OverflowStrategy.dropHead)
.map(slowComputation)
.runWith(Sink.ignore)

缓冲元素需要一个名曲的和所需的OverflowStrategy,这定义缓冲器在收到新元素是的策略,提供的策略包括丢弃最旧的元素(dropHead),废弃整个缓冲区,信号错误等。确信你选择最适合你情况的策略。

##1.2.5 物化值(Materialized values) 到目前为止,我们只处理那些用流(Flows)并且通过某种外部的接收器(Sinks)处理数据,无论通过打印值还是存储在外部系统中。但有时候我们可能会对那些可以从物化处理流水线上获取的值感兴趣。比如我们想知道有多少推文我们已经处理。虽然在无限流的情况下这个答案不是那明显的给出。(这个问题回答的一种方式是“到目前为止,我们处理了N条推文”),但通常可以处理有限流,并拿出一个不错的记过,例如,元素的总数。首先我们用Sink.fold写一个这样的元素计数器,看看类型是什么样子的:

val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
tweets
    .via(count)
    .toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total tweets processed: $c"))

首先,我们编写一个可复用的流,将每个传入的推文转化为数字1,我们将使能将所有Int元素求和,并将形成一个Future[Int]的Sink.fold将那些1组合成那些1的和。接下来通过可以将每一个tweet转化成1的map步骤连接推文流,最终我们通过toMat连接事先定义的Sink和流量。 还记得那些在Source[+Out,+Mat]Flow[-In,+Out,+Mat]以及Sink[-Int,+Mat]上的神秘的Mat类型的元素么?他们标识当物化结束时这部分的返回值。当你链接这些时,你可以明确的结合自己的物化值:在我们的例子中,我们使用预定义的Keep.right函数,这表明这阶段只关心物化类型,正如你看到的,sumSink的物化类型是Future[Int,因此使用Keep.right,由此产生的RunnableGraph也是有Future[Int]的类型参数。 这一步还没有一个物化的处理管道,只是准备说明Flow被连接到一个Sink,因此可以使用run(),它的类型是: RunnableGraph[Future[Int]]。接下来我们调用run(),这使用一个隐式的ActorMaterializer实现和运行住哪户,通过调用RunnableGraph[T]run()方法返回值的类型是T。我们这个例子中,返回值的类型是Future[Int],一旦完成,将包含我们推文的忠诚度,如果流是失败,那么这个会以Failure完成。 一个RunnableGraph`可以重复使用和物化多次,因为这是一个“蓝图”。这意味着,如果我们的物化流,例如,一个消费推文的实时流,在一分钟后,这两个的物化值是不同的:

val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableGraph: RunnableGraph[Future[Int]] =
tweetsInMinuteFromNow
    .filter(_.hashtags contains akka)
    .map(t => 1)
    .toMat(sumSink)(Keep.right)
// materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
// and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()

在akka stream中许多元素提供可以获取结果,计算,或者转向这些元素的物化值,这将在物化流中被详细的讨论。通过这一节,我们知道幕后发生的事,当我们运行这个one-liner,这相当于上述多行的版本:

val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)

注意 : runWith()是个方便的方法,可以自动忽略除了那些由runWith()本身追加的之外的其他阶段的任意物化值,在上述例子中通过使用Keep.right作为组合物化值。