overridedefreceiveRecover: Receive = { case event: LuckyEvent => updateState(event) //恢复Actor时根据持久化的事件恢复Actor状态 caseSnapshotOffer(_, snapshot: Lottery) => log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}") state = snapshot //利用快照恢复Actor的状态 caseRecoveryCompleted => log.info("the actor recover completed") }
defupdateState(le: LuckyEvent) = state = state.update(le.luckyMoney) //更新自身状态
overridedefreceiveCommand: Receive = { case lc: LotteryCmd => doLottery(lc) match { //进行抽奖,并得到抽奖结果,根据结果做出不同的处理 case le: LuckyEvent => //抽到随机红包 persist(le) { event => updateState(event) increaseEvtCountAndSnapshot() sender() ! event } case fe: FailureEvent => //红包已经抽完 sender() ! fe } case"saveSnapshot" => // 接收存储快照命令执行存储快照操作 saveSnapshot(state) caseSaveSnapshotSuccess(metadata) => ??? //你可以在快照存储成功后做一些操作,比如删除之前的快照等 }
objectPersistenceTestextendsApp{ val lottery = Lottery(10000,10000) val system = ActorSystem("example-05") val lotteryActor = system.actorOf(Props(newLotteryActor(lottery)), "LotteryActor-1") //创建抽奖Actor val pool: ExecutorService = Executors.newFixedThreadPool(10) val r = (1 to 100).map(i => newLotteryRun(lotteryActor, LotteryCmd(i.toLong,"godpan","xx@gmail.com")) //创建100个抽奖请求 ) r.map(pool.execute(_)) //使用线程池来发起抽奖请求,模拟同时多人参加 Thread.sleep(5000) pool.shutdown() system.terminate() }
classLotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extendsRunnable{ //抽奖请求 implicitval timeout = Timeout(3.seconds) defrun: Unit = { for { fut <- lotteryActor ? lotteryCmd } yield fut match { //根据不同事件显示不同的抽奖结果 case le: LuckyEvent => println(s"恭喜用户${le.userId}抽到了${le.luckyMoney}元红包") case fe: FailureEvent => println(fe.reason) case _ => println("系统错误,请重新抽取") } } }
# DONOTUSETHISINPRODUCTION !!! # See also https://github.com/typesafehub/activator/issues/287 akka.persistence.journal.leveldb.native = false//因为我们本地并没有安装leveldb,所以这个属性置为false,但是生产环境并不推荐使用
overridedefreceiveRecover: Receive = { case event: LuckyEvent => updateState(event) //恢复Actor时根据持久化的事件恢复Actor状态 caseSnapshotOffer(_, snapshot: Lottery) => log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}") state = snapshot //利用快照恢复Actor的状态 caseRecoveryCompleted => log.info("the actor recover completed") }
defupdateState(le: LuckyEvent) = state = state.update(le.luckyMoney) //更新自身状态
var lotteryQueue : ArrayBuffer[(LotteryCmd, ActorRef)] = ArrayBuffer()