Day 4: Akka, Akka Persistence, CQRS/ES

Posted by Jan Machacek on Tue, Dec 16, 2014

Let's drop down from cluster inventories, microservices and their APIs right back to the nitty-gritty of CQRS/ES. Today we're going to take a look at how event-sourcing saved me from a mob of angry users. (Hey, it's my blog, and my artistic license!)

When users submit their exercise data (13-bit signed integers packed into a 40-bit structure representing the acceleration along the x, y and z axes), it travels through the ExerciseService Spray route and hits the cluster-sharded UserExercises. The UserExercises is a PersistentActor, which processes the accelerometer data (the commands) and generates events that the UserExercisesView, a PersistentView, replays to provide the session view. Akka takes care of the mechanics of hosting the actor instances on the nodes that have the appropriate role in the cluster, and of pushing the events that the UserExercises generates to the UserExercisesView.
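To make the packing a little more concrete, here is a minimal sketch of how three 13-bit signed values could be squeezed into 40 bits. The bit layout (x in the top 13 bits, then y, then z, with the lowest bit unused) and the names AccelerometerPacking and Sample are my assumptions for illustration only; the real wire format in Lift may well differ.

```scala
object AccelerometerPacking {
  // Hypothetical sample type, for illustration only.
  final case class Sample(x: Int, y: Int, z: Int)

  private val Mask13 = 0x1FFFL

  // Interpret the lowest 13 bits as a two's complement signed number.
  private def signExtend13(v: Long): Int = {
    val raw = (v & Mask13).toInt
    if ((raw & 0x1000) != 0) raw - 0x2000 else raw
  }

  // Assumed layout: bits 39..27 = x, 26..14 = y, 13..1 = z, bit 0 unused.
  def pack(s: Sample): Long =
    ((s.x & Mask13) << 27) | ((s.y & Mask13) << 14) | ((s.z & Mask13) << 1)

  def unpack(bits: Long): Sample =
    Sample(signExtend13(bits >>> 27), signExtend13(bits >>> 14), signExtend13(bits >>> 1))
}
```

Each axis can hold values from -4096 to 4095; packing a sample and unpacking it again should give back the same three numbers.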

[Figure: lift-actors]

Let's go through the most important pieces of code, starting with the ExerciseService.


trait ExerciseService extends Directives with ExerciseMarshallers {
  import akka.pattern.ask
  import com.eigengo.lift.common.Timeouts.defaults._

  def exerciseRoute(userExercises: ActorRef, 
                    userExercisesView: ActorRef, 
                    exerciseClassifiers: ActorRef)
                   (implicit ec: ExecutionContext) =
    ...
    path("exercise" / UserIdValue / SessionIdValue) { (userId, sessionId) ⇒
      get {
        complete {
          (userExercisesView ? UserGetExerciseSession(userId, sessionId)).mapTo[Option[ExerciseSession]]
        }
      } ~
      put {
        handleWith { bits: BitVector ⇒
          (userExercises ? UserExerciseDataProcess(userId, sessionId, bits)).mapRight[Unit]
        }
      } 
      ...
    }

}

These are the two important endpoints for our discussion. The GET /exercise/:userId/:sessionId endpoint goes to the UserExercisesView, the query side; the PUT /exercise/:userId/:sessionId endpoint goes to the UserExercises, the command side. The two sides are connected using Akka Persistence. As you rightly suspect, the view is always playing a catch-up game. In other words, a read operation immediately following a write operation might not return the value of the last write: the system as a whole is eventually consistent, not consistent at every moment.
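The catch-up game can be shown without any Akka at all. The following is a toy model, not Akka Persistence: Journal and View are hypothetical stand-ins that I made up for the sketch, and the view only learns about new events when catchUp is called.

```scala
import scala.collection.mutable.ArrayBuffer

object EventualConsistencySketch {
  // Stand-in for the journal that the command side writes to.
  final class Journal {
    private val events = ArrayBuffer.empty[String]
    def append(event: String): Unit = { events += event; () }
    def readFrom(offset: Int): Vector[String] = events.drop(offset).toVector
  }

  // Stand-in for the view: it only knows what it has replayed so far.
  final class View(journal: Journal) {
    private var seen = Vector.empty[String]
    def catchUp(): Unit = { seen = seen ++ journal.readFrom(seen.size) }
    def query: Vector[String] = seen
  }
}
```

A query that arrives after journal.append but before view.catchUp() still sees the old state; once the view has caught up, the write becomes visible.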

UserExercises

The UserExercises PersistentActor receives the commands (from the API layer), validates them and generates events for the view. To make it easier to reason about the state of the UserExercises instance, I find it helpful to define its behaviour in distinct functions, and to use context.become to switch between the states.


class UserExercises(notification: ActorRef, exerciseClasssifiers: ActorRef)
  extends PersistentActor with ActorLogging {
  import scala.concurrent.duration._

  private val userId = UserId(self.path.name)
  override val persistenceId: String = s"user-exercises-${self.path.name}"

  import context.dispatcher

  private def validateData(result: (BitVector, List[AccelerometerData])): 
    \/[String, AccelerometerData] = ???

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, SessionStartedEvt(sessionId, sessionProps)) ⇒
      context.become(exercising(sessionId, sessionProps))
  }

  private def exercising(id: SessionId, sessionProps: SessionProps): Receive = ???

  private def notExercising: Receive = ???

  override def receiveCommand: Receive = notExercising

}

The UserExercises exists in two states, not exercising and exercising, which are represented by the matching functions. The only thing we can do when not exercising is to begin exercising. (If only life were that simple!)

private def notExercising: Receive = withPassivation {
  case ExerciseSessionStart(sessionProps) ⇒
    persist(SessionStartedEvt(SessionId.randomId(), sessionProps)) { evt ⇒
      saveSnapshot(evt)
      sender() ! \/.right(evt.sessionId)
      context.become(exercising(evt.sessionId, sessionProps))
    }
}

The essence is the event that handling the ExerciseSessionStart generates: the view will now receive the SessionStartedEvt. As a result of that event, the view will update its state, but this state is different from, and independent of, the state that the persistent actor keeps. Finally, I save a snapshot containing the event that I have generated: this means that by the time I reply to the sender(), this actor can be recreated in the right state if it crashes. So, once the mobile app receives a confirmation of a started session, I am certain that as long as my snapshot database does not catastrophically crash, I will be able to recover from failures.

Once inside the exercising state, we need to handle a few more commands: accelerometer data, end session, and another start session. Handling another start session command might be counter-intuitive here, but I found it useful when integrating with the iOS code on the unreliable network in the gym in the mill where the Cake Solutions offices are.


private def exercising(id: SessionId, sessionProps: SessionProps): Receive = {
  case ExerciseSessionStart(newSessionProps) ⇒
    val newId = SessionId.randomId()
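    // In Akka 2.3, persist with a sequence of events invokes the handler once per persisted event
    persist(List(SessionEndedEvt(id), SessionStartedEvt(newId, newSessionProps))) {
      case _: SessionEndedEvt ⇒
        // the implicitly ended session needs no further handling here
      case newSession: SessionStartedEvt ⇒
        saveSnapshot(newSession)
        sender() ! \/.right(newId)
        context.become(exercising(newId, newSessionProps))
    }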

  case ExerciseDataProcess(`id`, bits) ⇒
    val result = decodeAll(bits, Nil)
    validateData(result).fold(
      { err ⇒ sender() ! \/.left(err)},
      { evt ⇒ exerciseClasssifiers ! Classify(sessionProps, evt); sender() ! \/.right(()) }
    )

  case ExerciseSessionEnd(`id`) ⇒
    persist(SessionEndedEvt(id)) { evt ⇒
      saveSnapshot(evt)
      context.become(notExercising)
      sender() ! \/.right(())
    }

  case FullyClassifiedExercise(metadata, confidence, name, intensity) if confidence > confidenceThreshold ⇒
    persist(ExerciseEvt(id, metadata, Exercise(name, intensity))) { evt ⇒
      ...
    }

  case UnclassifiedExercise(_) ⇒
    // Maybe notify the user?
    tooMuchRestCancellable = Some(context.system.scheduler.scheduleOnce(sessionProps.restDuration, self, TooMuchRest))

  case NoExercise(metadata) ⇒
    persist(NoExerciseEvt(id, metadata)) { evt ⇒
      ...
    }

  case TooMuchRest ⇒
    persist(TooMuchRestEvt(id)) { evt ⇒
      ...
    }

}

Following the code, you can see that I handle the various commands that arrive from the API, use the exerciseClassifiers to perform the exercise classification, receive the results, and so on; throughout, I call persist to persist events that the view can see. Notice also that one command can result in multiple events being generated: an excellent example is the ExerciseSessionStart. It needs to write two events: the SessionEndedEvt of the implicitly ended session and the SessionStartedEvt for the session that replaces it.
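Stripped of the actor machinery, the "one command, possibly many events" idea is just a function from the current state and a command to a list of events. This is a simplified sketch: the names CommandToEvents, decide, StartSession and friends are hypothetical and only mirror the shape of the real messages.

```scala
object CommandToEvents {
  sealed trait Command
  final case class StartSession(newId: String) extends Command
  final case class EndSession(id: String) extends Command

  sealed trait Event
  final case class SessionStarted(id: String) extends Event
  final case class SessionEnded(id: String) extends Event

  // current is Some(sessionId) while exercising and None otherwise.
  def decide(current: Option[String], command: Command): List[Event] =
    (current, command) match {
      case (None, StartSession(newId)) =>
        List(SessionStarted(newId))
      case (Some(id), StartSession(newId)) =>
        // starting again implicitly ends the running session first
        List(SessionEnded(id), SessionStarted(newId))
      case (Some(id), EndSession(ended)) if id == ended =>
        List(SessionEnded(id))
      case _ =>
        Nil // the command does not apply in this state
    }
}
```

Keeping the decision pure like this makes the two-event case easy to test in isolation; the persistent actor then only has to persist whatever the function returns.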

As the users exercise, the UserExercises generates a sequence of events, for example:

  1. SessionStartedEvt
  2. m * ExerciseEvt
  3. NoExerciseEvt
  4. n * ExerciseEvt
  5. NoExerciseEvt
  6. k * ExerciseEvt
  7. TooMuchRestEvt
  8. l * ExerciseEvt
  9. SessionEndedEvt

Looking at this stream of events, one can paint a picture of the exercise session.

  • set 1
    • m * exercise
  • no exercise: stop set 1, start set 2
  • set 2
    • n * exercise
  • no exercise: stop set 2, start set 3
  • set 3
    • k * exercise
  • no exercise (too much rest): stop set 3, start set 4
  • set 4
    • l * exercise

This is exactly the job of the view. And I hope that it is now clear that the persistent actor and the persistent view are fundamentally different: their states model different situations. The state in the UserExercises helps with the processing of the commands. The actor, where necessary, provides immediate feedback to the user, typically through some push notification mechanism. The UserExercisesView makes sense of the stream of events that the UserExercises generates, so that the user can (a little while after submitting the commands) query it for a state computed from those events.
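The grouping of the event stream into sets can be sketched as a plain fold over the events. This is a simplification and not the view's actual code: the event types carry far less data than the real ones, and SessionProjection, Session and project are names I made up for the example.

```scala
object SessionProjection {
  sealed trait Event
  case object SessionStarted extends Event
  final case class ExerciseDone(name: String) extends Event
  case object NoExercise extends Event
  case object TooMuchRest extends Event
  case object SessionEnded extends Event

  // Completed sets plus the set currently being built (empty while resting).
  final case class Session(sets: Vector[Vector[String]], current: Vector[String]) {
    def closeSet: Session =
      if (current.isEmpty) this else Session(sets :+ current, Vector.empty)
  }

  def project(events: Seq[Event]): Vector[Vector[String]] = {
    val result = events.foldLeft(Session(Vector.empty, Vector.empty)) {
      case (_, SessionStarted)                          => Session(Vector.empty, Vector.empty)
      case (s, ExerciseDone(name))                      => s.copy(current = s.current :+ name)
      case (s, NoExercise | TooMuchRest | SessionEnded) => s.closeSet
    }
    result.sets
  }
}
```

Every exercise event extends the current set; a rest or the end of the session closes it. Feeding it two exercises, a rest, one more exercise and a session end yields two sets.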

UserExercisesView

Akka automatically delivers the values passed to the persist calls to the view. All that we have to do is to implement its behaviour. Just as with the UserExercises, I find it helpful to define small functions that represent the different states of the view, and to switch between them using context.become. Looking at the stream of events above, the view has the following states:

  • not exercising
  • exercising
  • in a set

In each state, the view needs to handle the appropriate messages (events), but it also needs to handle the messages arriving from the API: the queries. And so, we finally arrive at the code:


class UserExercisesView extends PersistentView with ActorLogging {
  import com.eigengo.lift.exercise.UserExercisesView._
  import scala.concurrent.duration._

  // our internal state
  private var exercises = Exercises.empty

  override val viewId: String = s"user-exercises-view-${self.path.name}"
  override val persistenceId: String = s"user-exercises-${self.path.name}"

  private lazy val queries: Receive = {
    case GetExerciseSessionsSummary ⇒
      sender() ! exercises.summary
    case GetExerciseSession(sessionId) ⇒
      sender() ! exercises.get(sessionId)
  }

  private lazy val notExercising: Receive = {
    case SnapshotOffer(_, offeredSnapshot: Exercises) ⇒
      exercises = offeredSnapshot

    case SessionStartedEvt(sessionId, sessionProps) if isPersistent ⇒
      context.become(exercising(ExerciseSession(sessionId, sessionProps, List.empty)).orElse(queries))
  }

  private def inASet(session: ExerciseSession, set: ExerciseSet): Receive = {
    case ExerciseEvt(_, metadata, exercise) if isPersistent ⇒
      context.become(inASet(session, set.withNewExercise(metadata, exercise)).orElse(queries))
    case NoExerciseEvt(_, metadata) if isPersistent ⇒
      context.become(exercising(session.withNewExerciseSet(set)).orElse(queries))
    case TooMuchRestEvt(_) if isPersistent ⇒
      context.become(exercising(session.withNewExerciseSet(set)).orElse(queries))

    case SessionEndedEvt(_) if isPersistent ⇒
      exercises = exercises.withNewSession(session)
      saveSnapshot(exercises)
      context.become(notExercising.orElse(queries))
  }
  
  private def exercising(session: ExerciseSession): Receive = {
    case ExerciseEvt(_, metadata, exercise) if isPersistent ⇒
      context.become(inASet(session, ExerciseSet(metadata, exercise)).orElse(queries))

    case TooMuchRestEvt(_) ⇒

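    case SessionEndedEvt(_) if isPersistent ⇒
      // record the session before snapshotting, then leave the exercising state
      exercises = exercises.withNewSession(session)
      saveSnapshot(exercises)
      context.become(notExercising.orElse(queries))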
  }

  override def receive: Receive = {
    notExercising.orElse(queries)
  }
}

And this is it in its entirety: the view reconstructs the hierarchy of the exercise sessions, sets and exercises by carefully moving between the states. To save having to re-compute its state all the time, it calls saveSnapshot at appropriate times.
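What the snapshot buys us can be shown with a small model of recovery: start from the snapshot if there is one, and replay only the events that came after it. This is a toy sketch, not Akka's recovery code; SnapshotRecovery, Snapshot and recover are hypothetical names, and the state is reduced to a list of completed session ids.

```scala
object SnapshotRecovery {
  // State as of a given sequence number: the ids of the completed sessions.
  final case class Snapshot(sequenceNr: Long, completed: Vector[String])

  // Journal entries are (sequenceNr, id of the session that ended).
  def recover(snapshot: Option[Snapshot], journal: Seq[(Long, String)]): Vector[String] = {
    val start = snapshot.getOrElse(Snapshot(0L, Vector.empty))
    journal
      .collect { case (seqNr, id) if seqNr > start.sequenceNr => id } // skip what the snapshot covers
      .foldLeft(start.completed)(_ :+ _)
  }
}
```

With a snapshot, only the tail of the journal is replayed; without one, everything is replayed from the beginning and, as long as the snapshot was computed correctly, the result is the same.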

A tiny little bug

For purely didactic purposes, I left a subtle bug in the code: the view loses the last set. The bug is in the handling of the SessionEndedEvt in the inASet state.


private def inASet(session: ExerciseSession, set: ExerciseSet): Receive = {
  ...
  
  case SessionEndedEvt(_) if isPersistent ⇒
    // the bug
    exercises = exercises.withNewSession(session)

    // the correct solution:
    // exercises = exercises.withNewSession(session.withNewExerciseSet(set))
    saveSnapshot(exercises)
    context.become(notExercising.orElse(queries))
}

Elementary, my dear Watson, and easily fixed. But we'd really like to apply our fix to all the data that we have already processed. Another elementary, my dear Watson moment! We can simply drop the snapshot, and the view will be sent all the events again, this time running our bug-free code. And so, we're able to bring the users' last sets back from the nether world!
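The effect of replaying the same events through the fixed code can be sketched with two variants of one projection. Again, this is an illustration with made-up names (ReplayFix, project, keepOpenSet) and not the view itself; keepOpenSet = false reproduces the bug, keepOpenSet = true is the fix.

```scala
object ReplayFix {
  sealed trait Event
  final case class ExerciseDone(name: String) extends Event
  case object NoExercise extends Event
  case object SessionEnded extends Event

  // keepOpenSet = false mimics the bug: the set in progress is dropped at session end.
  def project(events: Seq[Event], keepOpenSet: Boolean): Vector[Vector[String]] = {
    val (sets, _) = events.foldLeft((Vector.empty[Vector[String]], Vector.empty[String])) {
      case ((done, current), ExerciseDone(name)) =>
        (done, current :+ name)
      case ((done, current), NoExercise) =>
        (if (current.isEmpty) done else done :+ current, Vector.empty)
      case ((done, current), SessionEnded) =>
        (if (keepOpenSet && current.nonEmpty) done :+ current else done, Vector.empty)
    }
    sets
  }
}
```

Because the events themselves were never wrong, running the stored journal through the fixed variant brings back the set that the buggy variant dropped; nothing has to be patched in place.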

Summary

Without event sourcing, this (albeit contrived) bug fix would have been difficult to apply to already-processed data; without separating the command and the query sides of the exercise processing, it would be difficult to keep the actors' states clearly defined. So, Akka, Akka Persistence, CQRS and event sourcing #FTW. As usual, the full source code is at https://github.com/eigengo/lift for your cloning and forking pleasure.
