
To close our discussion about message formats, let's explore a scenario where all messages between our microservices must be defined in *.proto files, and where we generate the appropriate case classes and marshallers at build time. This works really well if we only want to use Protocol Buffers: the proto definitions feed the ScalaPB sbt build-time code generator, and the microservices use Protocol Buffers as their wire format. But what if we want to use the same message definitions for JSON?

JSON and Akka HTTP

Luckily, ScalaPB already supports producing and parsing JSON from the same message definitions. All we have to do to take advantage of this in Akka HTTP is implement the appropriate FromEntityUnmarshaller[A] and ToEntityMarshaller[A].

trait ScalaPBMarshalling {
  private val protobufContentType = ContentType(MediaType.applicationBinary("octet-stream", Compressible, "proto"))
  private val applicationJsonContentType = ContentTypes.`application/json`

  def scalaPBFromRequestUnmarshaller[O <: GeneratedMessage with Message[O]](companion: GeneratedMessageCompanion[O]): FromEntityUnmarshaller[O] = {
    Unmarshaller.withMaterializer[HttpEntity, O](_ ⇒ implicit mat ⇒ {
      case entity@HttpEntity.Strict(`applicationJsonContentType`, data) ⇒
        // decode the bytes using the entity's charset, then parse the JSON into O
        val charBuffer = Unmarshaller.bestUnmarshallingCharsetFor(entity)
        Future.successful(JsonFormat.fromJsonString(data.decodeString(charBuffer.nioCharset.name))(companion))
      case entity@HttpEntity.Strict(`protobufContentType`, data) ⇒
        // parse the Protocol Buffers bytes into O
        Future.successful(companion.parseFrom(data.toArray))
      case entity ⇒
        Future.failed(UnsupportedContentTypeException(applicationJsonContentType, protobufContentType))
    })
  }

  implicit def scalaPBToEntityMarshaller[U <: GeneratedMessage]: ToEntityMarshaller[U] = {
    def jsonMarshaller(): ToEntityMarshaller[U] = {
      val contentType = applicationJsonContentType
      Marshaller.withFixedContentType(contentType) { value ⇒
        HttpEntity(contentType, JsonFormat.toJsonString(value))
      }
    }

    def protobufMarshaller(): ToEntityMarshaller[U] = {
      Marshaller.withFixedContentType(protobufContentType) { value ⇒
        HttpEntity(protobufContentType, value.toByteArray)
      }
    }

    // offer both representations; content negotiation picks one
    Marshaller.oneOf(jsonMarshaller(), protobufMarshaller())
  }
}


We can now go ahead and use the marshallers on any generated code:

class PropertyTest extends FlatSpec with ProtobufMatchers with PropertyChecks with ScalaPBMarshalling {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  "Marshalling" should "work" in {
    val value = v200.Caption(
      text = "Hello,",
      ints = Seq(1, 2, 3),
      items = Seq(...),
      corpus = v200.Caption.Corpus.UNIVERSAL)

    implicit val marshaller = implicitly[ToEntityMarshaller[v200.Caption]]
    implicit val unmarshaller = scalaPBFromRequestUnmarshaller(v200.Caption)

    val contentTypes = List(ContentTypes.`application/json`, ContentTypes.`application/octet-stream`)
    val futureResults = contentTypes.map { contentType ⇒
      // marshal the value, pick the marshalling for contentType, then unmarshal it again
      marshaller.apply(value)
        .flatMap {
          _.flatMap {
            case Marshalling.WithFixedContentType(`contentType`, m) ⇒ Some(m())
            case _ ⇒ None
          } match {
            case h :: Nil ⇒ Future.successful(h)
            case _ ⇒ Future.failed(new RuntimeException(":("))
          }
        }
        .flatMap(entity ⇒ unmarshaller.apply(entity))
    }

    Await.result(Future.sequence(futureResults), Duration.Inf)
      .foreach(_ should equal(value))
  }
}

Notice that implicit val marshaller can be looked up with implicitly[ToEntityMarshaller[A]] for any generated message type A (v200.Caption in this test); the lookup resolves to the implicit def scalaPBToEntityMarshaller[U <: GeneratedMessage]: ToEntityMarshaller[U] in ScalaPBMarshalling. The unmarshaller cannot be obtained implicitly; we need to construct it by invoking ScalaPBMarshalling.scalaPBFromRequestUnmarshaller and supplying the GeneratedMessageCompanion. The rest of the code is an exercise in Akka HTTP: we end up with a sequence of Future[v200.Caption]s, which are the results of marshalling and unmarshalling the original value, and so they should all be equal to the original value (and therefore to each other).
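
The difference between the two lookups is ordinary Scala implicit resolution, and it can be reproduced without any libraries. The following is a minimal sketch only: Message, Companion, Encoder, Decoder and this Caption are hypothetical stand-ins for the ScalaPB and Akka HTTP types, not their real APIs.

```scala
object ImplicitLookupSketch {
  // Hypothetical stand-ins for ScalaPB's GeneratedMessage and GeneratedMessageCompanion.
  trait Message { def render: String }
  trait Companion[A <: Message] { def parse(s: String): A }

  // Hypothetical stand-ins for Akka HTTP's ToEntityMarshaller and FromEntityUnmarshaller.
  trait Encoder[A] { def encode(a: A): String }
  trait Decoder[A] { def decode(s: String): A }

  final case class Caption(text: String) extends Message { def render: String = text }
  object Caption extends Companion[Caption] { def parse(s: String): Caption = Caption(s) }

  // No value parameters: the compiler can supply this for any A <: Message.
  implicit def encoderFor[A <: Message]: Encoder[A] = new Encoder[A] {
    def encode(a: A): String = a.render
  }

  // Takes the companion as an ordinary argument, so the caller must pass it explicitly.
  def decoderFor[A <: Message](companion: Companion[A]): Decoder[A] = new Decoder[A] {
    def decode(s: String): A = companion.parse(s)
  }

  val encoder: Encoder[Caption] = implicitly[Encoder[Caption]] // resolved by the compiler
  val decoder: Decoder[Caption] = decoderFor(Caption)          // constructed by hand

  // Encode and decode again, mirroring the marshal/unmarshal round trip in the test.
  def roundTrip(c: Caption): Caption = decoder.decode(encoder.encode(c))
}
```

In this sketch, encoderFor plays the role of scalaPBToEntityMarshaller and decoderFor plays the role of scalaPBFromRequestUnmarshaller; only the former has a shape that implicitly can find.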

Code quality

This is a great start, but it leaves room for anyone to hand-roll their own marshaller or unmarshaller, which effectively breaks our rule that messages are defined in their *.proto files. The hand-rolled marshaller (or unmarshaller) can do anything; it can even be used to sneakily introduce a message that has no protocol definition.

class PropertyTest extends FlatSpec with ProtobufMatchers with PropertyChecks with ScalaPBMarshalling {

  "Hand-rolled marshallers" should "be rejected" in {
    implicit def hrm: ToEntityMarshaller[v100.Caption] = ???

    implicit val hru: Unmarshaller[HttpEntity, v200.Caption] = Unmarshaller.withMaterializer[HttpEntity, v200.Caption](_ ⇒ implicit mat ⇒ {
      _ ⇒ Future.failed(UnsupportedContentTypeException())
    })
  }
}



The hrm and hru (hand-rolled marshaller and hand-rolled unmarshaller) should produce compile errors. To achieve this, we'll write a compiler plugin that rejects implicit values of type Marshaller[_, _] and Unmarshaller[_, _]. However, we need to allow the two special cases in ScalaPBMarshalling. We will do so by defining a StaticAnnotation that is private to ScalaPBMarshalling, and then ignoring the symbols that carry it in our compiler plugin.

trait ScalaPBMarshalling {

  @ScalaPBMarshalling.permit
  def scalaPBFromRequestUnmarshaller[O <: GeneratedMessage with Message[O]](companion: GeneratedMessageCompanion[O]): FromEntityUnmarshaller[O] = ...

  @ScalaPBMarshalling.permit
  implicit def scalaPBToEntityMarshaller[U <: GeneratedMessage]: ToEntityMarshaller[U] = ...
}

object ScalaPBMarshalling extends ScalaPBMarshalling {
  // private, so only ScalaPBMarshalling itself can mark a definition as permitted
  private class permit extends StaticAnnotation
}

The plugin

The plugin introduces the marshaller-linter phase after the typer phase but before the patmat phase; it examines all trees in each compilation unit for the values we want to reject, unless they carry the permit annotation.

class MarshallerLinterPlugin(val global: Global) extends Plugin {
  plugin ⇒

  override val name: String = "Marshaller Linter"
  override val description: String = "Verifies the coding standards of marshalling code"
  override val components: List[PluginComponent] = List(component)

  private object component extends PluginComponent {
    override val global: Global = plugin.global
    override val phaseName: String = "marshaller-linter"
    override val runsBefore = List("patmat")
    override val runsAfter: List[String] = List("typer")

    import global._

    override def newPhase(prev: Phase): Phase = new StdPhase(prev) {

      override def apply(unit: CompilationUnit): Unit = {
        // the permit
        val permitAnnotationType = rootMirror.getClassIfDefined("org.eigengo.rsa.ScalaPBMarshalling.permit").tpe
        // types we will reject for defs and vals
        val rejectedRhsTypes = List("akka.http.scaladsl.marshalling.Marshaller", "akka.http.scaladsl.unmarshalling.Unmarshaller")
          .map(name ⇒ rootMirror.getClassIfDefined(name).tpe.erasure)

        // Expands all child trees of ``tree``, returning flattened iterator of trees.
        def allTrees(tree: Tree): Iterator[Tree] =
          Iterator(tree, analyzer.macroExpandee(tree)).filter(_ != EmptyTree)
            .flatMap(t ⇒ Iterator(t) ++ t.children.iterator.flatMap(allTrees))

        // checks that the permit annotation is present on the given ``symbol``.
        def hasPermitAnnotation(symbol: global.Symbol): Boolean = {
          Option(symbol).forall(_.annotations.exists(_.tpe <:< permitAnnotationType))
        }

        type Rejection = String

        // checks the tree for disallowed type
        def rejectHandRolled(tree: Tree): Option[Rejection] = {
          if (tree.tpe <:< definitions.NullTpe) None
          else rejectedRhsTypes.find(rejectedType ⇒ tree.tpe.dealiasWiden.erasure <:< rejectedType).map(_.toString())
        }

        // check all expanded trees of each compilation unit
        allTrees(unit.body).foreach {
          case d@ValDef(mods, _, _, rhs) if !hasPermitAnnotation(rhs.symbol) ⇒
            rejectHandRolled(rhs).foreach { rejection ⇒
              global.globalError(d.pos, s"Cannot hand-roll val of type $rejection.")
            }
          case d@DefDef(mods, _, _, _, tpt, rhs) if mods.isImplicit && !hasPermitAnnotation(d.symbol) ⇒
            rejectHandRolled(rhs).orElse(rejectHandRolled(tpt)).foreach { rejection ⇒
              global.globalError(d.pos, s"Cannot hand-roll implicit def returning $rejection.")
            }
          case _ ⇒ // noop
        }
      }
    }
  }
}
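
The decision rule the plugin applies can be sketched on a toy model, without the compiler API. This is an illustrative simplification that follows the rule as stated in the prose (implicit, of a rejected type, and lacking the permit); Definition, rejectedTypes, allDefinitions and rejections are hypothetical names, and plain strings stand in for the compiler's types and annotations.

```scala
object LinterRuleSketch {
  // Hypothetical, simplified stand-in for a typed definition in a compilation unit.
  final case class Definition(
    name: String,
    typeName: String,
    isImplicit: Boolean,
    annotations: Set[String],
    children: List[Definition] = Nil)

  // Strings standing in for the erased Marshaller and Unmarshaller types.
  val rejectedTypes: Set[String] = Set("Marshaller", "Unmarshaller")
  val permit: String = "permit"

  // Same recursive shape as allTrees: the node itself, then every descendant.
  def allDefinitions(d: Definition): Iterator[Definition] =
    Iterator(d) ++ d.children.iterator.flatMap(allDefinitions)

  // Reject a definition when it is implicit, has a rejected type, and lacks the permit.
  def rejections(root: Definition): List[String] =
    allDefinitions(root)
      .filter(d => d.isImplicit && rejectedTypes(d.typeName) && !d.annotations(permit))
      .map(d => s"Cannot hand-roll ${d.name} of type ${d.typeName}.")
      .toList
}
```

In this model, a definition annotated with "permit" passes even though its type is on the rejected list, which is what lets the two definitions in ScalaPBMarshalling through while hrm and hru are reported.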


The only thing that remains is to wire our linter into our build.sbt definition. Because our linter is part of the project, we can't use addCompilerPlugin; instead, we have to specify the location of the plugin's class directory "by hand" in the -Xplugin: scalac parameter.

lazy val protocol = project.in(file("protocol"))

lazy val protobufTestkit = project.in(file("protobuf-testkit")) 

lazy val linterPlugin = project.in(file("linter-plugin"))
  .settings(commonSettings)
  .settings(
    libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
  )

lazy val ingest = project.in(file("ingest"))
  .settings(commonSettings)
  .settings(linterSettings)
  .dependsOn(protocol % PB.protobufConfig.name)
  .dependsOn(protobufTestkit % Test)
  .dependsOn(linterPlugin % Compile)

lazy val commonSettings = Seq(
  organization := "org.eigengo",
  scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked"),
  resolvers += "Maven central" at "https://repo1.maven.org/maven2/",
  autoCompilerPlugins := true
)

// point scalac at the compiled classes of the linterPlugin project
lazy val linterSettings: Seq[Setting[_]] = Seq(
  scalacOptions += "-Xplugin:" + ((classDirectory in linterPlugin) in Compile).value
)

Once this is wired in, we get compile errors when we attempt to hand-roll our own marshallers and unmarshallers, both in sbt and in IntelliJ IDEA.


Writing a compiler plugin might seem like overkill, but (1) it is a fun thing to do and (2) it helps maintain the overall quality of your projects automatically. In large projects, this becomes essential. Again, the code (very much WIP) is at https://github.com/eigengo/reactive-summit-2016.
