设为首页 加入收藏

TOP

Scalaz(48)- scalaz-stream: 深入了解-Transducer: Process1-tee-wye(一)
2017-10-10 12:12:34 】 浏览:4914
Tags:Scalaz scalaz-stream: 深入 了解 -Transducer: Process1-tee-wye

   在上一篇讨论里我们介绍了Source,它的类型款式是这样的:Process[F[_],O]。Source是通过await函数来产生数据流。await函数款式如下:

def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O]  

await函数的作用是:运算F从外界数据源获取数据A,如:从数据库读取记录、从网络读取网页或读取键盘鼠标输入等。获取数据A后输入函数rcv来生成Process[F,O]类型。这是一种产生数据的数据源Source模式。有了数据源Source后我们可能需要对Source提供的数据O进行加工处理,这就是transducer的功能了。我们先看看Transducer的类型款式:

type Process1[-I,+O] = Process[Env[I,Any]#Is, O]

从类型参数来看transducer不会产生任何的副作用,它的作用应该是把接收到的数据元素I加工处理后转换成O类型数据输出。transducer不会主动产生任何数据而是被动接收输入数据I,所以Process1类型的await函数被称为await1,款式如下:

/** The `Process1` which awaits a single input, emits it, then halts normally. */ def await1[I]: Process1[I, I] = receive1(emit) def receive1[I, O](rcv: I => Process1[I, O]): Process1[I, O] =
    await(Get[I])(rcv)

首先可以看出await1就是await函数的特别版本:产生数据的F[A]被替换成了帮助compiler推导类型I的Get[I],也就是说await1不会主动产生数据,它的rcv是个lambda:需要提供给它一个I,它才会返回Process1[I,O]。我们来看看await1的用例:

1  import Process._ 2  def multiplyBy(n: Int): Process1[Int,Int] =
3     await1[Int].flatMap { i => emit(i * n) }.repeat 4                                        //> multiplyBy: (n: Int)scalaz.stream.Process1[Int,Int]
5  def addPosfix: Process1[Int,String] =
6    receive1[Int,String]{ i => emit(i.toString + "!") }.repeat 7                                        //> addPosfix: => scalaz.stream.Process1[Int,String]

可以看出无论await1或者receive1都在被动等待一个元素i来继续进行数据转换功能。我们可以用pipe把Process1连接到一个Source上,然后对Source产生的元素进行转换处理:

1  (range(11,16).toSource pipe multiplyBy(5) |> addPosfix).runLog.run 2                                     //> res0: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)

我们也可以把一个普通函数lift成Process1:

1  import process1._ 2  (range(11,16).toSource |> lift {(i: Int) => i * 5} |> addPosfix).runLog.run 3                                      //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)

上面的|>是pipe符号。实际上我们可以直接对Source输出元素进行转换来达到同样目的:

1  range(11,16).toSource.flatMap { i =>
2   emit(i * 5) }.flatMap { i =>
3   emit(i.toString + "!") }.runLog.run       //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)

虽然用更直接的方法获得相同结果,但值得注意的是现在这个Source已经是一个特殊的版本了,附加在它上面的这些元素转换特性是无法分割的了。实际上pipe就是Process组合函数,我们用它来把Source和Transducer、Transducer与Transducer对接起来。这样我们就可以保证Source和Transducer都是功能单一的函数组件了。

只要连接上一个数据源,我们就可以对它发出的元素进行转换处理。这些transduce功能函数都在process1对象里:

 1 import process1._  2  (range(1,6).toSource pipe take(2))  3  .runLog.run                                      //> res2: Vector[Int] = Vector(1, 2)
 4  (range(1,10).toSource |> filter {_ % 2 == 0 }  5   |> collect {  6     case 4 => "the number four"
 7     case 5 => "the number five"
 8     case 6 => "the number six"
 9     case 100 => "the number one hundred"
10  } 11  ).runLog.run         //> res3: Vector[String] = Vector(the number four, the number six)

基本上所有对scala标准库List使用的函数都可以对Process1施用:

 1 (range(1,6).toSource  2   |> fold(Nil:List[Int]){ (b,a) => a :: b }  3  ).runLog.run                            //> res5: Vector[List[Int]] = Vector(List(5, 4, 3, 2, 1))
 4 (range(1,6).toSource  5   |> foldMap { List(_) }  6  ).runLog.run                            //> res6: Vector[List[Int]] = Vector(List(1, 2, 3, 4, 5))
 7 (range(1,6).toSource  8   |> foldMap { identity }  9  ).runLog.run                            //> res7: Vector[Int] = Vector(15)
10 (range(1,6).toSource 11   |> sum 12  ).runLog.run                            //> res8: Vector[Int] = Vector(15)
13 (range(1,6).toSource 14   |> scan(0){(a,b) => a + b} 15  ).runLog.run                            //> res9: Vector[Int] = Vector(0, 1, 3, 6, 10, 15)

我们也可以把一串现成的元素插入一个

首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scalaz(49)- scalaz-stream: .. 下一篇scala学习手记23 - 函数值

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目