
Akka Clustering and Remoting: The Experiment

Posted by Carl Pulley on Mon, Jun 9, 2014

Introduction

This blog post is based on some concepts that I was using a few years ago to build clusters hosting distributed Actor applications. We will build an example application and scale it into a cloud environment.

In general, I'll be focused on using Amazon for our cloud deployment. However, I'll also consider what happens if our cloud vendor fails in some way. Whilst such events may be rare, they do occur. So when we scale our Akka application, I will consider how we can avoid reliance upon any single cloud vendor.

In this post, we will build a sample distributed Akka application. We will see some of the issues this presents and, in future posts, I'll explore why they occurred - especially when we scale the application into arbitrary cloud infrastructure.

Application Overview

For our example application, we will implement the following type of ping-pong application:

[Figure: the Ping-Pong cluster topology]

Our cluster will host two types of nodes:

  • client nodes
  • worker nodes

We will provision each of our client and worker nodes with remote actors. For the sake of argument, let's refer to them as client and worker actors.

The client actor is very simple. When it receives:

  • a Ping message, it will forward that message to a random worker actor
  • a Pong message, it logs that message

The worker actors will be named. For simplicity, let's name them by colour. When a worker actor receives a Ping message, it rolls an N-sided die (N > 1):

  • if the die shows 1, the worker replies with a Pong message to the sender (i.e. the client actor)
  • otherwise, the worker replies with a Ping to the sender (i.e. the client actor).

As a visual and debugging aid, worker actors append their name to the Ping's message body before replying to the sender.
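The worker's decision is easy to capture as a pure function, which makes the die-roll behaviour testable in isolation. The sketch below is illustrative only: the Ping/Pong case classes and the reply helper are my assumptions, not the definitions from the repository, and the die result is passed in explicitly rather than rolled inside the function:

```scala
// Hypothetical message protocol, mirroring the fields used later in the post.
case class Ping(msg: String, tag: String)
case class Pong(msg: String)

// Pure sketch of a worker's decision: given a die cast (1..N) and the
// worker's name, either end the rally with a Pong or keep it going with a Ping.
def reply(die: Int, workerName: String, ping: Ping): Any = {
  val route = s"${ping.tag}-$workerName" // append our name, as a debugging aid
  if (die == 1) Pong(s"$route says ${ping.msg}")
  else Ping(ping.msg, route)
}
```

Because the randomness is injected, both branches can be exercised deterministically.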

Technical Details

Clusters are built from collections of nodes. It is on these nodes that an application will perform its computations. So, when we initially build our cluster, each of these nodes will be essentially blank - at least they will be until we launch the remote actors that constitute our application.

Under the hood, however, each cluster node communicates with the other members. Through these communications, each node is able to discover the state of every other cluster member. At any point in time, new nodes may join the cluster or existing nodes may leave (temporarily or permanently). This is one of the key advantages of using clusters.

Changes in cluster membership state are communicated to all member nodes using a gossip protocol. The following code demonstrates how we might create an empty cluster and then get the client actor system to join it:

val cluster = Cluster(system)
val joinAddress = cluster.selfAddress
cluster.join(joinAddress)

By subscribing the client actor to cluster membership events:

cluster.subscribe(client, classOf[MemberUp])

we can ensure that it provisions newly joining worker nodes with remote worker actors, as follows:

var addresses = Map.empty[Address, ActorRef]

val client = actor(new Act with ActorLogging {
  become {
    // ...

    case MemberUp(member) if (member.roles.nonEmpty) =>
      // Convention: the (head) role is used to name the node's (single) actor
      val node = member.address
      val act = system.actorOf(
        Props[WorkerActor].withDeploy(Deploy(scope = RemoteScope(node))),
        name = member.roles.head
      )

      addresses = addresses + (node -> act)
  }
})

Worker nodes are launched by shelling out using sys.process:

// Here we shell-out to provision our cluster nodes as background processes
def provisionNode(label: String, port: Int): Unit = {
  val AKKA_HOME = "pwd".!!.stripLineEnd + "/target/universal/stage"
  val jarFiles = s"ls ${AKKA_HOME}/lib/".!!.split("\n").map(AKKA_HOME + "/lib/" + _).mkString(":")
  val proc = Process(s"""java -Xms256M -Xmx1024M -XX:+UseParallelGC -classpath "${AKKA_HOME}/config:${jarFiles}" -Dakka.home=${AKKA_HOME} -Dakka.remote.netty.tcp.port=${port} -Dakka.cluster.roles.1=${label} -Dakka.cluster.seed-nodes.1=${joinAddress} akka.kernel.Main cakesolutions.example.WorkerNode""").run




  processes = processes + proc
}

Using sbt's stage task, we can package all of our jar files into the target/universal/stage directory (matching the AKKA_HOME path above). Ultimately, this will help simplify scaling our application out into the cloud.

Whenever we launch a new cluster node, we specify a seed node address via a system property. This points to our client node: as a cluster node initialises, it sends a join request to the client node at that seed address. Assuming everything works as we expect, the newly joining node is then recorded as being 'up' by the cluster and becomes a cluster member. Moreover, all of this happens for free (i.e. minimal extra coding is required on our part):

class WorkerNode extends Bootable {
  val config = ConfigFactory.load()

  implicit val system = ActorSystem(config.getString("akka.system"))

  val cluster = Cluster(system)

  def startup = {
    // We simply listen for remote actor creation events by default here
  }

  def shutdown = {
    system.shutdown()
  }
}

Notice how minimal our WorkerNode class is: it simply defines a cluster node and then listens for remote actor events on a specified port. Again, this port can be specified using system properties.
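Alternatively, the role, port and seed nodes could be fixed in configuration rather than passed on the command line. Here is a hypothetical application.conf fragment using Akka 2.2/2.3-era settings; the hostname, ports and system name are placeholders, and the -D system properties used above simply override these values at launch time:

```hocon
akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"
  remote.netty.tcp {
    hostname = "127.0.0.1"
    port     = 2552
  }
  cluster {
    roles      = [ "red" ]
    seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551" ]
  }
}
```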

With the cluster infrastructure code in place, let's look at implementing the handling of Ping messages by our worker actors:

class WorkerActor extends Actor {
  def receive: Receive = {
    case Ping(msg, tag) =>
      val route = s"$tag-${self.path.name}"
      if (Random.nextInt(4) == 1) {
        sender ! Pong(s"$route says $msg")
      } else {
        sender ! Ping(msg, route)
      }
  }
}

and the handling of Ping and Pong messages by our client actor:

val client = actor(new Act with ActorLogging {
  become {
    case msg @ Ping(_, _) if (addresses.nonEmpty) =>
      addresses.values.toList(Random.nextInt(addresses.size)) ! msg
 
    case Pong(msg) =>
      log.info(msg)
 
    // ..
  }
})
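The client's uniform random choice of worker can likewise be factored out into a small helper. In this sketch the map values are plain strings standing in for ActorRefs, and pickWorker is a name of my own invention:

```scala
import scala.util.Random

// Choose a uniformly random value from a non-empty map -- the same
// selection the client performs over its `addresses` map.
def pickWorker[K, V](addresses: Map[K, V], rnd: Random): V = {
  require(addresses.nonEmpty, "no workers have been provisioned yet")
  addresses.values.toList(rnd.nextInt(addresses.size))
}
```

Guarding on a non-empty map mirrors the `if (addresses.nonEmpty)` guard in the client's receive block above.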

And there you have it, a distributed Akka application running across a local cluster.

Should you wish to try out this code, it is available on the experiment-1 branch of my GitHub repository: https://github.com/carlpulley/jclouds-cluster. Simply follow these steps to get everything checked out and built:

# Clone and checkout experiment-1 branch
git clone https://github.com/carlpulley/jclouds-cluster
cd jclouds-cluster
git checkout experiment-1
# Build the application
sbt clean
sbt stage

Conclusions


Of course, all we've seen so far is code. So, here's a short video demonstrating:

  • the creation of a named (red, green and blue) worker node cluster
  • ping-pong message related traffic in the RGB cluster
  • the provisioning of a yellow worker node
  • ping-pong message related traffic in the RGBY cluster.

That about wraps up our first clustering experiment. In the next blog article, and with the help of JClouds and Chef, we'll attempt to scale this application onto vendor-agnostic cloud infrastructure.

Acknowledgements

Thanks to the High Performance Computing Research Group at the University of Huddersfield, who loaned their OpenStack cloud for the initial experiments that this work is based upon. Also, special thanks to Anirvan Chakraborty for the blog article that inspired me to trawl through some old research work.
