Friday, April 15, 2016

Managing Spark context in ScalaTest

This post describes a methodology to manage the Spark context while testing your application using ScalaTest.

Debugging an Apache Spark application using ScalaTest seems quite simple when dealing with a single test:
  • Specify your Spark context configuration SparkConf
  • Create the Spark context
  • Add test code related to your application
  • Clean up resources (Spark context, Akka context, file handles...)
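The four steps above can be sketched as one self-contained test. This is a minimal illustration, not code from the original application: the class name SingleTest, the application name and the local[2] master are assumptions, and the FlatSpec import assumes a ScalaTest 2.x or 3.0 release.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{FlatSpec, Matchers}

// Hypothetical single-test suite: the context lives only inside the test body
class SingleTest extends FlatSpec with Matchers {

  "A local Spark context" should "sum a small RDD" in {
    // 1. Specify the configuration
    val conf = new SparkConf().setAppName("Single-Test").setMaster("local[2]")
    // 2. Create the Spark context
    val sc = new SparkContext(conf)
    try {
      // 3. Test code related to the application
      sc.parallelize(1 to 10).reduce(_ + _) shouldBe 55
    } finally {
      // 4. Clean up, even if the assertion above fails
      sc.stop()
    }
  }
}
```

The try/finally block is what guarantees the cleanup here; it becomes repetitive as soon as several tests need the same context.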
However, there are cases when you may need to create and close the Spark context used across multiple tests or a subset of the tests. The challenge is to make sure that the Spark context is actually closed when it is no longer needed.
This post introduces two basic ScalaTest methods, beforeAll and afterAll of the trait BeforeAndAfterAll, to manage the context lifecycle of your test application.

Wrapping the Spark context
The objective is to create a small framework that creates a Spark context, or retrieves an existing one, before executing a test and closes it after the test is completed, regardless of the outcome of the test.
The initialization of the Spark context consists of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through a simple parameterization. The context is created with the getOrCreate method only if one is not already defined within the scope of the test.
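import org.apache.spark.{SparkConf, SparkContext}

trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  // Returns the active Spark context, creating it if none exists yet
  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName("Test-App")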
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer", 
              "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  // Stops the Spark context if one was created
  def close: Unit = {
    if (sc != null) sc.stop()
  }
}
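The "simple parameterization" mentioned above is not shown in the original listing. One possible sketch, under the assumption that only the application name, the number of local cores and a few extra settings need to vary between test suites, is a small factory for the configuration. The object TestConf and its build method are hypothetical names introduced for illustration.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not part of the original wrapper
object TestConf {
  // Builds a local configuration; entries in overrides are applied last
  def build(
      appName: String = "Test-App",
      cores: Int = 4,
      overrides: Map[String, String] = Map.empty): SparkConf = {
    val base = new SparkConf()
      .setAppName(appName)
      .setMaster(s"local[$cores]")
      .set("spark.serializer",
           "org.apache.spark.serializer.KryoSerializer")
    // Apply each caller-supplied key/value pair on top of the defaults
    overrides.foldLeft(base) { case (conf, (key, value)) => conf.set(key, value) }
  }
}
```

A suite could then call SparkContext.getOrCreate(TestConf.build(cores = 2)). Keep in mind that getOrCreate returns the context that is already active, if there is one, so a new configuration may not take effect until the previous context has been stopped.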

The solution is to let the ScalaTest methods beforeAll and afterAll manage the lifecycle of the Spark context for testing.
trait SparkContextForTesting 
     extends SparkContextWrapper with BeforeAndAfterAll { 
  self: Suite =>

  override def beforeAll(): Unit = {
    super.beforeAll()
    getContext
  }

  override def afterAll(): Unit = {
    try close
    finally super.afterAll()
  }
}

Note: The BeforeAndAfterAll trait can only be mixed into a test suite that inherits the Suite trait. The method beforeAll is automatically called before the first test of your test suite and the method afterAll is called after your last test is completed, whether those tests succeeded or not.
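The call order described in the note can be observed without Spark. The following sketch is an illustration only: the class LifecycleSpec and its events buffer are hypothetical, and the imports assume ScalaTest 2.x or 3.0. It records each lifecycle step and deliberately fails its second test.

```scala
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import scala.collection.mutable.ListBuffer

// Hypothetical suite that logs the order of the lifecycle calls
class LifecycleSpec extends FlatSpec with BeforeAndAfterAll {
  val events = ListBuffer.empty[String]

  override def beforeAll(): Unit = {
    super.beforeAll()
    events += "beforeAll"
  }

  override def afterAll(): Unit = {
    events += "afterAll"
    super.afterAll()
  }

  "The first test" should "pass" in {
    events += "test 1"
  }

  "The second test" should "fail" in {
    events += "test 2"
    fail("deliberate failure") // afterAll is still expected to run
  }
}
```

Running the suite, for instance with new LifecycleSpec().execute() from the Scala console, should leave "afterAll" as the last recorded event even though the second test failed.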

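import org.apache.spark.sql.SQLContext
import org.scalatest.{FlatSpec, Matchers}

class MyTest 
  extends FlatSpec 
    with Matchers 
      with SparkContextForTesting {

  val x: Int = -4

  it should "My first test" in {
    // sc has been initialized by beforeAll at this point
    val sqlContext = new SQLContext(sc) 
    // ....
  }
  // ... other tests

  it should "My last test" in {
    // clean up after it completes
  }
}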

As they say at the end of the movie, "That's all folks!"

Reference
ScalaTest User Guide
