APIs for Akka applications

Let me follow up on yesterday’s post covering web sockets in Akka by showing you how to structure your APIs and, of course, how to provide both HTTP REST API as well as the web socket [push] API. To achieve this, we are going to use Spray, Socko (and thus Netty) and–of course–Akka and Scala.

So, yesterday, we ended up with the following:

  • Main singleton that boots the application
  • WebSocketPushActor that works with Socko & Netty to deal with the push notifications
  • MyActor that supposedly contains some clever business logic
  • WebSocketRegistered, Push and MyMessage case classes that we pass around to the actors
  • A small HTML file with a bit of JavaScript

Yesterday’s code showed the machinery of the web socket application, but it really skimmed over its structure. In fact, I would not recommend building a single ball of actors in your production code. Let’s untangle it now and let’s add the HTTP REST API.

Untangling

First thing we’re going to do is to separate out the application into appropriate modules. We have the actors that represent the business logic. Let’s call that module code. Then we have the actors that deal with the HTTP / web socket communication. Let’s call that api. If you intend to share some of the objects (in the generic sense of the word, I don’t mean singletons here) with other applications, you may want to create a domain module. Finally, we will have the main module that ultimately contains the "public static void main" method.

So, splitting yesterday’s code, we’ll move the MyActor to core; the WebSocketPushActor to api. I also find it useful to include “boot” code in every module–so that there is a clearly defined piece of code that takes care of starting up the module.

Core

To repeat, in the core module, we’ll have the business actors; and those actors have no idea about the APIs that will be built around them!, giving us:

package org.cakesolutions.aws.core

case class Push[A](payload: A)

case class Update(id: Long, value: String)
case class Get(id: Long)

class SomeCoreActor extends Actor {
  def receive = {
    case Update(id, value) =>
       // update the DB, for example
       context.actorOf("/user/api/push") ! Push("updated something!")
    case Get(id) =>
      // load the value from the DB
      val value = compute()
      sender ! value
  }
}

In your system, there will most likely be many other actors that perform similar interesting things. What we need to include is a class that can boot the core:

package org.cakesolutions.aws.core

case class Start()

class CoreActor extends Actor {
  // you will probably want something more clever!
  override val supervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }

  def receive = {
    case Start() =>
      context.actorOf(Props[SomeCoreActor])

      // and other actors
  }
}

class CoreBoot(implicit actorSystem: ActorSystem) {
  actorSystem.actorOf(Props[CoreActor])
}

API

In the API, we’ll have the WebSocketPushActor; and similar booting code. Copy-and-paste from yesterday gives us

package org.cakesolutions.aws.api

case class WebSocketRegistered(channel: Channel)

class WebSocketPushActor extends Actor {
  private val socketConnections = new DefaultChannelGroup()

  protected def receive = {
    case Push(payload) =>
      socketConnections.write(new TextWebSocketFrame(payload.toString()))
    case WebSocketRegistered(channel) =>
      socketConnections.add(channel)
  }
}

The difference form yesterday is that the we will move the startup code to the class that boots the API:

package org.cakesolutions.aws.api

class ApiBoot(implicit actorSystem: ActorSystem) {
  val webSocket = actorSystem.actorOf(Props[WebSocketPushActor],
                                      "webSocketPush")

  val webServer = new WebServer(WebServerConfig(), Routes {
    case WebSocketHandshake(wsHandshake) => wsHandshake match {
      case PathSegments("websocket" :: Nil) => {
        wsHandshake.isAllowed = true
        actorSystem.actorFor("/user/webSocketPush") !
          WebSocketRegistered(wsHandshake.channel)
      }
    }
    case WebSocketFrame(frame) =>
      throw new UnsupportedOperationException(
        "Eh? We're meant to be pushing only!")
  })

  webServer.start()
}

Let’s complete the picture with the main module.

Main

This is the least exciting module in terms of code, but most interesting in terms of the functionality it brings together; not to mention that this is the only module that we can run.

package org.cakesolutions.aws.main

object Main extends App {
  implicit val actorSystem = ActorSystem("example")

  new CoreBoot()
  new ApiBoot()
}

Excellent. We now have the application properly structured. But we’re still missing the HTTP REST API.

HTTP REST API

To complete, we’ll use Spray to add services that convert the data received from the outside world into the form that our core actors understand. This will be really easy task, but what we need to do is to wire up Spray to Socko.

Spray directs the requests using Routes; and it provides the RootService actor as the entry point. The RootService uses one or more HttpService actors that form the actual API. An example of one such service is:

