Unit Testing - Cassandra with Phantom

Posted by Michal Janousek on Mon, Mar 21, 2016


 

I have been using Apache Cassandra / DSE on a project, and it makes sense to leverage the type safety of Scala through the Phantom library, which describes itself as a "Reactive type-safe Scala DSL for Cassandra". A couple of months ago version 1.19.0 was released, which deprecated the testkit. Updating to the new version broke all the existing tests in our project. Phantom has good documentation on its wiki pages, so I don't think it is necessary to describe how to use the library itself, but that documentation hasn't been updated recently. I think this is a good opportunity to describe how to write the tests and how to run them against an embedded Cassandra.


Because the testkit has been deprecated, CassandraFlatSpec and CassandraFeatureSpec are no longer available. We therefore need to write our own base spec, which spawns an embedded Cassandra instance and can be used for unit testing. Previously the removed spec classes did this automatically. There is an example test in the Phantom repository, but it only uses a local Cassandra.

 

Step 1: Create a CassandraSpec

CassandraSpec is a trait that extends BeforeAndAfterAll from ScalaTest. BeforeAndAfterAll lets us add custom code that is called once before any test in the suite runs (def beforeAll()) and once after all the tests have run (def afterAll()). That is exactly the place to start, initialize and clean the embedded Cassandra.

package foo

import com.typesafe.config.ConfigFactory
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import com.datastax.driver.core.ResultSet

import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
 * Common trait used to define specifications related to Cassandra access.
 */
trait CassandraSpec extends BeforeAndAfterAll with ScalaFutures {
  this: Suite ⇒

  private lazy val config = ConfigFactory.parseString(
    s"""
      |cassandra {
      |   keyspace = "test"
      |   contact-points = "127.0.0.1"
      |   port = 9142
      |}
    """.stripMargin
  )

  lazy val yamlFilename = "cassandra_network_strategy.yaml"
  lazy val cassandraDatabase = CassandraDatabase(config)
  lazy val keySpace = cassandraDatabase.keySpace

  implicit val patienceTimeout = org.scalatest.concurrent.PatienceConfiguration.Timeout(10.seconds)

  /**
   * Initialize the database by computing a schema from the table definition.
   * Await.result (rather than Await.ready) rethrows a failed schema creation,
   * so a broken setup fails the suite instead of passing silently.
   */
  private def initializeDatabase(): Seq[ResultSet] = {
    import cassandraDatabase._
    import com.websudos.phantom.dsl._

    Await.result(cassandraDatabase.autocreate().future(), 60.seconds)
  }

  override protected def beforeAll(): Unit = {
    EmbeddedCassandraServerHelper.startEmbeddedCassandra(yamlFilename, 60.seconds.toMillis)
    initializeDatabase()
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    EmbeddedCassandraServerHelper.cleanEmbeddedCassandra()
    super.afterAll()
  }

}

The startEmbeddedCassandra method takes a path to a YAML file that contains the basic configuration for Cassandra. A default one is provided, so the parameter is not required; whether you supply your own depends on how you want to use Cassandra. The other important point is that the embedded Cassandra is started only once: the helper detects whether an instance is already running, so we can reuse this trait in multiple test suites.

There is currently no method for stopping the embedded server, as stopping was causing problems; maybe one will come in a future release. cleanEmbeddedCassandra is a clean-up method that drops all the keyspaces in the database so the instance can be used again in a different suite.
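The lifecycle described above (boot once, clean between suites, never stop) can be pictured with a toy model. This is only a sketch in plain Scala: ToyEmbeddedServer and all of its members are hypothetical names, and it is not how cassandra-unit is implemented.

```scala
import scala.collection.mutable

// Toy stand-in for an embedded server. Hypothetical illustration only,
// NOT cassandra-unit's actual implementation.
object ToyEmbeddedServer {
  private var running = false
  private var bootCount = 0
  private val keyspaces = mutable.Set.empty[String]

  /** Boots the server on the first call only; returns true if this call booted it. */
  def start(): Boolean = synchronized {
    if (running) false
    else {
      running = true
      bootCount += 1
      true
    }
  }

  def createKeyspace(name: String): Unit = synchronized { keyspaces += name }

  /** Drops every keyspace but leaves the server running for the next suite. */
  def clean(): Unit = synchronized { keyspaces.clear() }

  def isRunning: Boolean = synchronized(running)
  def boots: Int = synchronized(bootCount)
  def keyspaceCount: Int = synchronized(keyspaces.size)
}
```

Calling start() from the beforeAll() of several suites boots the toy server only once, and clean() in afterAll() leaves an empty but still running instance behind, which is the behaviour the trait above relies on.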

The initializeDatabase() method uses the autocreation capabilities of Phantom. It takes all the tables registered in the Database, generates their create statements, and then waits for them to execute, because that can take a while. In a real environment the tables are initialized from a CQL script, which contains the replication factor.
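To give a rough idea of what "generating the create statements" means, here is a small sketch in plain Scala that turns a table description into CQL. Column, TableDef, Schema and the user_auths table layout are all hypothetical and exist only for this illustration; the CQL that Phantom actually emits may differ. The createKeyspace helper shows the kind of statement a hand-written CQL script would use to set the replication factor.

```scala
// Hypothetical, simplified table description (not Phantom's own types).
final case class Column(name: String, cqlType: String, partitionKey: Boolean = false)

final case class TableDef(keyspace: String, name: String, columns: Seq[Column]) {
  /** Renders an idempotent CREATE TABLE statement for this table. */
  def createStatement: String = {
    val cols = columns.map(c => s"${c.name} ${c.cqlType}").mkString(", ")
    val keys = columns.filter(_.partitionKey).map(_.name).mkString(", ")
    s"CREATE TABLE IF NOT EXISTS $keyspace.$name ($cols, PRIMARY KEY ($keys))"
  }
}

object Schema {
  /** Keyspace statement of the kind found in a CQL script, with an explicit replication factor. */
  def createKeyspace(name: String, replicationFactor: Int): String =
    s"CREATE KEYSPACE IF NOT EXISTS $name WITH replication = " +
      s"{'class': 'SimpleStrategy', 'replication_factor': $replicationFactor}"

  // Assumed layout for the illustration only.
  val userAuths: TableDef = TableDef(
    keyspace = "test",
    name = "user_auths",
    columns = Seq(Column("email", "text", partitionKey = true), Column("id", "uuid"))
  )
}
```

A single-node embedded instance only needs a replication factor of 1, which is one reason the test setup can let the tables be created automatically while production uses a script.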

Step 2: Define a Test

Once we have defined the base test trait, let's use it. It's quite straightforward.

package foo
import java.util.UUID

import scala.concurrent.duration._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, FlatSpec}
import scala.concurrent.ExecutionContext.Implicits.global

class CassandraDAOSpec extends FlatSpec with CassandraSpec with Matchers {

  it should "allow to store a user from a model and list it" in {
    val emailAddress = "foo@foo.foo"
    val id = UUID.randomUUID()
    val userAuth = User(emailAddress, id, "test", "test")
    val chain = for {
      store ← cassandraDatabase.userAuths.store(userAuth)
      get ← cassandraDatabase.userAuths.get(emailAddress)
    } yield {
      (store, get)
    }

    whenReady(chain, patienceTimeout) {
      case (store, get) ⇒
        store shouldEqual true
        get should contain(userAuth)
    }
  }
}

This test simply spawns an embedded Cassandra instance, initializes it, and then checks that it can store a user and retrieve the same user. This is the simplest example; we can of course test much more, but that depends on the use case.
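The for-comprehension in the test is what guarantees that the read happens after the write: each step only starts once the previous future has completed. The sketch below shows the same pattern in plain Scala against an in-memory map, so it runs without Cassandra. DemoUser, InMemoryUsers and ChainDemo are hypothetical stand-ins, not part of Phantom or of the project above.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical model, used only for this illustration.
final case class DemoUser(email: String, name: String)

// Hypothetical in-memory stand-in for the Cassandra-backed table.
object InMemoryUsers {
  private val rows = TrieMap.empty[String, DemoUser]

  def store(user: DemoUser): Future[Boolean] = Future {
    rows.put(user.email, user)
    true
  }

  def get(email: String): Future[Option[DemoUser]] = Future(rows.get(email))
}

object ChainDemo {
  /** Stores the user, then reads it back; get is only issued after store has completed. */
  def roundTrip(user: DemoUser): Future[(Boolean, Option[DemoUser])] =
    for {
      stored <- InMemoryUsers.store(user)
      found  <- InMemoryUsers.get(user.email)
    } yield (stored, found)
}
```

Had the two calls been started independently and then combined, the read could race the write; chaining them with flatMap (which is what the for-comprehension desugars to) avoids that.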

Step 3: Embedded Cassandra Logging

I have noticed that the embedded Cassandra pollutes the test logs heavily. That output can be useful for debugging and tuning, but not for simple unit testing. Fixing it is quite easy as well. If Logback is your logging solution of choice, it is enough to restrict the logging level to ERROR as follows:

<logger name="com.datastax" level="ERROR" additivity="false" />
<logger name="org.apache.cassandra" level="ERROR" additivity="false" />
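For context, a complete test configuration might look like the sketch below, saved as logback-test.xml on the test classpath (for example under src/test/resources in an sbt project). The appender name, the pattern and the root level are assumptions; only the two logger names come from the lines above. Note that with additivity="false" a logger no longer inherits the root appender, so the sketch attaches the console appender to each logger explicitly to keep ERROR messages visible.

```xml
<configuration>
  <!-- Assumed console appender; adjust the pattern to taste. -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Only errors from the driver and from the embedded server. -->
  <logger name="com.datastax" level="ERROR" additivity="false">
    <appender-ref ref="STDOUT" />
  </logger>
  <logger name="org.apache.cassandra" level="ERROR" additivity="false">
    <appender-ref ref="STDOUT" />
  </logger>

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
```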

And that is it. I personally think that Phantom is currently the best DSL for Cassandra in Scala, so I hope this post will be helpful. If you have any comments or questions, don't hesitate to contact me directly on Twitter @teroxik or post a comment here.

 

Update

To provide the most up-to-date information: there is an sbt plugin, phantom-sbt, which addresses the issue of starting the embedded Cassandra for testing purposes. It automatically takes the default configuration and starts the embedded Cassandra before running any tests.

 

There has also been a more recent blog post from Outworkers, the creators of Phantom: "Phantom tips, tip 2: testing with phantom-sbt". I wasn't aware of it at the time of writing this post.

 


 
