上节我们探讨了通过scalaz-stream-fs2来驱动一套数据处理流程,用fs2的Pipe类型来实现对数据流的逐行操作。本篇讨论准备在上节讨论的基础上对数据流的流动和元素操作进行优化完善。如数据流动中增加诸如next、skip、eof功能、内容控制中增加对行元素的append、insert、update、remove等操作方法。但是经过一番对fs2的再次解读,发现这些操作模式并不像我所想象那样的方式,实际上用fs2来实现数据行控制可能会更加简单和直接。这是因为与传统数据库行浏览方式不同的是fs2是一种拖式流(pull-model stream),它的数据行集合是一种泛函不可变集合。每一行一旦读取就等于直接消耗了断(consumed),所以只支持一种向前逐行读取模式。如果形象地描述的话,我们习惯的所谓数据集浏览可能是下面这样的场景:
读取一行数据 >>> (使用或更新行字段值)>>> 向下游发送新的一行数据。只有停止发送动作才代表终止运算。完成对上游的所有行数据读取并不代表终止操作,因为我们还可以不断向下游发送自定义产生的数据行。
我们用fs2模拟一套数据流管道FDAPipeLine,管道中间有不定数量的作业节点FDAWorkNode。作业方式包括从管道上游截取一个数据元素、对其进行处理、然后选择是否向下游的管道接口(FDAPipeJoint)发送。下面是这套模拟的类型:fdapipes/package.scala
1 package com.bayakala.funda { 2
3 import fs2._ 4
5 package object fdapipes { 6 //数据行类型
7 trait FDAROW 8
9 //数据处理管道
10 type FDAPipeLine[ROW] = Stream[Task, ROW] 11 //数据作业节点
12 type FDAWorkNode[ROW] = Pipe[Task, ROW, ROW] 13 //数据管道开关阀门,从此处获得管道内数据
14 type FDAValve[ROW] = Handle[Task, ROW] 15 //管道连接器
16 type FDAPipeJoint[ROW] = Pull[Task, ROW, Unit] 17
18 //作业类型
19 type FDATask[ROW] = ROW => Option[List[ROW]] 20
21 } 22
23 }
注意这个FDAROW类型:这是一种泛类型,因为在管道中流动的数据可能有多重类型,如数据行和QueryAction行。
流动控制方法:FDAValves.scala
1 package com.bayakala.funda.fdapipes 2 import fs2._ 3 object FDAValves { //流动控制方法 4 //跳过本行(不向下游发送)
5 def fda_skip[ROW] = Some(List[ROW]()) 6 //将本行发送至下游连接管道
7 def fda_next[ROW](r: ROW) = Some(List[ROW](r)) 8 //终止流动
9 def fda_break = None 10
11 }
数据发送方法:FDAPipes.scala
1 package com.bayakala.funda.fdapipes 2 import fs2._ 3 object FDAJoints { //数据发送方法 4 //write rows down the pipeline
5 def fda_pushRow[ROW](row: ROW) = Pull.output1(row) 6 def fda_pushRows[ROW](rows: List[ROW]) = Pull.output(Chunk.seq(rows)) 7 }
作业节点工作方法:
1 package com.bayakala.funda.fdapipes 2 import FDAJoints._ 3 object FDANodes { //作业节点工作方法
4 def fda_execUserTask[ROW](task: FDATask[ROW]): FDAWorkNode[ROW] = { 5 def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => { 6 h.receive1Option { 7 case Some((r, h)) => task(r) match { 8 case Some(xr) => xr match { 9 case Nil => go(h) 10 case _ => fda_pushRows(xr) >> go(h) 11 } 12 case None => fda_halt 13 } 14 case None => fda_halt 15 } 16 } 17 in => in.pull(go) 18 } 19
20 }
下面我们就示范这个工具库的具体使用方法:examples/Example1.scala
设置示范环境:
1 package com.bayakala.funda.fdapipes.examples 2 import fs2._ 3 import com.bayakala.funda.fdapipes._ 4 import FDANodes._ 5 import FDAValves._ 6 import Helpers._ 7 object Example1 extends App { 8
9
10 case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW 11 // test data set
12 val r1 = Employee(1, "John", 23, 100.00) 13 val r2 = Employee(2, "Peter", 25,100.00) 14 val r3 = Employee(3, "Kay", 35,100.00) 15 val r4 = Employee(4, "Cain", 45,100.00) 16 val r5 = Employee(5, "Catty", 35,100.00) 17 val r6 = Employee(6, "Little", 19,80.00)
注意Employee是一种行类型,因为它extends FDAROW。
我们再写一个跟踪显示当前流动数据行的函数:examples/Helpers.scala
1 package com.bayakala.funda.fdapipes.examples 2 import com.bayakala.funda.fdapipes._ 3 import fs2.Task 4 object Helpers { 5 def log[ROW](prompt: String): FDAWorkNode[ROW] =
6 _.eva lMap {row => Task.delay{ println(s"$prompt> $row"); row }} 7 }
下面我们就用几个有不同要求的例子来示范流动控制和数据处理功能,这些例子就是给最终用户的标准编程示范版本,然后由用户照版编写:
1、根据每条数据状态逐行进行处理: