从表面上来看,Stream代表一连串无穷数据元素。一连串的意思是元素有固定的排列顺序,所以对元素的运算也必须按照顺序来:完成了前面的运算再跟着进行下一个元素的运算。这样来看,Stream应该不是很好的并行运算工具。但是,fs2所支持的并行运算方式不是以数据元素而是以?Stream为运算单位的:fs2支持多个Stream同时进行运算,如merge函数。所以fs2使Stream的并行运算成为了可能。
一般来说,我们可能在Stream的几个状态节点要求并行运算:
1、同时运算多个数据源头来产生不排序的数据元素
2、同时对获取的一连串数据元素进行处理,如:map(update),filter等等
3、同时将一连串数据元素无序存入终点(Sink)
我们可以创建一个例子来示范fs2的并行运算:?模拟从3个文件中读取字串,然后统计在这3个文件中母音出现的次数。假设文件读取和母音统计是有任意时间延迟的(latency),我们看看如何进行并行运算及并行运算能有多少效率上的提升。我们先设定一些跟踪和模拟延迟的帮助函数:
1 def log[A](prompt: String): Pipe[Task,A,A] = _.eva lMap { a => Task.delay{ println(s"$prompt>"); a }} 2 //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
3 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.eva lMap { a =>
4 val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) } 5 delay.flatMap {d => Task.now(a).schedule(d.millis) } 6 } //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.
log是个跟踪函数,randomDelay是个延迟模拟函数,模拟在max内的任意时间延迟。
与scalaz-stream-0.8不同,fs2重新实现了文件操作功能:不再依赖java的字串(string)处理功能。也不再依赖scodec的二进制数据转换功能。下面是fs2的文件读取方法示范:
1 val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024) 2 //> s1 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)
3 val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024) 4 //> s2 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)
5 val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024) 6 //> s3 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)
fs2.io.file.readAll函数的款式如下:
def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}
readAll分批(by chunks)从文件中读取Byte类型数据(当返回数据量小于chunkSize代表完成读取),返回结果类型是Stream[F,Byte]。我们需要进行Byte>>>String转换及分行等处理。fs2在text对象里提供了相关函数:
object text { private val utf8Charset = Charset.forName("UTF-8") /** Converts UTF-8 encoded byte stream to a stream of `String`. */ def utf8Decode[F[_]]: Pipe[F, Byte, String] = _.chunks.through(utf8DecodeC) /** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */ def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = { /** * Returns the number of continuation bytes if `b` is an ASCII byte or a * leading byte of a multi-byte sequence, and -1 otherwise. */ def continuationBytes(b: Byte): Int = { if ((b & 0x80) == 0x00) 0 // ASCII byte
else if ((b & 0xE0) == 0xC0) 1 // leading byte of a 2 byte seq
else if ((b & 0xF0) == 0xE0) 2 // leading byte of a 3 byte seq
else if ((b & 0xF8) == 0xF0) 3 // leading byte of a 4 byte seq
else -1 // continuation byte or garbage
} ... /** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */ def utf8Encode[F[_]]: Pipe[F, String, Byte] = _.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset)))) /** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */ def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] = _