Tuesday, June 10, 2014

Scala Streams: A Gentle Introduction

Target audience: Beginner
Estimated reading time: 10'

How can we leverage Scala Streams to manage very large data sets with limited computing resources?


Overview
A Stream instance can be regarded as a lazy list or, more accurately, a list with lazy elements: the elements are allocated only when accessed. Stream allows Scala developers to write infinite sequences. Elements can be removed from memory (to be handled by the GC) by eliminating any reference to them once they are no longer needed.
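
The lazy behavior described above can be illustrated with a minimal sketch. The counter evaluated and the names squares and firstFive are introduced here for illustration only; the sketch assumes the Scala 2 Stream class, whose head is strict and whose tail is lazy.

```scala
// Infinite stream of squares: nothing beyond the head is computed up front.
var evaluated = 0  // hypothetical counter: how many elements were computed
val squares: Stream[Int] = Stream.from(1).map { n => evaluated += 1; n * n }

// Taking a slice forces only the elements that the slice needs.
val firstFive = squares.take(5).toList

println(s"$firstFive computed with $evaluated evaluations")
```

Although squares is conceptually infinite, only the elements requested by take are ever evaluated.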


Performance Evaluation
It is easy to understand the benefit of Stream in terms of memory management. But what about the performance?
Let's compare Stream and List using 3 simple operations:
  • Allocating elements
  • Retrieving a random element
  • Traversing the entire collection
Let's start by defining these operations in the context of computing the mean value of a very large sequence of double values.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val NUM = 10000000 
 
   // Allocation test
val lst = List.tabulate(NUM)( _.toDouble)

   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
  {y = lst(Random.nextInt(NUM-1))}
)
   // Reducer test
lst.reduce( _ + _ )/lst.size

The operation of reading a value at a random index is repeated 10,000 times in order to make the performance evaluation more reliable (lines 8 and 9). The mean is computed using a simple reduce method (line 12).

Let's implement the same sequence of operations using Stream class.

1
2
3
4
5
6
7
8
val strm = Stream.tabulate(NUM)( _.toDouble)
   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
  {y = strm(Random.nextInt(NUM-1))}
)
   // Reducer test
strm.reduceRight( _ + _ )/strm.size
Reading an element at a random index of the Stream is implemented the same way as for the List (lines 4 and 5). The mean of the stream is also computed with a reducer (line 8).

The test is run 20 times to limit the distortion caused by the initialization of the JVM.
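
Repeating a test and averaging its duration could be sketched as follows. The helper timed and the size N are assumptions made for this illustration, not the harness used to produce the results; N is deliberately much smaller than NUM to keep the sketch quick.

```scala
// Hypothetical helper: evaluates `op` `runs` times and returns the last
// result together with the average elapsed time in milliseconds.
def timed[T](runs: Int)(op: => T): (T, Double) = {
  require(runs > 0, "runs must be positive")
  val start = System.nanoTime
  var result = op
  for (_ <- 1 until runs) result = op
  val elapsedMs = (System.nanoTime - start) / 1e6
  (result, elapsedMs / runs)
}

val N = 100000  // assumption: reduced size for a quick run

// Allocation and traversal of a List, averaged over 20 runs
val (listMean, listMs) = timed(20) {
  val lst = List.tabulate(N)(_.toDouble)
  lst.sum / lst.size
}

// Same operations with a Stream
val (streamMean, streamMs) = timed(20) {
  val strm = Stream.tabulate(N)(_.toDouble)
  strm.sum / strm.size
}

println(f"List: $listMs%.3f ms, Stream: $streamMs%.3f ms")
```

The measured durations depend on the machine and on the JVM warm-up, so the sketch prints them without asserting anything about their relative order.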


The allocation of the elements in the stream is slightly faster than the creation of the list.
The main difference is the time required by the List and the Stream to traverse the entire collection with a reducer (reduce for the List, reduceRight for the Stream). In the code snippet above, the Stream has to allocate all its elements at once. This scenario is very unlikely, as Streams are usually needed to process sections or slices of a very large sequence of values or objects, as demonstrated in the next section.


Use case: moving average
The most common application of Scala Stream is the iterative or recursive application of a function, a transform or a sequence of functions to a very large data set; in this case, the computation of a moving average.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
val strm = Stream.fill(NUM)( Random.nextDouble )
  
val STEP = 5
val sum = strm.take(STEP).sum
val avStrm = strm.drop(STEP)

 // Apply the updating formula, with Sum(n) = x(n) + ... + x(n+STEP-1)
 // Sum(n+1) = Sum(n) - x(n) + x(n+STEP)
strm.zip(avStrm)
      .scanLeft(sum) { case (s, (out, in)) => s - out + in }
      .map( _ /STEP)

First, the code creates a reference strm to a stream of NUM random values (line 1). Then it computes the sum of the first STEP elements of the stream (line 4). Once the sum is computed, these elements are dropped from the stream (line 5). The sum, and therefore the mean value, is then updated for each new element of the stream (lines 9-11).
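
The updating formula can be checked on a small stream. The sketch below is one possible reading of the formula, written with scanLeft, and it compares the result with a naive computation based on sliding windows. The function movingAverage and the sample data are assumptions made for this illustration.

```scala
// Moving average through a running sum: each step removes the element
// leaving the window and adds the element entering it.
def movingAverage(xs: Stream[Double], window: Int): Stream[Double] = {
  require(window > 0, "window must be positive")
  val first = xs.take(window).sum
  xs.zip(xs.drop(window))
    .scanLeft(first) { case (sum, (leaving, entering)) => sum - leaving + entering }
    .map(_ / window)
}

val data = Stream(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
val viaRunningSum = movingAverage(data, 3).toList

// Reference result: recompute the mean of every sliding window from scratch
val viaSliding = data.sliding(3).map(_.sum / 3).toList

println(viaRunningSum)
```

The running sum costs two arithmetic operations per element, whatever the size of the window, while the sliding version recomputes each window entirely.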

Here is an alternative implementation of the computation of the moving average on a stream of floating point values using tail recursion.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def average(strm: Stream[Double], window: Int): Stream[Double] = {
  
  @scala.annotation.tailrec
  def average(
    src: Stream[(Double, Double)], 
    target: Stream[Double]): Stream[Double] = {
    
    if( !src.isEmpty ) {
      val (out, in) = src.head
      // Sum(n+1) = Sum(n) - x(n) + x(n+window), latest sum kept at the head
      average(src.tail, (target.head - out + in) +: target)
    } else target
  }
   
  val sums = Stream(strm.take(window).sum)
  average(strm.zip(strm.drop(window)), sums)
    .reverse.map( _ / window) 
}

The recursive call average (line 4) has two arguments: the stream src traversed through the recursion (line 5), and the stream that collects the average (mean) values (line 6). The method recurses as long as the source stream src is not empty (line 8).
The performance of the computation of the mean may be improved by parallelizing its execution, for instance with Stream.par.



Monday, June 2, 2014

Akka Actors Blocking on Futures

Target audience: Intermediate
Estimated reading time: 15'

This is a brief introduction to distributed computing using blocking Scala/Akka futures.


Overview
Akka is an actor-based and message-driven framework for building concurrent and reliable applications. As such, Akka supports futures, for which the requester never blocks waiting for a response. A message or request is sent for execution and the expected response (success or failure/exception) is delivered asynchronously at some point in the future. The code snippets in our two examples omit condition tests and supporting methods for the sake of clarity. The code compiles and executes with Akka 2.2.4 and 2.3.6.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.


Concurrent processing
One simple and common usage of Akka/Scala futures is to have some computation performed concurrently without going through the pain of creating actors.
Not every computation requires a sequential execution in which the input of one task depends on the output of another task. Let's consider the case of the evaluation of a model or function against a predefined time series or dataset.

The first step is to create a controller to manage the concurrent tasks, FuturesController (line 3). The controller takes the input time series xt (line 4) and a list of model candidates xs, defined as functions of time of type Function1[Double, Double] (line 5). The time series uses a single variable (dimension 1), so the models are simply defined as functions of one variable, (x: Double) => f(x).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
case class Start()
 
final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
 (implicit timeout: Timeout) extends Actor {
   
  override def receive = {
    case s: Start => {
      val futures = createFutures
      val results = futures.map(
         Await.result(_, timeout.duration)
      )

      sender ! results.zipWithIndex.minBy( _._1 )._2
    }
    case _ => logger.error("Message not recognized")
  }
 
  def createFutures: List[Future[Double]]
}

The event handler receive (line 8) for the message Start creates as many futures as needed, one for each model to evaluate (line 10). The controller/actor blocks by invoking Await until each of the future tasks completes (line 12). Finally, the controller returns the result of the computation (line 15), in this case the index of the fittest of the models xs. The handler logs a simple error message in case a message other than Start is received (line 17).

The futures are created through the method createFutures. The implementation of createFutures consists of computing the least squared error for each model relative to the input dataset using a map transformation and a fold aggregator (lines 4 to 8).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def createFutures: List[Future[Double]] = {
  
  xs.map(f => Future[Double] { 
    val yt = Range(0, xt.size).map( f(_) )
    val r = (xt, yt).zipped./:(0.0)((sum, z) => {
       val diff = z._1 - z._2
       sum + diff*diff
    })

    Math.sqrt(r)
  })
}
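
The same evaluation can be tried outside of an actor, using only the Scala standard library. The sketch below is not the controller described above: it swaps the per-future Await for a single Future.sequence followed by one blocking call. The names series, models and errors, the sample data and the 2 seconds timeout are assumptions made for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Illustrative data and candidate models (assumptions, noise omitted)
val series: Array[Double] = Array.tabulate(50)(m => m * (m - 1.0))
val models: List[Double => Double] = List(
  (x: Double) => x,              // linear
  (x: Double) => x * (x - 1.0),  // matches the series exactly
  (x: Double) => x * x           // quadratic without the linear term
)

// One future per model: square root of the sum of squared errors
def errors(xt: Array[Double], xs: List[Double => Double]): List[Future[Double]] =
  xs.map { f =>
    Future {
      val sumSq = xt.indices.foldLeft(0.0) { (sum, i) =>
        val diff = xt(i) - f(i.toDouble)
        sum + diff * diff
      }
      math.sqrt(sumSq)
    }
  }

// Combine the futures into one and block a single time for the whole batch
val all: Future[List[Double]] = Future.sequence(errors(series, models))
val results = Await.result(all, 2.seconds)

// Index of the model with the smallest error
val best = results.zipWithIndex.minBy(_._1)._2
println(s"Best fit is model $best")
```

Blocking once on the combined future keeps the timeout handling in one place; whether this is preferable to one Await per future depends on how partial failures should be reported.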


Evaluation
Let's consider an input data set generated with the following model
  y = m(m-1) + r  // r in [0, 1]
where r is a random value between 0 and 1, representing noise.

val TIME_SERIES_LEN = 200
val xt = Array.tabulate(TIME_SERIES_LEN)(
  m => m*(m - 1.0) + Random.nextDouble
)

The following code snippet lists all key packages to be imported for most common applications using Akka actors and futures (lines 1 to 6).

The driver program instantiates the Actor system (line 12) and creates the controller actor master using the Akka actor factory actorOf (lines 13, 14). It sends an ask request (line 18) and returns the best model (line 22) if the controller completes its tasks within a predefined timeout (line 10). The code prints the stack trace in case of failure (line 23).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import akka.actor.{Actor, Props, ActorSystem}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import akka.pattern.ask
 

private val duration = Duration(2000, "millis")
implicit val timeout = new Timeout(duration)
   
val actorSystem = ActorSystem("system")
val master = actorSystem.actorOf(
  Props(new FuturesController(xt, approx)), "Function_eval"
)
 
Try {
  val future = master ? Start()
  Await.result(future, timeout.duration)
} 
match {
  case Success(n) => logger.info(s"Best fit is $n")
  case Failure(e) => e.printStackTrace
}

actorSystem.shutdown

It is critical that the application shuts down the Akka system before it exits (line 26).

