一个完整的scalaz-stream有以下几个部分组成:Source -> Transducer -> Sink,用直白文字来描述就是:“输入 -> 传换 -> 输出”。我们已经在前面几篇讨论中介绍了Source和Transducer的基本情况,这篇讨论我们探讨一下Sink。scalaz-stream最基本的功能就是从Source接收一串元素,经过处理然后输出。毕竟我们从外部获取了数据、处理完毕后总不能就留在原地内存,还是要把数据输出到对当前运算中程序来说的一个外部系统。Sink就等于是这个外部系统的输入接口。与往常一样,我们先来看看Sink的类型款式:
/** * An effectful sink, to which we can send values. Modeled * as a source of effectful functions. */ type Sink[+F[_],-O] = Process[F, O => F[Unit]]
从类型款式看来Sink就是一种Process。只不过它包含的元素是一串函数(O => F[Unit])。这里的O是数据元素,F[Unit]的意思是对O进行F运算后不返回任何结果,如:Task{ println(...)}。我们先构造一个简单的Sink:
1 val sinkStdout: Sink[Task,String] =
2 Process.repeateva l { 3 Task.delay { (s: String) =>
4 Task.delay { println(s) }}} //> sinkStdout : scalaz.stream.Sink[scalaz.concurrent.Task,String] = Append(Await(scalaz.concurrent.Task@702b8b12,<function1>,<function1>),Vector(<function1>))
5
6 val sinkStdout2: Sink[Task,String] =
7 Process.constant { (s: String) =>
8 Task.delay { println(s) }} //> sinkStdout2 : scalaz.stream.Sink[scalaz.concurrent.Task,String] = Append(Emit(Vector(<function1>)),Vector(<function1>))
我们应该怎样把数据传给Sink呢?首先我们可以用tee.zip:
1 (range(1,6) zip sinkStdout).flatMap { 2 case (i,f) => eva l (f(i.toString)) 3 }.run.run //> 1 4 //| 2 5 //| 3 6 //| 4 7 //| 5
实际上scalaz-stream提供了to函数来支持Sink连接。to还是通过tee.zip来实现的:
/** Feed this `Process` through the given effectful `Channel`. */ def through[F2[x]>:F[x],O2](f: Channel[F2,O,O2]): Process[F2,O2] = self.zipWith(f)((o,f) => f(o)).eva l onHalt { _.asHalt } /** Attaches `Sink` to this `Process` */ def to[F2[x]>:F[x]](f: Sink[F2,O]): Process[F2,Unit] = through(f)
我们用to来重复示范上面的例子:
1 (range(1,6).map(_.toString) to sinkStdout).run.run 2 //> 1 3 //| 2 4 //| 3 5 //| 4 6 //| 5
可以说用to表述更简洁。如果我们需要把数据发送到多个外部系统,那我们就必须连接多个Sink了,可以用zip来连接多个Sink:
1 (range(1,6) zip sinkStdout zip sinkStdout2).flatMap { 2 case (((i,f),f2)) => for { 3 _ <- eva l(f(i.toString)) 4 _ <- eva l(f2(i.toString)) 5 } yield () 6 }.run.run //> 1 7 //| 1 8 //| 2 9 //| 2 10 //| 3 11 //| 3 12 //| 4 13 //| 4 14 //| 5 15 //| 5
scalaz-stream提供的observe函数可以像一个分流器一样安插在数据流中间复制一份数据发送到一个Sink而不影响正在流动的数据:
1 (range(1,4).map(_.toString) observe sinkStdout observe sinkStdout2 to sinkStdout) 2 .run.run //> 1 3 //| 1 4 //| 1 5 //| 2 6 //| 2 7 //| 2 8 //| 3 9 //| 3 10 //| 3
以上例子相当于连接了3个Sink。observe通常被用来跟踪流中数据,因为它不会影响数据流的正常运算。我们也可以把多个Sink zip成一个多功能的Sink。与上面例子不同的是它只有一个输出口:
1 import scalaz._ 2 import Scalaz._ 3 import scalaz.stream._ 4 import scalaz.concurrent._ 5 import scala.language.higherKinds 6 object streamLogDemo { 7 sealed trait Loglevel 8 case object Info extends Loglevel 9 case object Debug extends Loglevel 10 case object Warning extends Loglevel 11
12 case class Line(level: Loglevel, line: String) 13 //Sinks
14 val outInfo = io.stdOutLines.contramap {(l: Line) => "Info: " + l.line} 15 //> outInfo : scalaz.stream.Channel[scalaz.concurrent.Task,Line,Unit] = Append(Emit(Vector(<function1>)),Vector(<function1>))
16 val outDebug = io.stdOutLines.contramap {(l: Line) => "Debug: " + l.line} 17 //> outDebug : scalaz.stream.Channel[scalaz.concurrent.Task,Line,Unit] = Append(Emit(Vector(<function1>)),Vector(<function1>))
18 val outWarning = io.stdOutLines.contramap {(l: Li