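FunDA's parallel database loading means reading from several independent data sources at the same time, each in its own thread. These sources can be tables in databases on different servers, or independent partitions of a single table. The aim of parallel loading is to improve the program's efficiency. FunDA implements it by reading several independent data streams in parallel and merging them into one combined stream. We again use the AQMRPT table produced in the previous demonstration as sample data. In this demonstration we extract the STATENAME and COUNTYNAME fields from AQMRPT into two separate tables, STATE and COUNTY. The two tables are defined as follows: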
case class StateModel(id: Int, name: String) extends FDAROW
class StateTable(tag: Tag) extends Table[StateModel](tag,"STATE") {
  def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
  def name = column[String]("NAME",O.Length(32))
  def * = (id,name)<>(StateModel.tupled,StateModel.unapply)
}
val StateQuery = TableQuery[StateTable]

case class CountyModel(id: Int, name: String) extends FDAROW
//a plain class, like StateTable: table definitions should not be case classes
class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
  def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
  def name = column[String]("NAME",O.Length(64))
  def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
}
val CountyQuery = TableQuery[CountyTable]
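The idea of parallel loading described at the top can be sketched without FunDA or a database. The following is only an illustration using Scala's standard `Future`; the names `sourceA`, `sourceB`, `sourceC` and `loadAll` are hypothetical stand-ins for independent data sources and are not part of FunDA's API. It shows several independent reads being started together and their results combined into one collection.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelLoadSketch {
  // Hypothetical stand-ins for independent data sources (e.g. table partitions).
  val sourceA: () => List[String] = () => List("Alabama", "Alaska")
  val sourceB: () => List[String] = () => List("Kansas", "Kentucky")
  val sourceC: () => List[String] = () => List("Texas", "Utah")

  // Start one read per source. Each Future is scheduled as soon as it is
  // created, so the reads may run concurrently on the execution context.
  def loadAll(sources: Seq[() => List[String]]): Future[List[String]] = {
    val reads: Seq[Future[List[String]]] = sources.map(src => Future(src()))
    // Future.sequence keeps the results in the order of the input sources.
    Future.sequence(reads).map(_.flatten.toList)
  }

  def main(args: Array[String]): Unit = {
    val merged = Await.result(loadAll(Seq(sourceA, sourceB, sourceC)), 10.seconds)
    println(merged)
  }
}
```

Unlike this sketch, which waits for every source and concatenates the results in source order, a merged stream would typically emit rows as they arrive, so the order of rows across sources should not be relied on.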
First, some setup code creates these two tables:
//assume two distinct db objects
val db_a = Database.forConfig("h2db")
//another db object
val db_b = Database.forConfig("h2db")

//create STATE table
val actionCreateState = Models.StateQuery.schema.create
val futCreateState = db_a.run(actionCreateState).andThen {
  case Success(_) => println("State Table created successfully!")
  case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
}
//carry on even if table creation fails
Await.ready(futCreateState,Duration.Inf)

//create COUNTY table
val actionCreateCounty = Models.CountyQuery.schema.create
val futCreateCounty = db_a.run(actionCreateCounty).andThen {
  case Success(_) => println("County Table created successfully!")
  case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
}
//carry on even if table creation fails
Await.ready(futCreateCounty,Duration.Inf)
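The setup code carries on after a failed table creation because `Await.ready` only waits for the future to complete and does not rethrow its failure, whereas `Await.result` would. A minimal sketch of that behaviour with plain Scala futures follows; `createTable` and `createAndReport` are hypothetical helpers that stand in for the Slick schema action, assumed here to fail when the table already exists.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AwaitReadySketch {
  // Hypothetical stand-in for a schema-creation action that fails
  // when the table already exists.
  def createTable(alreadyExists: Boolean): Future[Unit] =
    if (alreadyExists) Future.failed(new IllegalStateException("table already exists"))
    else Future.successful(())

  // andThen runs the callback as a side effect and passes the
  // original outcome (success or failure) through unchanged.
  def createAndReport(alreadyExists: Boolean): Future[Unit] =
    createTable(alreadyExists).andThen {
      case Success(_) => println("created")
      case Failure(e) => println(s"not created: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    val fut = createAndReport(alreadyExists = true)
    // Await.ready blocks until completion but does not rethrow the failure.
    Await.ready(fut, 5.seconds)
    println(s"completed with failure: ${fut.value.exists(_.isFailure)}")
    // Await.result on the same future rethrows, so it is wrapped in Try here.
    println(s"Await.result throws: ${Try(Await.result(fut, 5.seconds)).isFailure}")
  }
}
```

This is why the demo can be rerun against a database where STATE and COUNTY already exist: the error is printed and execution continues.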
Next, we extract STATENAME from the AQMRPT table to form a data source:
//define query for extracting State names from AQMRPT
val qryStates = AQMRPTQuery.map(_.state).distinct.sorted // .distinctOn(r => r)
case class States(name: String) extends FDAROW
implicit def toStates(row: String): States = States(row)
val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(64,64)()
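Because there are many more COUNTYNAME values, we can split the AQMRPT table by STATENAME into three parts, A-K, K-P and P-Z, and build three independent data sources from them: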
//define query for extracting County names from AQMRPT in separate chunks
//query with state name >A and <K
val qryCountiesA_K = AQMRPTQuery.filter(r => (r.state.toUpperCase > "A" && r.state.toUpperCase < "K"))
  .map(r => (r.state,r.county))
  .distinctOn(r => (r._1,r._2))
  .sortBy(r => (r._1,r._2))
//query with state name >K and <P
val qryCountiesK_P = AQMRPTQuery.filter(r => (r.state.toUpperCase > "K" && r.state.toUpperCase < "P"))
  .map(r => (r.state,r.county))
  .distinctOn(r => (r._1,r._2))
  .sortBy(r => (r._1,r._2))
//query with state name >P
val qryCountiesP_Z = AQMRPTQuery.filter(r => r.state.toUpperCase > "P") .map(r => (r.state,r.