我们在这篇通过一个具体CQRS-Reader-Actor的例子来示范akka-persistence的query端编程和应用。在前面的博客里我们设计了一个CQRS模式POS机程序的操作动作录入过程,并示范了如何实现CQRS的写端编程。现在我们可以根据这个例子来示范如何通过CQRS的读端reader-actor读取由writer-actor所写入的操作事件并将二进制格式的事件恢复成数据库表的行数据。
首先看看reader是如何从cassandra数据库里按顺序读出写入事件的:cassandra-plugin提供了currentEventsByPersistenceId函数,使用方法如下:
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.currentEventsByPersistenceId("2022", 0, Long.MaxValue)
这个函数返回一个akka-stream的Source[EventEnvelope,_]类型,一个静态Source。还有另外一个eventsByPersistenceId函数可以返回实时动态的akka-stream Source。我们可以runFold一个静态的Source对流元素进行汇总形成一个集合:
// materialize stream, consuming events
val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
我们可以通过模式匹配pattern-matching把List元素分辨出来:
implicit val system = ActorSystem("reader")
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
futureActions.onComplete {
case Success(acts) => acts.reverse.foreach {act => act match {
case LogOned(txn) => println(s"LogOn: $txn" )
case SalesLogged(txn) => println(s"LogSales: $txn")
case _ => println("unkown action !!!!!")
}}
case Failure(exception) => println(exception.getMessage)
}
试着运行一下得到下面的输出:
LogOn: TxnItem(20190509,16:12:23,1001,0,1,6,8,1,0,0,0,,1001,,,,,,,)
LogSales: TxnItem(20190509,16:12:34,1001,0,2,0,0,0,1300,0,0,,005,hainan pineapple,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:35,1001,0,3,0,0,0,300,0,0,,004,demon banana,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:36,1001,0,4,0,0,0,1050,0,0,,002,red grape,02,Grocery,01,,01,Sunkist)
unkown action !!!!!
unkown action !!!!!
如此已经可以将写操作中存入的事件恢复成为Action类结构了。
好了,现在我们稍微认真点做个详细的reader示范。回到我们的POS例子:如果我们调用以下写端指令:
posref ! POSMessage(1022, LogOn("1001", "123"))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", apple.code, 1,820))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", pineapple.code, 2, 1300))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Subtotal(0))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",5))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", banana.code, 1, 300))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", grape.code, 3, 1050))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Subtotal(1))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,true,"001",10))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", orage.code, 10, 350))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",10))
s