Monday, June 23, 2014

Handling Scala Option Elegantly

This post reviews the alternative mechanisms Scala provides for handling errors and illustrates where the Option monad applies.

Overview
The Option monad is a great tool for handling errors in Scala: developers do not have to worry about NullPointerException or about handling a typed exception, as in Java and C++.

Note: For the sake of readability of the implementation of algorithms, all non-essential code, such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers and imports, is omitted.

Let's consider a simple square root computation, which throws an exception if the input value is strictly negative (line 3). The most common "Java-like" approach is to wrap the computation in a try-catch block. In Scala, catching the exception can be implemented through the Try monad (lines 7-10).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def sqrt(x: Double): Double = 
  if(x < 0.0) 
    throw new IllegalArgumentException(s"sqrt: Incorrect argument $x")
  else 
    Math.sqrt(x)
 
Try(sqrt(a)) match {
  case Success(x) => {}
  case Failure(e) => Console.println(e.toString)
}
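
As a minimal, self-contained sketch of the Try-based approach, the snippet below adds the import that the post omits and uses the standard IllegalArgumentException as the exception type. The object name TrySqrt and the helpers safeSqrt and describe are illustrative assumptions, not part of the original code.

```scala
import scala.util.{Try, Success, Failure}

object TrySqrt {
  // Throws for a strictly negative input, as in the snippet above.
  def sqrt(x: Double): Double =
    if (x < 0.0) throw new IllegalArgumentException(s"sqrt: Incorrect argument $x")
    else Math.sqrt(x)

  // Hypothetical helper: captures the exception in a Try instead of propagating it.
  def safeSqrt(x: Double): Try[Double] = Try(sqrt(x))

  // The client code still has to inspect the outcome explicitly.
  def describe(x: Double): String = safeSqrt(x) match {
    case Success(v) => s"sqrt($x) = $v"
    case Failure(e) => s"failed: ${e.getMessage}"
  }
}
```

Calling TrySqrt.safeSqrt(4.0) yields Success(2.0), while TrySqrt.safeSqrt(-1.0) yields a Failure wrapping the exception.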

This type of implementation puts the burden on the client code to handle the exception. The Option monad provides developers with an elegant way to control the computation flow.

Handling Option values
The most common way to handle a Scala Option is to unwrap it. Let's consider the function
 
  y = sin(sqrt(x))
Clearly, there is no need to compute y if x is negative.

def sqrt(x: Double): Option[Double] = {
  if(x < 0.0) None
  else Some(Math.sqrt(x))
}
 
def y(x: Double): Option[Double] = sqrt(x) match {
  case Some(r) => Some(Math.sin(r))
  case None => None
}

The computation of the square root is implemented by the method sqrt while the final computation of sin(sqrt(x)) is defined by the method y.

This implementation is quite cumbersome because the client code has to process an extra Option. An alternative is to provide a default value (e.g., 0.0) if the first computational step fails.

def y(x: Double): Double = Math.sin(sqrt(x).getOrElse(0.0))

A more functional and elegant approach uses the higher-order function map to propagate the value of the Option.

def y(x: Double): Double = 
   sqrt(x).map(Math.sin(_)).getOrElse(0.0)
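
To make the comparison concrete, here is a small sketch that places the pattern-matching and map versions side by side. The object name OptionSin and the method names viaMatch, viaMap and withDefault are assumptions chosen for illustration.

```scala
object OptionSin {
  def sqrt(x: Double): Option[Double] =
    if (x < 0.0) None else Some(Math.sqrt(x))

  // Explicit unwrapping with pattern matching.
  def viaMatch(x: Double): Option[Double] = sqrt(x) match {
    case Some(r) => Some(Math.sin(r))
    case None    => None
  }

  // Same result with map: the function is applied only if a value is present.
  def viaMap(x: Double): Option[Double] = sqrt(x).map(r => Math.sin(r))

  // Falls back to a default value when the square root is undefined.
  def withDefault(x: Double): Double = viaMap(x).getOrElse(0.0)
}
```

Both versions return None for a negative argument; map simply removes the boilerplate of re-wrapping the result.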

What about a sequence of nested options? Let's consider the function y = 1/sqrt(x). There are two types of errors:
  • x < 0.0 for sqrt
  • x == 0.0 for 1/x
A third solution consists of applying a single test, x > 0.0 (in practice, x above a small threshold), that covers both conditions at once.

def y(x: Double): Option[Double] = 
  if(x < 1e-30) None
  else Some(1.0/(Math.sqrt(x)))
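
The same single guard can also be written with the Option combinators instead of an if/else. This is only a sketch of an equivalent formulation; the object name InvSqrt and the constant Eps are hypothetical.

```scala
object InvSqrt {
  val Eps = 1e-30 // same threshold as in the snippet above

  // Drops x if it is below the threshold, then applies the computation to what remains.
  def y(x: Double): Option[Double] =
    Some(x).filterNot(_ < Eps).map(v => 1.0 / Math.sqrt(v))
}
```

With this definition, InvSqrt.y(4.0) returns Some(0.5), and both InvSqrt.y(0.0) and InvSqrt.y(-1.0) return None.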

For-comprehension for options
However, anticipating multiple complex conditions on the argument is not always possible. The for-comprehension is an elegant approach to handling a sequence of options.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def inv(x: Double): Option[Double] = {
  if(Math.abs(x) < 1e-30) None
  else Some(1.0/x)
}

def log(x: Double): Option[Double] = {
  if(x < 1e-30) None
  else Some(Math.log(x))
}
 
def compose(x: Double): Double =
 (for {
    y <- sqrt(x)
    z <- inv(y)
    t <- log(z)
  } yield t).getOrElse(0.0)

The objective is to compose the computation of a square root with the inverse function inv (line 1) and the natural logarithm log (line 6). The for-comprehension construct (lines 12-16) propagates the result of each function to the next in the pipeline by automatically unwrapping each option into its value. In case of error (None), the for-comprehension exits before completion and yields None, so compose returns the default value 0.0.
A for-comprehension is syntactic sugar: the compiler translates it into a chain of nested flatMap calls ending with a final map.
  for {
    a <- f(x)   // flatMap
    b <- g(a)   // flatMap
    c <- h(b)   // map
  } yield c 
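
The equivalence can be checked with a short sketch that writes the same pipeline both ways. The object name Desugar and the method names viaFor and viaFlatMap are illustrative; the explicit form is roughly what the compiler generates, not a literal dump of its output.

```scala
object Desugar {
  def sqrt(x: Double): Option[Double] = if (x < 0.0) None else Some(Math.sqrt(x))
  def inv(x: Double): Option[Double] = if (Math.abs(x) < 1e-30) None else Some(1.0 / x)
  def log(x: Double): Option[Double] = if (x < 1e-30) None else Some(Math.log(x))

  // for-comprehension form
  def viaFor(x: Double): Option[Double] =
    for {
      y <- sqrt(x)
      z <- inv(y)
      t <- log(z)
    } yield t

  // Explicit form: nested flatMap calls ending with a map
  def viaFlatMap(x: Double): Option[Double] =
    sqrt(x).flatMap(y => inv(y).flatMap(z => log(z).map(t => t)))
}
```

Any None produced along the way short-circuits the remaining steps, so both forms return None for x = -1.0 (sqrt fails) and for x = 0.0 (inv fails).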

Monday, June 2, 2014

Akka Actors Blocking on Futures

This is a brief introduction to distributed computing using blocking Scala/Akka futures.

Overview
Akka is an actor-based, message-driven framework for building concurrent and reliable applications. As such, Akka supports futures, for which the requester need not block while waiting for a response. A message or request is sent for execution and the expected response (success or failure/exception) is delivered asynchronously in the future. The code snippets in our two examples omit condition tests and supporting methods for the sake of clarity. The code compiles and executes with Akka 2.2.4 and 2.3.6.

Concurrent processing
One simple and common usage of Akka/Scala futures is to have some computations performed concurrently without going through the pain of creating actors.
Not every computation requires sequential execution, in which the input of one task depends on the output of another task. Let's consider the evaluation of several models or functions against a predefined time series or dataset.
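
As a minimal sketch of this idea, the snippet below runs two independent tasks with the same scala.concurrent.Future and Await used later in this post, without any actor. The object name FutureBasics and the workload sumOfSquares are hypothetical stand-ins for a real computation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FutureBasics {
  // Hypothetical stand-in for an expensive, independent computation.
  def sumOfSquares(n: Int): Double = (1 to n).map(i => i.toDouble * i).sum

  def runBoth(): (Double, Double) = {
    // Both tasks are submitted to the execution context as soon as the futures are created.
    val f1 = Future { sumOfSquares(3) }
    val f2 = Future { sumOfSquares(4) }
    // Blocking step: the caller waits up to 2 seconds for each result.
    val timeout = Duration(2, "seconds")
    (Await.result(f1, timeout), Await.result(f2, timeout))
  }
}
```

Because neither task depends on the other, the order in which the two results are awaited does not affect the outcome.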

The first step is to create a controller, FuturesController, to manage the concurrent tasks (line 3). The controller takes the input time series xt (line 4) and a list of candidate models xs, defined as functions of time of type Function1[Double, Double] (line 5). The time series uses a single variable (dimension 1), so each model is simply a one-variable function (x: Double) => f(x).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
case class Start()
 
final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
 (implicit timeout: Timeout) extends Actor {
   
  override def receive = {
    case s: Start => {
      val futures = createFutures
      val results = futures.map(
         Await.result(_, timeout.duration)
      )

      sender ! results.zipWithIndex.minBy( _._1 )._2
    }
    case _ => logger.error("Message not recognized")
  }
 
  def createFutures: List[Future[Double]]  // implementation shown below
}

The event handler receive (line 8) creates, for the message Start, as many futures as needed, one per model to evaluate (line 10). The controller/actor blocks by invoking Await.result until each future task completes (line 12). Finally, the controller returns the result of the computation to the sender (line 15), in this case the index of the fittest of the models xs. The handler logs a simple error message if a message other than Start is received (line 17).
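
The handler waits on each future in turn. An alternative sketch, which is not the approach used in this post, combines the futures with Future.sequence and blocks only once on the combined result. The object name BestModel and the helper errorFutures, which fakes the output of createFutures with precomputed values, are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BestModel {
  // Hypothetical per-model errors, standing in for the output of createFutures.
  def errorFutures(errors: List[Double]): List[Future[Double]] =
    errors.map(e => Future { e })

  // Blocks once on the combined future, then returns the index of the smallest error.
  def bestIndex(futures: List[Future[Double]], timeout: Duration): Int = {
    val all: Future[List[Double]] = Future.sequence(futures)
    val results = Await.result(all, timeout)
    results.zipWithIndex.minBy(_._1)._2
  }
}
```

With this variant the timeout bounds the whole batch rather than each future individually, which is a design choice to weigh against the per-future wait of the original handler.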

The futures are created by the method createFutures. Its implementation computes the least squared error of each model relative to the input dataset, using a map transformation (line 4) and a fold aggregator (lines 5-8).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def createFutures: List[Future[Double]] = {
  // One future per candidate model
  xs.map(f => Future[Double] { 
    val yt = Range(0, xt.size).map( f(_) )
    val r = xt.zip(yt).foldLeft(0.0)((sum, z) => {
       val diff = z._1 - z._2
       sum + diff*diff
    })

    Math.sqrt(r)
  })
}
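
The error computation inside each future can be isolated and checked on its own. The following is a sketch of that step only, outside any future or actor; the object name ModelError and the function name error are assumptions.

```scala
object ModelError {
  // Square root of the sum of squared differences between the observations xt
  // and the model f evaluated at the indices 0, 1, ..., xt.size - 1.
  def error(xt: Array[Double], f: Double => Double): Double = {
    val yt = xt.indices.map(i => f(i.toDouble))
    val sumSq = xt.zip(yt).foldLeft(0.0) { case (sum, (x, y)) =>
      val diff = x - y
      sum + diff * diff
    }
    Math.sqrt(sumSq)
  }
}
```

For example, ModelError.error(Array(3.0, 5.0), x => x) compares the observations with the values 0.0 and 1.0, giving differences of 3 and 4 and therefore an error of 5.0.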

Evaluation
Let's consider an input data set generated with the following model
  y = m(m-1) + r  // r in [0, 1)
where r is a random value between 0 and 1, representing noise.

val TIME_SERIES_LEN = 200
val xt = Array.tabulate(TIME_SERIES_LEN)(
  m => m*(m - 1.0) + Random.nextDouble
)
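
The driver refers to a list of candidate models named approx, whose contents are not shown in this post. The sketch below uses three hypothetical candidates and a plain sequential evaluation (no futures, no actors) to illustrate which model should win on this dataset. The object name Evaluation and the helpers sse and best are assumptions.

```scala
import scala.util.Random

object Evaluation {
  val TimeSeriesLen = 200

  // Noisy observations of y = m(m - 1), with noise drawn from [0, 1).
  val xt: Array[Double] = Array.tabulate(TimeSeriesLen)(
    m => m * (m - 1.0) + Random.nextDouble()
  )

  // Hypothetical candidates; the post does not list the contents of approx.
  val approx: List[Double => Double] = List(
    (x: Double) => x * x,         // quadratic without the linear term
    (x: Double) => x * (x - 1.0), // same shape as the generating model
    (x: Double) => 2.0 * x        // linear
  )

  // Sum of squared errors of a model over the series.
  def sse(f: Double => Double): Double =
    xt.indices.map { i =>
      val d = xt(i) - f(i.toDouble)
      d * d
    }.sum

  // Index of the candidate with the smallest error.
  def best: Int = approx.map(sse).zipWithIndex.minBy(_._1)._2
}
```

Since the noise stays below 1 while the other two candidates deviate from the data by amounts that grow with m, the second candidate (index 1) has the smallest error.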

The following code snippet starts with the key packages to import for most common applications using Akka actors and futures.
The driver program instantiates the actor system (line 12) and creates the controller actor master using the Akka actor factory actorOf (lines 13, 14). It sends an ask request (line 18) and returns the best model (line 22) if the controller completes its tasks within a predefined timeout (line 10). The code prints the stack trace in case of failure (line 23).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import akka.actor.{Actor, Props, ActorSystem}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import akka.pattern.ask
import scala.util.{Try, Success, Failure}

private val duration = Duration(2000, "millis")
implicit val timeout = new Timeout(duration)
   
val actorSystem = ActorSystem("system")
val master = actorSystem.actorOf(
  Props(new FuturesController(xt, approx)), "Function_eval"
)
 
Try {
  val future = master ? Start()
  Await.result(future, timeout.duration)
} 
match {
  case Success(n) => logger.info(s"Best fit is $n")
  case Failure(e) => e.printStackTrace
}

actorSystem.shutdown

It is critical that the application shuts down the Akka system before it exits (line 26).

