设为首页 加入收藏

TOP

Akka-CQRS(7)- CQRS Reader Actor 示范(一)
2019-08-15 00:10:53 】 浏览:230
Tags:Akka-CQRS CQRS Reader Actor 示范

   我们在这篇通过一个具体CQRS-Reader-Actor的例子来示范akka-persistence的query端编程和应用。在前面的博客里我们设计了一个CQRS模式POS机程序的操作动作录入过程,并示范了如何实现CQRS的写端编程。现在我们可以根据这个例子来示范如何通过CQRS的读端reader-actor读取由writer-actor所写入的操作事件并将二进制格式的事件恢复成数据库表的行数据。

首先看看reader是如何从cassandra数据库里按顺序读出写入事件的:cassandra-plugin提供了currentEventsByPersistenceId函数,使用方法如下:

  // obtain read journal by plugin id
  val readJournal =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // issue query to journal
  val source: Source[EventEnvelope, NotUsed] =
    readJournal.currentEventsByPersistenceId("2022", 0, Long.MaxValue)

这个函数返回一个akka-stream的Source[EventEnvelope,_]类型,一个静态Source。还有另外一个eventsByPersistenceId函数可以返回实时动态的akka-stream Source。我们可以runFold一个静态的Source对流元素进行汇总形成一个集合: 

 

  // materialize stream, consuming events
  val futureActions: Future[List[Any]]  = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

 

我们可以通过模式匹配pattern-matching把List元素分辨出来:

  implicit val system = ActorSystem("reader")
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  futureActions.onComplete {
      case Success(acts) => acts.reverse.foreach {act => act match {
        case LogOned(txn) => println(s"LogOn: $txn" )
        case SalesLogged(txn) => println(s"LogSales: $txn")
        case _ => println("unkown action !!!!!")
      }}
      case Failure(exception) => println(exception.getMessage)
  }

试着运行一下得到下面的输出: 

LogOn: TxnItem(20190509,16:12:23,1001,0,1,6,8,1,0,0,0,,1001,,,,,,,)
LogSales: TxnItem(20190509,16:12:34,1001,0,2,0,0,0,1300,0,0,,005,hainan pineapple,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:35,1001,0,3,0,0,0,300,0,0,,004,demon banana,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:36,1001,0,4,0,0,0,1050,0,0,,002,red grape,02,Grocery,01,,01,Sunkist)
unkown action !!!!!
unkown action !!!!!

如此已经可以将写操作中存入的事件恢复成为Action类结构了。

好了,现在我们稍微认真点做个详细的reader示范。回到我们的POS例子:如果我们调用以下写端指令:

        posref ! POSMessage(1022, LogOn("1001", "123"))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", apple.code, 1,820))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", pineapple.code, 2, 1300))
        scala.io.StdIn.readLine()


        posref ! POSMessage(1022, Subtotal(0))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",5))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", banana.code, 1, 300))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", grape.code, 3, 1050))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Subtotal(1))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,true,"001",10))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", orage.code, 10, 350))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",10))
        s
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/10/10
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇12. Scala模式匹配 下一篇13. Scala函数式编程(高级部分)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目