设为首页 加入收藏

TOP

FunDA(7)- Reactive Streams to fs2 Pull Streams(一)
2017-10-09 14:30:09 】 浏览:5832
Tags:FunDA Reactive Streams fs2 Pull

    Reactive-Stream不只是简单的push-model-stream, 它还带有“拖式”(pull-model)性质。这是因为在Iteratee模式里虽然理论上由Enumerator负责主动推送数据,实现了push-model功能。但实际上Iteratee也会根据自身情况,通过提供callback函数通知Enumerator可以开始推送数据,这从某种程度上也算是一种pull-model。换句话讲Reactive-Streams是通过push-pull-model来实现上下游Enumerator和Iteratee之间互动的。我们先看个简单的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont { case Input.El(e) => println(s"EL($e)") showElements case Input.Empty => showElements case Input.EOF => println("EOF") Done((),Input.EOF) } //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]
val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473
 enumNumbers |>> showElements                      //> EL(1) //| EL(2) //| EL(3) //| EL(4) //| EL(5) //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))

我们看到:enumNumbers |>> showElements立刻启动了运算。但并没有实际完成数据发送,因为showElements并没有收到Input.EOF。首先,我们必须用Iteratee.run来完成运算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1) //| El(2) //| El(3) //| El(4) //| El(5) //| El(6) //| El(7) //| El(8) //| EOF //| it : scala.concurrent.Future[Int] = Success(99)

这个run函数是这样定义的:

/** * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary * Extracts the computed result of the Iteratee, pushing an Input.EOF first * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state. * In case of error, an exception may be thrown synchronously or may * be used to complete the returned Promise; this indeterminate behavior * is inherited from fold(). * * @return a [[scala.concurrent.Future]] of the eventually computed result */ def run: Future[A] = fold({ case Step.Done(a, _) => Future.successful(a) case Step.Cont(k) => k(Input.EOF).fold({ case Step.Done(a1, _) => Future.successful(a1) case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF") case Step.Error(msg, e) => sys.error(msg) })(dec) case Step.Error(msg, e) => sys.error(msg) })(dec)

再一个问题是:enumNumbers |>> showElements是个封闭的运算,我们无法逐部分截取数据流,只能取得整个运算结果。也就是说如果我们希望把一个Enumerator产生的数据引导到fs2 Stream的话,只能在所有数据都读入内存后才能实现了。这样就违背了使用Reactive-Streams的意愿。那我们应该怎么办?一个可行的方法是使用一个存储数据结构,用两个线程,一个线程里Iteratee把当前数据存入数据结构,另一个线程里fs2把数据取出来。fs2.async.mutable包提供了个Queue类型,我们可以用这个Queue结构来作为Iteratee与fs2之间的管道:Iteratee从一头把数据压进去(enqueue),fs2从另一头把数据取出来(dequeue)。

我们先设计enqueue部分,这部分是在Iteratee里进行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont { case Input.EOF => q.enqueue1(None).unsafeRun Done((),Input.EOF) case Input.Empty => enqueueTofs2(q) case Input.El(e) => q.enqueue1(Some(e)).unsafeRun enqueueTofs2(q) } //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下这个Iteratee:我们直接把enqueueTofs2放入Cont状态,也就是等待接受数据状态。当收到数据时运行q.enqueue1把数据塞入q,然后不断循环运行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是个同步运算,在未成功完成数据enqueue1的情况下会一直占用线程。所以,q另一端的dequeue部分必须是在另一个线程里运行,否则会造成整个程序的死锁。fs2的Queue类型款式是:Queue[F,A],所以我们必须用Stream.eva l来对这个Queue进行函数式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eva l(async.boundedQueue[Task,Option[
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(6)- Reactive Streams:.. 下一篇FunDA(8)- Static Source:保..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目