设为首页 加入收藏

TOP

Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing(一)
2017-10-10 12:10:55 】 浏览:8102
Tags:Scalaz scalaz-stream: fs2- 并行 运算 示范 fs2 parallel processing

    从表面上来看,Stream代表一连串无穷数据元素。一连串的意思是元素有固定的排列顺序,所以对元素的运算也必须按照顺序来:完成了前面的运算再跟着进行下一个元素的运算。这样来看,Stream应该不是很好的并行运算工具。但是,fs2所支持的并行运算方式不是以数据元素而是以?Stream为运算单位的:fs2支持多个Stream同时进行运算,如merge函数。所以fs2使Stream的并行运算成为了可能。

一般来说,我们可能在Stream的几个状态节点要求并行运算:

1、同时运算多个数据源头来产生不排序的数据元素

2、同时对获取的一连串数据元素进行处理,如:map(update),filter等等

3、同时将一连串数据元素无序存入终点(Sink)

我们可以创建一个例子来示范fs2的并行运算:?模拟从3个文件中读取字串,然后统计在这3个文件中母音出现的次数。假设文件读取和母音统计是有任意时间延迟的(latency),我们看看如何进行并行运算及并行运算能有多少效率上的提升。我们先设定一些跟踪和模拟延迟的帮助函数:

1 def log[A](prompt: String): Pipe[Task,A,A] = _.eva lMap { a => Task.delay{ println(s"$prompt>"); a }} 2                                                   //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
3 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.eva lMap { a =>
4   val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) } 5   delay.flatMap {d => Task.now(a).schedule(d.millis) } 6 }                                                 //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.

log是个跟踪函数,randomDelay是个延迟模拟函数,模拟在max内的任意时间延迟。

与scalaz-stream-0.8不同,fs2重新实现了文件操作功能:不再依赖java的字串(string)处理功能。也不再依赖scodec的二进制数据转换功能。下面是fs2的文件读取方法示范:

1 val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024) 2   //> s1 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)
3 val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024) 4   //> s2 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)
5 val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024) 6   //> s3 : fs2.Stream[fs2.Task,Byte] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>)

fs2.io.file.readAll函数的款式如下:

def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}

readAll分批(by chunks)从文件中读取Byte类型数据(当返回数据量小于chunkSize代表完成读取),返回结果类型是Stream[F,Byte]。我们需要进行Byte>>>String转换及分行等处理。fs2在text对象里提供了相关函数:

object text { private val utf8Charset = Charset.forName("UTF-8") /** Converts UTF-8 encoded byte stream to a stream of `String`. */ def utf8Decode[F[_]]: Pipe[F, Byte, String] = _.chunks.through(utf8DecodeC) /** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */ def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = { /** * Returns the number of continuation bytes if `b` is an ASCII byte or a * leading byte of a multi-byte sequence, and -1 otherwise. */ def continuationBytes(b: Byte): Int = { if      ((b & 0x80) == 0x00) 0 // ASCII byte
      else if ((b & 0xE0) == 0xC0) 1 // leading byte of a 2 byte seq
      else if ((b & 0xF0) == 0xE0) 2 // leading byte of a 3 byte seq
      else if ((b & 0xF8) == 0xF0) 3 // leading byte of a 4 byte seq
      else                        -1 // continuation byte or garbage
 } ... /** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */ def utf8Encode[F[_]]: Pipe[F, String, Byte] = _.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset)))) /** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */ def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] = _
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记35 - 隐式类型转换 下一篇Scalaz(59)- scalaz-stream: f..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目