class SomeService(implicit val actorSystem: ActorSystem) 
  extends Directives 
  with DefaultMarshallers {
  
  val route = 
    path("svc1/update" / LongNumber) { id=>
      post {
        completeWith {
          (actorSystem.actorFor("/user/core/some") ! Update(id, "x")
          "updated"
        }
      }
    } ~
    path("svc2/get" / LongNumber) { id=>
      get {
        completeWith {
          (actorSystem.actorFor("/user/core/some") ! Get(id)).
            mapTo[String]
        }
      }
    }
}

So, we need a sequence of Routes. We then map the Routes into HttpServices and give those to the RootService:

class ApiBoot(implicit actorSystem: ActorSystem) {
  val routes =
    new SomeService().route ::
    new SomeOtherService().route ::
    ... ::
    Nil

  val svc: Route => ActorRef = 
    route => actorSystem.actorOf(Props(new HttpService(route)))

  val root = actorSystem.actorOf(Props(
    new RootService(svc(routes.head), routes.tail.map(svc(_)): _*)))

  val webSocket = actorSystem.actorOf(Props[WebSocketPushActor],
                                      "webSocketPush")

  val webServer = new WebServer(WebServerConfig(), Routes {
    case WebSocketHandshake(wsHandshake) => wsHandshake match {
      case PathSegments("websocket" :: Nil) => {
        wsHandshake.isAllowed = true
        actorSystem.actorFor("/user/webSocketPush") !
          WebSocketRegistered(wsHandshake.channel)
      }
    }
    case WebSocketFrame(frame) =>
      throw new UnsupportedOperationException(
        "Eh? We're meant to be pushing only!")
  })

  webServer.start()

}

Finally, we need to allow Socko & Netty to pass on the HTTP requests to Spray. And to do that, all that is required is to turn Socko’s HttpRequestProcessingContext into Spray’s HttpRequest. I shall leave that as an exercise for the curious reader, but the outline is:

class ApiBoot(implicit actorSystem: ActorSystem) {
  val routes =
    new SomeService().route ::
    new SomeOtherService().route ::
    ... ::
    Nil

  val svc: Route => ActorRef = 
    route => actorSystem.actorOf(Props(new HttpService(route)))

  val root = actorSystem.actorOf(Props(
    new RootService(svc(routes.head), routes.tail.map(svc(_)): _*)))

  val webSocket = actorSystem.actorOf(Props[WebSocketPushActor],
                                      "webSocketPush")

  val webServer = new WebServer(WebServerConfig(), Routes {
    case HttpRequest(request) => spray(request)
    case WebSocketHandshake(wsHandshake) => wsHandshake match {
      case PathSegments("websocket" :: Nil) => {
        wsHandshake.isAllowed = true
        actorSystem.actorFor("/user/webSocketPush") !
          WebSocketRegistered(wsHandshake.channel)
      }
    }
    case WebSocketFrame(frame) =>
      throw new UnsupportedOperationException(
        "Eh? We're meant to be pushing only!")
  })

  webServer.start()

  def spray(ctx: HttpRequestProcessingContext) {
    //Forward the request to spray
    val sprayRequest = convertToSprayRequest(ctx) {
      response =>
        if (ctx.is100ContinueExpected) {
          ctx.write100Continue()
        }

        val content = response.content.get
        ctx.writeResponse(content.buffer, 
                          content.contentType.toString(), 
                          Map[String, String]())
    }

    root ! sprayRequest
  }
}

Aha!

Summary

If you managed to keep reading all the way here, you will know how to structure your Akka applications; how to hide the details of the APIs from the core actors; how to use push notifications using web sockets; how to add REST API facade for the core actors.

Happy Hakking!

This entry was posted in Jan's Blog and tagged , , , , . Bookmark the permalink.

6 Responses to APIs for Akka applications

  1. Pingback: Web socket push notifications in Akka, Netty and Socko | Cake Solutions Team Blog

  2. Nathan Stults says:

    Is wiring Spray to Socko just illustrative, or is there a reason to integrate Spray as opposed to just using the Socko API for the REST bits?

  3. Pingback: » Chat?r’s YooH – Nettie adamwest

  4. Jan Machacek says:

    Just illustrative–I know that Socko comes with its own routing, but a lot of people already have a lot of Spray code. Also, Spray deals with marshalling and unmarshalling really well.

    Jan

  5. Pingback: This week in #Scala (18/05/2012) | Cake Solutions Team Blog

  6. Pingback: Hongtium » 2012-05?Scala???

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>