
Tweet image analysis talks I

Posted by Jan Machacek on Mon, Nov 28, 2016

Twitter computer vision talks I

Starting at the Reactive Summit in Austin (Cake Solutions is returning next year, naturally!), I talked about a microservices-based system that makes the most of Akka and Lagom, and exposes the protocols that represent messages on Kafka topics and messages in the Akka Persistence Cassandra-based journal. I also focused on microservice messaging patterns in Kafka, highlighting the approaches to the various message delivery semantics. I wrapped the entire system with as much build-time tooling as possible, paving the way for large teams to start contributing to the project with as much automated support as possible. In this post, I will summarise the most important points of the codebase and the talks.

The architecture

Before I jump into the code, I will show the system's main components. The microservices handle ingestion; scene recognition, face recognition, and OCR; and finally a dashboard that shows a summary of the data extracted from the images in the ingested tweets.
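
Roughly, the flow looks like this (Kafka carries the messages between the services):


tweets → ingest → Kafka → { scene classification | face/identity matching | OCR } → Kafka → dashboard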

APIs and formats

When implementing a microservices-based system, it is important to clearly specify the services' APIs. In this blog post, I will describe a way to clearly describe the protocols, and to use this description to generate the microservice-specific code (in this particular example, the case classes and the Protocol Buffers serialisation). It is important to keep the microservices completely separated from each other; in this example, this means no common code. It is tempting to have a module (think JAR) that defines the protocols as case classes, and to share this module between the microservices. This is the first step on the road to ruin; the next step is usually to include the marshallers. "It makes sense to have the code that marshals and unmarshals these case classes in a single module; everyone who uses this is probably going to do some marshalling, anyway." This approach allows the contextual boundary between the microservices to leak; a practical manifestation is that the microservices bound together with this shared code become locked together because of the transitive dependencies.

Consider a scenario where a protocol module, which depends on, say, "io.spray" %% "spray-json" % "1.3.2", is then used by two microservices that need to exchange values defined in this protocol. Now both microservices are bound by the transitive dependency; it is difficult to separate them. As the common protocol code grows, so does the tangle of dependencies. Eventually, one ends up with microservices that have the complexity of a microservices system, but behave as a monolith.
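
To make the coupling concrete, here is a minimal sbt sketch of the problem (the module names are illustrative, not part of the project):


// the shared protocol module drags in a concrete JSON library
lazy val protocol = project.in(file("protocol"))
  .settings(libraryDependencies += "io.spray" %% "spray-json" % "1.3.2")

// both services now depend on spray-json 1.3.2 transitively through protocol,
// so neither can upgrade, replace or drop it independently of the other
lazy val serviceA = project.in(file("service-a")).dependsOn(protocol)
lazy val serviceB = project.in(file("service-b")).dependsOn(protocol)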

Furthermore, the shared code and marshallers approach makes it very difficult to generate documentation for the protocols, never mind keeping this documentation in sync with the codebase. In typical Scala projects, the documentation for the wire formats would have to be dug out of the case classes, the automatically derived marshallers, and the hand-rolled marshallers. Finally, this approach makes it cumbersome to write code that ensures that different versions of the messages are compatible with each other: when using semantic versioning, bugfix and minor versions are expected to be forward and backward compatible with each other within the same major version.

Protocol buffers, SBT and ScalaTest

To address these problems, I have used ScalaPB, together with some "interesting" sbt code and an addition to the ScalaTest matchers that ensures compatibility. The end result is a module (JAR) that contains only the protocol definitions; the microservices can depend on this module and have the case classes and Protocol Buffers marshallers generated at compile time. Finally, the ProtobufMatchers mixin contains a Matcher that can be used to ensure compatibility between messages.

Let me start with the sbt code. It defines two modules: protocol and ingest, where protocol contains only the Protocol Buffers definitions, and ingest needs to have the matching Scala code generated at compile time.

Exploring the structure of the protocol module, we have the src/main/resources directory with just the *.proto files.


protocol
  src
    main
      resources
        org.eigengo.rsa.text
          v100.proto
          v101.proto

The v100.proto and v101.proto files define the ingest microservice protocol in two versions (1.0.0 and 1.0.1).


// v100.proto
syntax = "proto3";

package org.eigengo.rsa.text.v100;

message Text {
    repeated string areas = 1;
}

// v101.proto
syntax = "proto3";

package org.eigengo.rsa.text.v101;

message Text {
    repeated string areas = 1;
    double overallAccuracy = 2;
}

In the sbt code, we now need to generate the matching Scala code and the Protocol Buffers marshallers: we also need to generate these at compile-time, without any dependency on the protocol module at runtime. So, it is time to write some funky sbt code:


import sbt.Keys._
import com.trueaccord.scalapb.{ScalaPbPlugin => PB}

scalaVersion in ThisBuild := "2.11.8"

lazy val protocol = project.in(file("protocol"))
  .settings(commonSettings)

lazy val ingest = project.in(file("ingest"))
  .dependsOn(protocol % PB.protobufConfig.name)
  .dependsOn(protobufTestkit % Test)
  .settings(commonSettings)
  .settings(dockerSettings)
  .settings(serverSettings)
  .settings(protobufSettings(protocol))

lazy val commonSettings = Seq(
  organization := "org.eigengo",
  scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked")
)

def protobufSettings(protocol: Project) = PB.protobufSettings ++ Seq(
  version in PB.protobufConfig := "3.0.0",
  PB.runProtoc in PB.protobufConfig := (args => com.github.os72.protocjar.Protoc.runProtoc("-v300" +: args.toArray)),
  javaSource in PB.protobufConfig <<= (sourceDirectory in Compile)(_ / "generated"),
  scalaSource in PB.protobufConfig <<= (sourceDirectory in Compile)(_ / "generated"),
  PB.flatPackage in PB.protobufConfig := true,
  PB.externalIncludePath in PB.protobufConfig := ((classDirectory in protocol) in Compile).value,
  sourceDirectories in PB.protobufConfig <+= PB.externalIncludePath in PB.protobufConfig
)

Now, this means that we can run sbt package and have the Scala case classes generated into the ingest module. With the generated code (and with the ProtobufMatchers mixin), we can now write a test that verifies that the 1.0.0 and 1.0.1 messages are compatible. The compatibility check is done by marshalling the "left" value to the wire format, then using the "right" message's unmarshaller to unpack it from the wire format, and finally checking the intersecting properties for their values.


class PropertyTest extends FlatSpec with ProtobufMatchers {

  "Generated protocol code" should "be semver compatible" in {
    v100.Text(Seq("a")) should be (compatibleWith(v101.Text))
    v101.Text(Seq("a"), 1) should be (compatibleWith(v100.Text))
  }

}
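
For completeness, the essence of the compatibleWith matcher can be sketched as follows. This is a simplified illustration only: the real ProtobufMatchers also compares the values of the intersecting fields, and its exact signatures may differ.


import scala.util.Try
import org.scalatest.matchers.{BeMatcher, MatchResult}
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}

trait ProtobufMatchers {

  // serialise the "left" message and re-parse it with the "right" companion;
  // proto3 skips unknown fields and fills missing ones with default values
  def compatibleWith[A <: GeneratedMessage with Message[A]](right: GeneratedMessageCompanion[A]): BeMatcher[GeneratedMessage] =
    new BeMatcher[GeneratedMessage] {
      override def apply(left: GeneratedMessage): MatchResult =
        MatchResult(
          Try(right.parseFrom(left.toByteArray)).isSuccess,
          s"$left is not wire-compatible with $right",
          s"$left is wire-compatible with $right"
        )
    }

}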

To close our discussion about message formats, let's explore a scenario where we say that all messages between our microservices need to be defined in the *.proto files, and where we generate the appropriate case classes and marshallers at build time. This works really well if we just want to use Protocol Buffers: the proto definitions feed the ScalaPB sbt build-time code generator, and the microservices use Protocol Buffers as the wire format. But what if we want to use the same message definitions for JSON?

JSON and Akka HTTP

Luckily, ScalaPB already includes support for using the message definitions and producing JSON. All that we have to do to take advantage of this in Akka HTTP is to implement the appropriate FromEntityUnmarshaller[A] and ToEntityMarshaller[A].


trait ScalaPBMarshalling {
  private val protobufContentType = ContentType(MediaType.applicationBinary("octet-stream", Compressible, "proto"))
  private val applicationJsonContentType = ContentTypes.`application/json`

  def scalaPBFromRequestUnmarshaller[O <: GeneratedMessage with Message[O]](companion: GeneratedMessageCompanion[O]): FromEntityUnmarshaller[O] = {
    Unmarshaller.withMaterializer[HttpEntity, O](_ ⇒ implicit mat ⇒ {
      case entity@HttpEntity.Strict(`applicationJsonContentType`, data) ⇒
        val charBuffer = Unmarshaller.bestUnmarshallingCharsetFor(entity)
        FastFuture.successful(JsonFormat.fromJsonString(data.decodeString(charBuffer.nioCharset().name()))(companion))
      case entity@HttpEntity.Strict(`protobufContentType`, data) ⇒
        FastFuture.successful(companion.parseFrom(CodedInputStream.newInstance(data.asByteBuffer)))
      case entity ⇒
        Future.failed(UnsupportedContentTypeException(applicationJsonContentType, protobufContentType))
    })
  }

  implicit def scalaPBToEntityMarshaller[U <: GeneratedMessage]: ToEntityMarshaller[U] = {
    def jsonMarshaller(): ToEntityMarshaller[U] = {
      val contentType = applicationJsonContentType
      Marshaller.withFixedContentType(contentType) { value ⇒
        HttpEntity(contentType, JsonFormat.toJsonString(value))
      }
    }

    def protobufMarshaller(): ToEntityMarshaller[U] = {
      Marshaller.withFixedContentType(protobufContentType) { value ⇒
        HttpEntity(protobufContentType, value.toByteArray)
      }
    }

    Marshaller.oneOf(jsonMarshaller(), protobufMarshaller())
  }

}

We can now go ahead and use the marshallers on any generated code:


class PropertyTest extends FlatSpec with ProtobufMatchers with PropertyChecks with ScalaPBMarshalling {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  "Marshalling" should "work" in {
    val value = v100.Text(Seq("a"))

    implicit val marshaller = implicitly[ToEntityMarshaller[v100.Text]]
    implicit val unmarshaller = scalaPBFromRequestUnmarshaller(v100.Text)

    val contentTypes = List(ContentTypes.`application/json`, ContentTypes.`application/octet-stream`)
    val futureResults = contentTypes.map { contentType ⇒
      marshaller.apply(value)
        .flatMap {
          _.flatMap {
            case Marshalling.WithFixedContentType(`contentType`, m) ⇒ Some(m())
            case _ ⇒ None
          } match {
            case h :: Nil ⇒ Future.successful(h)
            case _ ⇒ Future.failed(new RuntimeException(":("))
          }
        }
        .flatMap(entity ⇒ unmarshaller.apply(entity))
    }

    Await.result(Future.sequence(futureResults), Duration.Inf)
      .foreach(_ should equal(value))
  }

}

Notice that implicit val marshaller can be looked up implicitly using implicitly[ToEntityMarshaller[v100.Text]], which will invoke the implicit def scalaPBToEntityMarshaller[U <: GeneratedMessage]: ToEntityMarshaller[U] in ScalaPBMarshalling. The unmarshaller cannot be obtained implicitly; we need to construct it by invoking ScalaPBMarshalling.scalaPBFromRequestUnmarshaller, supplying the GeneratedMessageCompanion. Nevertheless, the rest of the code is an exercise in Akka HTTP; we end up with a sequence of Future[v100.Text]s, which are the results of marshalling and unmarshalling the original value, and so they should all be equal to it.
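
In a real Akka HTTP route the same implicits kick in transparently. A minimal sketch, assuming the v100.Text message from earlier (the TextRoute class and the "text" path are illustrative, not part of the project):


import akka.http.scaladsl.server.Directives
import org.eigengo.rsa.text.v100

class TextRoute extends Directives with ScalaPBMarshalling {
  // the unmarshaller is constructed explicitly, the marshaller is found implicitly
  implicit val textUnmarshaller = scalaPBFromRequestUnmarshaller(v100.Text)

  val route =
    path("text") {
      post {
        // accepts either application/json or the protobuf content type...
        entity(as[v100.Text]) { text ⇒
          // ...and responds in the negotiated format
          complete(text)
        }
      }
    }
}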

We can go as far as writing a compiler plugin that ensures that we don't hand-roll our own marshallers. This ensures that no developer on the project can accidentally side-step the strong protocol definitions. Once this is wired in, we get compile errors when we attempt to hand-roll our own marshallers and unmarshallers, both in sbt and in IntelliJ IDEA.

Envelopes

Now that we have a clear definition of the protocol used for the messages that our system consumes and produces (both externally and internally), let's take some time to think about some common fields in every message. These common fields should allow us to correlate messages, to de-duplicate messages under the at-least-once delivery semantics, and to implement message tracing. It is possible to add these common fields to every protocol, but it is often better to define the messages as an envelope with the common fields and a payload that represents the actual message.


syntax = "proto3";

package org.eigengo.rsa;

message Envelope {
    int32 version = 1;
    int64 processingTimestamp = 2;
    int64 ingestionTimestamp = 3;

    string handle = 5;
    string correlationId = 6;
    string messageId = 7;

    int32 messageVersion = 8;
    string messageType = 9;
    bytes payload = 10;
}

I have grouped the fields in the Envelope protocol definition. The first group defines the version of the envelope itself, and the (node-local) first ingestion and local processing timestamps. The second group defines the identity of the message. Since we have a Twitter app, we can include the handle, and then the ingestion-generated UUIDs for correlationId and messageId. Finally, the third group contains details about the actual payload. To successfully deserialise it, we must know its version, its type, and the bytes that make up the payload.

This structure allows us to implement microservices that only process the envelope information, without having to (or being allowed to) process the payload.
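
Wrapping a payload is then a matter of filling in the envelope fields around the serialised message; a hedged sketch (the Envelopes helper and the UUID-based identifiers are illustrative, not the project's exact code):


import java.util.UUID
import com.google.protobuf.ByteString
import com.trueaccord.scalapb.GeneratedMessage
import org.eigengo.rsa.Envelope

object Envelopes {

  def envelope(handle: String, correlationId: String, message: GeneratedMessage): Envelope =
    Envelope(
      version = 100,                                       // version of the envelope itself
      processingTimestamp = System.currentTimeMillis(),
      ingestionTimestamp = System.currentTimeMillis(),
      handle = handle,
      correlationId = correlationId,
      messageId = UUID.randomUUID().toString,
      messageVersion = 100,                                // version of the payload message
      messageType = message.getClass.getSimpleName,
      payload = ByteString.copyFrom(message.toByteArray)   // the serialised payload bytes
    )

}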

Akka Journal

Now that we have the protocol definitions for our APIs, it is important to ensure that we keep the well-defined protocols in our Akka Journal. To do so, all we have to do is to configure Akka Persistence's serializers.


akka.actor {

  serializers {
    spb = "org.eigengo.rsa.serialization.ScalaPBSerializer"
  }

  serialization-bindings {
    "com.trueaccord.scalapb.GeneratedMessage" = spb
  }

}

The ScalaPBSerializer simply delegates to the underlying Protobuf & ScalaPB machinery:


class ScalaPBSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
  
  // https://en.wikipedia.org/wiki/Face_of_Boe
  override val identifier: Int = 0xface0fb0

  override def manifest(o: AnyRef): String = o.getClass.getCanonicalName

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case g: GeneratedMessage ⇒ g.toByteArray
    case _ ⇒ throw new IllegalArgumentException(s"$o is not an instance of ScalaPB-generated class.")
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    system.dynamicAccess.getObjectFor[GeneratedMessageCompanion[_ <: GeneratedMessage with Message[_]]](manifest)
      .flatMap(_.validate(bytes))
      .get

}

With this in place, we can safely call the PersistentActor.persist family of functions; when we supply an instance of a ScalaPB-generated class, it will end up serialized in the journal as a proper binary Protobuf payload.
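
A minimal sketch of what this looks like from a persistent actor, assuming the v100.Text message from earlier (the actor and its persistenceId are illustrative):


import akka.persistence.PersistentActor
import org.eigengo.rsa.text.v100

class TextJournalActor extends PersistentActor {
  override val persistenceId: String = "text-journal"

  override def receiveRecover: Receive = {
    // replayed events arrive already deserialized by the ScalaPBSerializer
    case _: v100.Text ⇒ ()
  }

  override def receiveCommand: Receive = {
    case text: v100.Text ⇒
      // the journal stores `text` as raw protobuf bytes, not Java serialization
      persist(text) { _ ⇒ () }
  }
}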

The Cake Kafka client

Just as importantly, we can use the same generated code in our Kafka code when consuming or producing messages. To give you a worked example, let's explore the SceneClassifierActor.


object SceneClassifierActor {
  private val extractor = ConsumerRecords.extractor[String, Envelope]

  def props(config: Config): Props = {
    val Success(sceneClassifier) = SceneClassifier(...)
    val consumerConf = KafkaConsumer.Conf(
      config.getConfig("kafka.consumer-config"),
      keyDeserializer = new StringDeserializer,
      valueDeserializer = KafkaDeserializer(Envelope.parseFrom)
    )
    val consumerActorConf = KafkaConsumerActor.Conf()
    val producerConf = KafkaProducer.Conf(
      config.getConfig("kafka.scene-producer"),
      new StringSerializer,
      KafkaSerializer[Envelope](_.toByteArray)
    )

    Props(classOf[SceneClassifierActor], consumerConf, consumerActorConf, producerConf, sceneClassifier)
  }

}

Here we have constructed the configuration for the KafkaConsumer, the KafkaConsumerActor, and the KafkaProducer. The key portions are the valueDeserializer in KafkaConsumer.Conf and the third (serializer) parameter in KafkaProducer.Conf. KafkaDeserializer applies the given function to the received bytes, and KafkaSerializer does the reverse. Both fit very nicely with the structure of the ScalaPB-generated code. The usage is as simple as you would expect:


class SceneClassifierActor(
  consumerConf: KafkaConsumer.Conf[String, Envelope], 
  consumerActorConf: KafkaConsumerActor.Conf,
  producerConf: KafkaProducer.Conf[String, Envelope],
  sceneClassifier: SceneClassifier) extends Actor {

  import SceneClassifierActor._

  private[this] val kafkaConsumerActor = context.actorOf(...)
  private[this] val producer = KafkaProducer(conf = producerConf)

  override def receive: Receive = {
    case extractor(consumerRecords) ⇒
      // consumerRecords.pairs is Map[String, Envelope]
      consumerRecords.pairs.foreach {
        case (_, envelope) ⇒
          val out: Envelope = ???

          // producer expects KafkaProducerRecord[String, Envelope]
          producer.send(KafkaProducerRecord("scene", envelope.handle, out))
      }

      sender() ! Confirm(consumerRecords.offsets, commit = true)
  }

}

This code, while it shows the essence of the serializer and deserializer code, can lose messages. Here, producer.send returns a Future[RecordMetadata], which we promptly ignore; hence, we can confirm the offsets to Kafka even if we have failed to produce the outgoing message, something we'd really like to avoid.
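
As an aside, KafkaDeserializer and KafkaSerializer are thin adapters over Kafka's Deserializer and Serializer interfaces; conceptually they do something like this (a simplified sketch, not the library's actual source):


import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

object KafkaSerdeSketch {

  // applies the given function to the received bytes (cf. KafkaDeserializer)
  def deserializer[A](f: Array[Byte] ⇒ A): Deserializer[A] = new Deserializer[A] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): A = f(data)
    override def close(): Unit = ()
  }

  // turns a value into the bytes written to the topic (cf. KafkaSerializer)
  def serializer[A](f: A ⇒ Array[Byte]): Serializer[A] = new Serializer[A] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: A): Array[Byte] = f(data)
    override def close(): Unit = ()
  }

}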

Delivery semantics

Now that we know how to interpret the bytes that arrive, let's deal with the semantics of message processing. In particular, let's explore the at-least-once delivery semantics. Essentially, we want to make sure that we do not lose a message by consuming it (in Kafka-speak, confirming the offsets in automatically managed partitions) before we have completed processing of the message. Remember that processing might mean some other asynchronous I/O; for example, producing a message to another Kafka topic.

To really understand what is happening (and why the SceneClassifierActor.receive handler above received multiple consumerRecords in one consumption "tick"), let's take a look at how Kafka works.

In this simple animation, we have three partitions, and it just so happens that the keys of the blue square and the green circle fall into the second partition. The consumer therefore receives two consumerRecords. The consumer then processes the received records and, when it is done, confirms the offsets (you can think of an offset as a simple integer) for the records in that partition. It does not confirm individual messages. If the consumer does not confirm the offsets within the configured timeout, Kafka will remove the consumer (it is probably dead or dying), which will cause a re-balance of the partition. Ultimately, a new available consumer will be selected to receive the (unconfirmed) records.
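
The timeout and offset behaviour above are driven by standard Kafka consumer properties in the microservice's configuration; a hedged sketch of what the kafka.consumer-config block might contain (the values are illustrative):


kafka {
  consumer-config {
    bootstrap.servers = "localhost:9092"
    group.id = "scene-classifier"
    enable.auto.commit = false     # the actor confirms offsets explicitly
    session.timeout.ms = 30000     # the window within which the consumer must stay alive
  }
}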

Let's now tackle the at-least-once delivery pattern with some examples.

Kafka manages offsets, clients confirm on completion

This is the simplest model to implement, as long as the consumer can complete its processing and confirm the offsets well within Kafka's confirmation timeout. It relies on Kafka to manage the offsets for each consumer; its essence is the following code.


client ! Subscribe(self)

def receive: Receive = {
  case extractor(consumerRecords) ⇒
    val work: Future[Unit] = beginWork(consumerRecords)

    work.onSuccess {
      case _ ⇒ client ! Confirm(consumerRecords.offsets, commit = true)
    }
}

As you can see, the work that the microservice starts in beginWork is expected to be fast; once it succeeds, the microservice confirms the consumption of the records at consumerRecords.offsets. A practical example in the Twitter vision application is the SceneClassifierActor. The records are considered consumed when it has classified the contents of each image and produced a message on the "scene" topic.


class SceneClassifierActor(
  consumerConf: KafkaConsumer.Conf[String, Envelope], 
  consumerActorConf: KafkaConsumerActor.Conf,
  producerConf: KafkaProducer.Conf[String, Envelope],
  sceneClassifier: SceneClassifier) extends Actor {

  import SceneClassifierActor._

  private[this] val kafkaConsumerActor = context.actorOf(...)
  private[this] val producer = KafkaProducer(conf = producerConf)

  override def receive: Receive = {
    case extractor(consumerRecords) ⇒

      // we process each record, and have Future[RecordMetadata] as a receipt for 
      // producing a response
      val futures = consumerRecords.pairs.flatMap {
        case (_, envelope) ⇒
          val is = new ByteArrayInputStream(envelope.payload.toByteArray)
          sceneClassifier.classify(is).map { scene ⇒
            val out = Envelope(...)
            producer.send(KafkaProducerRecord("scene", envelope.handle, out))
          }.toOption
      }

      // we confirm offsets if *all* responses have been successfully sent
      import context.dispatcher
      Future.sequence(futures).onSuccess {
        case _ ⇒ kafkaConsumerActor ! Confirm(consumerRecords.offsets, commit = true)
      }
  }

}

Kafka manages offsets; clients confirm on persistence

This model is a variation of the first one; it is applicable whenever the processing of the received records would take us over the confirmation timeout. Nota bene that it is not a good idea to simply increase the timeout: your goal should be to maintain a lively message flow, where it is possible to spot a failing microservice very early. Increasing timeouts leads to increased latency and delayed recovery.


client ! Subscribe(self)

def receive: Receive = handleEvents orElse {
  case extractor(consumerRecords) ⇒
    val payload = payloadFrom(consumerRecords)

    persist(JustEvent(payload)) { cmd ⇒ 
      deliver(self.path)(deliveryId ⇒ (deliveryId, cmd))
      sender() ! Confirm(consumerRecords.offsets, commit = true)
    }
}

def handleEvents: Receive = {
  case (deliveryId: Long, JustEvent(payload)) ⇒
    doWork(payload).onSuccess { case _ ⇒ confirmDelivery(deliveryId) }
}

The code here is a combination of Akka's AtLeastOnceDelivery trait together with Akka Persistence and the usual Kafka client mechanics. In essence, when we receive the records, we create an event from them and persist it; on a successful write, we trigger Akka's at-least-once (local) delivery and confirm the offsets to Kafka. Akka (specifically its at-least-once delivery and persistence modules) then takes care of delivering the just-persisted event to self again. When handling the event (and the generated deliveryId), we do our work and, on its success, confirm the delivery of the message. Similar timeout rules apply, but the timeouts are now local to the microservice and do not impact the liveliness of the core messaging infrastructure. In our Twitter system, this pattern is shown in the IdentityMatcherActor.


class IdentityMatcherActor(producerConf: KafkaProducer.Conf[String, Envelope],
                           identityMatcher: IdentityMatcher)
  extends PersistentActor with AtLeastOnceDelivery {
  import IdentityMatcherActor._
  lazy val Success(faceExtractor) = FaceExtractor()

  private[this] val producer = KafkaProducer(conf = producerConf)

  import scala.concurrent.duration._

  override def supervisorStrategy: SupervisorStrategy = ...

  override val persistenceId: String = "identity-matcher-actor"

  def identifyFacesAndSend(identifyFaces: Seq[IdentifyFace])(implicit executor: ExecutionContext): Future[Unit] = {
    val sentFutures = identifyFaces.flatMap { identifyFace ⇒
      faceExtractor.extract(identifyFace.image.toByteArray).map(_.map { faceImage ⇒
        val face = identityMatcher.identify(faceImage.rgbBitmap.newInput()) match {
          case Some(identifiedFace) ⇒ Identity.Face.IdentifiedFace(identifiedFace)
          case None ⇒ Identity.Face.UnknownFace(Identity.UnknownFace())
        }
        val identity = Identity(face = face)
        val out = Envelope(...)
        producer.send(KafkaProducerRecord("identity", identifyFace.handle, out)).map(_ ⇒ ())
      }).getOrElse(Nil)
    }

    Future.sequence(sentFutures).map(_ ⇒ ())
  }

  def handleIdentifyFace: Receive = {
    case (deliveryId: Long, identifyFaces: IdentifyFaces) ⇒
      import context.dispatcher
      identifyFacesAndSend(identifyFaces.identifyFaces).onSuccess { case _ ⇒ confirmDelivery(deliveryId) }
  }

  override def receiveRecover: Receive = {
    case IdentifyFaces(faces) ⇒
      import context.dispatcher
      identifyFacesAndSend(faces).onFailure { case _ ⇒ self ! Kill }
  }

  override def receiveCommand: Receive = handleIdentifyFace orElse {
    case extractor(consumerRecords) ⇒
      val identifyFaces = consumerRecords.pairs.map {
        case (_, envelope) ⇒ IdentifyFace(envelope.ingestionTimestamp, envelope.correlationId, envelope.handle, envelope.payload)
      }

      persist(IdentifyFaces(identifyFaces = identifyFaces)) { result ⇒
        deliver(self.path)(deliveryId ⇒ (deliveryId, result))
        sender() ! Confirm(consumerRecords.offsets, commit = true)
      }
  }

}

Note the handling of case IdentifyFaces(faces) in receiveRecover. When receiveRecover runs, it runs because the supervision strategy was triggered. If it was triggered because identifyFacesAndSend failed, we want to know as soon as possible. And so, if it fails again, we send the Kill message to self, triggering another round of the supervision strategy.

Microservice manages offsets; stores the offsets & messages in its journal.

The last approach is to relieve Kafka of automatically managing the offsets, and to manage the offsets ourselves. In essence, this means storing the offsets and messages in our journal. Again, the essential pseudo-code is


def receiveRecover: Receive = {
  case EventAndOffsets(offsets, payload) ⇒
    doWork(payload)
      .onFailure { case _ ⇒ self ! Kill }
      .onSuccess { case _ ⇒ client ! Subscribe(self, offsets) }
}

def receive: Receive = handleEvents orElse {
  case extractor(consumerRecords) ⇒
    val payload = payloadFrom(consumerRecords)

    persist(EventAndOffsets(consumerRecords.offsets, payload)) { cmd ⇒
      deliver(self.path)(deliveryId ⇒ (deliveryId, cmd))
    }
}

def handleEvents: Receive = {
  case (deliveryId: Long, EventAndOffsets(_, payload)) ⇒
    doWork(payload).onSuccess { case _ ⇒ confirmDelivery(deliveryId) }
}

This is the approach that Lagom takes; in fact, Lagom goes as far as to mandate no ad-hoc message sending. Where in our Akka code we were able to write producer.send, in Lagom we can only produce messages by subscribing to a stream of (aggregate) events from the journal. This, together with the automatic offsets management, keeps Lagom services lively and reliable; Lagom provides convenient APIs to make this rather complex approach easy to use.

Summary

In this post, I showed the architecture of the system, and the way in which it handles the various messages that it consumes and produces. Once the appropriate protocols were defined, I showed how to ensure that the entire system (i.e. its REST API, journal, and Kafka messaging) uses these formats. Finally, I showed the core messaging patterns in Kafka.

In the next post, we will explore the computer vision and learning core; in the meantime, the code is at https://github.com/eigengo/reactive-summit-2016, and you can find me on Twitter as @honzam399.
