设为首页 加入收藏

TOP

Akka-CQRS(3)- 再想多点,全面点(一)
2019-08-15 00:11:03 】 浏览:173
Tags:Akka-CQRS 面点

   上篇我介绍了CQRS模式存写部分的具体实现和akka-persistence一些函数和消息的用法。在这篇本来是准备直接用一个具体的例子来示范CQRS模式编程,主要是写端,或者是数据采集端。想着模拟收银机的后端操作,可以使用集群分片(cluster-sharding),每个分片shard代表一部POS机控制系统。在写这段程序之前首先把示例功能实现、cluster-sharding, persistence-actor,actor-passivation, backoff-supervisor, ClusterSharding.start和ClusterSharding.startProxy等技术细节搞清楚:

1、构建几个测试销售的产品信息

2、设计一套简单但功能完整的操作指令command

3、设计运算状态,即一单未结算销售单据的状态。相关的指令-事件command-event转换和状态更新机制

4、单据状态初始化

5、业务逻辑部分,从接到各项指令、指令-事件转换、处理副作用、存写事件、更新单据状态

6、结束单据处理

以一单支付金额大于等于应付金额作为整单结束状态。此时应进行下面的处理:

     1)增加单号  2)清除所有交易项目  3)saveSnapshot  (重启就不用恢复前面的事件persistent-events)

7、资源释放策略及处理 passivation

如果一个shard-entity暂不使用时最好先停掉stop以释放它占用的资源。但用常规的方式停止entity会造成mailbox里未处理的消息丢失,所以cluster-sharding有一套特别的机制ClusterSharding.Passivate(actorRef)来实现shard-entity的安全停用,即:目标entity向ShardRegion发送Passivate(stopMessage)消息、ShardRegion向目标entity发送包嵌在消息里的stopMessage。目标entity在收到消息后可以自行停止。ShardRegion会保留收到Passivate消息到目标entity停止之间收到的消息,还给再启动的entity。在本例子里passivation的应用场景如下:每单支付后如果一段时间没有收到新的开单指令,这个shard-entity可以通过向ShardRegion发送Passivate消息或按空转时间段设定自动passivate自己,这时ShardRegion在entity空转超出时间后自动发送ClusterSharding.start(...)里定义的handOffStopMessage(PoisonPill),如下:

  def passivate(entity: ActorRef, stopMessage: Any): Unit = { idByRef.get(entity) match { case Some(id) ? if (!messageBuffers.contains(id)) { passivating = passivating + entity messageBuffers.add(id) entity ! stopMessage } else { log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity.", entity) } case None ? log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity) } } def passivateIdleEntities(): Unit = { val deadline = System.nanoTime() - settings.passivateIdleEntityAfter.toNanos val refsToPassivate = lastMessageTimestamp.collect { case (entityId, lastMessageTimestamp) if lastMessageTimestamp < deadline ? refById(entityId) } if (refsToPassivate.nonEmpty) { log.debug("Passivating [{}] idle entities", refsToPassivate.size) refsToPassivate.foreach(passivate(_, handOffStopMessage)) } }

启动passivation的时间长度可以通过配置文件或者直接在代码里设置:在配置文件中设置 akka.cluster.sharding.passivate-idle-entity-after = 2m,代表两分钟内没有接收从ShardRegion发来的POS指令即启动passivation(经entity自身actor或actorRef收发的消息不算)。可以设置off关闭自动passivation。其它设置值参考如下:

ns, nano, nanos, nanosecond, nanoseconds
us, micro, micros, microsecond, microseconds
ms, milli, millis, millisecond, milliseconds
s, second, seconds
m, minute, minutes
h, hour, hours
d, day, days

也可以直接在代码里设定ClusterShardingSettings.passivateIdleEntityAfter=2 minutes。不过我们还是选择配置文件方式,比较灵活。下面是一个包括了passivation, backoffSupervisor的示范代码:

import akka.cluster.sharding.ShardRegion.Passivate import scala.concurrent.duration._ object SupervisionSpec { val config = ConfigFactory.parseString( """     akka.actor.provider = "cluster" akka.loglevel = INFO """)

  case class Msg(id: Long, msg: Any) case class Response(self: ActorRef) case object StopMessage val idExtractor: ShardRegion.ExtractEntityId = { case Msg(id, msg) ? (id.toString, msg) } val shardResolver: ShardRegion.ExtractShardId = { case Msg(id, msg) ? (id % 2).toString } class PassivatingActor extends Actor with Act
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇2018-11-13 中文代码示例之Progra.. 下一篇2018-12-09 疑似bug_中文代码示例..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目