在上一篇讨论里我们介绍了Source,它的类型款式是这样的:Process[F[_],O]。Source是通过await函数来产生数据流。await函数款式如下:
def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O]
await函数的作用是:运算F从外界数据源获取数据A,如:从数据库读取记录、从网络读取网页或读取键盘鼠标输入等。获取数据A后输入函数rcv来生成Process[F,O]类型。这是一种产生数据的数据源Source模式。有了数据源Source后我们可能需要对Source提供的数据O进行加工处理,这就是transducer的功能了。我们先看看Transducer的类型款式:
type Process1[-I,+O] = Process[Env[I,Any]#Is, O]
从类型参数来看transducer不会产生任何的副作用,它的作用应该是把接收到的数据元素I加工处理后转换成O类型数据输出。transducer不会主动产生任何数据而是被动接收输入数据I,所以Process1类型的await函数被称为await1,款式如下:
/** The `Process1` which awaits a single input, emits it, then halts normally. */ def await1[I]: Process1[I, I] = receive1(emit) def receive1[I, O](rcv: I => Process1[I, O]): Process1[I, O] =
await(Get[I])(rcv)
首先可以看出await1就是await函数的特别版本:产生数据的F[A]被替换成了帮助compiler推导类型I的Get[I],也就是说await1不会主动产生数据,它的rcv是个lambda:需要提供给它一个I,它才会返回Process1[I,O]。我们来看看await1的用例:
1 import Process._ 2 def multiplyBy(n: Int): Process1[Int,Int] =
3 await1[Int].flatMap { i => emit(i * n) }.repeat 4 //> multiplyBy: (n: Int)scalaz.stream.Process1[Int,Int]
5 def addPosfix: Process1[Int,String] =
6 receive1[Int,String]{ i => emit(i.toString + "!") }.repeat 7 //> addPosfix: => scalaz.stream.Process1[Int,String]
可以看出无论await1或者receive1都在被动等待一个元素i来继续进行数据转换功能。我们可以用pipe把Process1连接到一个Source上,然后对Source产生的元素进行转换处理:
1 (range(11,16).toSource pipe multiplyBy(5) |> addPosfix).runLog.run 2 //> res0: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)
我们也可以把一个普通函数lift成Process1:
1 import process1._ 2 (range(11,16).toSource |> lift {(i: Int) => i * 5} |> addPosfix).runLog.run 3 //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)
上面的|>是pipe符号。实际上我们可以直接对Source输出元素进行转换来达到同样目的:
1 range(11,16).toSource.flatMap { i =>
2 emit(i * 5) }.flatMap { i =>
3 emit(i.toString + "!") }.runLog.run //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)
虽然用更直接的方法获得相同结果,但值得注意的是现在这个Source已经是一个特殊的版本了,附加在它上面的这些元素转换特性是无法分割的了。实际上pipe就是Process组合函数,我们用它来把Source和Transducer、Transducer与Transducer对接起来。这样我们就可以保证Source和Transducer都是功能单一的函数组件了。
只要连接上一个数据源,我们就可以对它发出的元素进行转换处理。这些transduce功能函数都在process1对象里:
1 import process1._ 2 (range(1,6).toSource pipe take(2)) 3 .runLog.run //> res2: Vector[Int] = Vector(1, 2)
4 (range(1,10).toSource |> filter {_ % 2 == 0 } 5 |> collect { 6 case 4 => "the number four"
7 case 5 => "the number five"
8 case 6 => "the number six"
9 case 100 => "the number one hundred"
10 } 11 ).runLog.run //> res3: Vector[String] = Vector(the number four, the number six)
基本上所有对scala标准库List使用的函数都可以对Process1施用:
1 (range(1,6).toSource 2 |> fold(Nil:List[Int]){ (b,a) => a :: b } 3 ).runLog.run //> res5: Vector[List[Int]] = Vector(List(5, 4, 3, 2, 1))
4 (range(1,6).toSource 5 |> foldMap { List(_) } 6 ).runLog.run //> res6: Vector[List[Int]] = Vector(List(1, 2, 3, 4, 5))
7 (range(1,6).toSource 8 |> foldMap { identity } 9 ).runLog.run //> res7: Vector[Int] = Vector(15)
10 (range(1,6).toSource 11 |> sum 12 ).runLog.run //> res8: Vector[Int] = Vector(15)
13 (range(1,6).toSource 14 |> scan(0){(a,b) => a + b} 15 ).runLog.run //> res9: Vector[Int] = Vector(0, 1, 3, 6, 10, 15)
我们也可以把一串现成的元素插入一个