Tuesday, July 22, 2014

Scala Futures with Callbacks

Futures are a critical concept in Scala and parallel computing. This post describes and illustrates futures in Scala and Akka as a non-blocking mechanism in distributed computing.

Overview
Akka supports futures for which the requester never blocks waiting for a response. A message or request is sent for execution, and the expected response (success or failure/exception) is delivered asynchronously back to the client controller.
In a previous post, we looked into Akka futures for which the client is blocked until all the concurrent tasks (futures) have completed. In this post, we look into the non-blocking approach that relies on the onSuccess and onFailure callback handlers. We reuse the same example as with the blocking design. Note that onSuccess and onFailure were deprecated in Scala 2.12 and removed in Scala 2.13, where onComplete covers both cases.
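To make the non-blocking idea concrete, here is a minimal sketch that uses only the Scala standard library. The names CallbackSketch, square and report are hypothetical and introduced for illustration only. The sketch uses onComplete rather than the onSuccess/onFailure pair so that it also compiles on recent Scala versions.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CallbackSketch {
  // Hypothetical task, used for illustration only
  def square(x: Double): Future[Double] = Future { x * x }

  // Registers a callback and returns immediately: the calling thread never waits.
  // onComplete handles both outcomes that onSuccess and onFailure handle separately.
  def report(label: String, f: Future[Double]): Unit =
    f.onComplete {
      case Success(y) => println(s"$label = $y")
      case Failure(e) => println(s"$label failed: ${e.getMessage}")
    }
}
```

A call such as CallbackSketch.report("square(3.0)", CallbackSketch.square(3.0)) returns right away; the message is printed later, by whichever thread completes the future.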

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Futures callback design
The first step is to create a controller, FuturesController, to manage the concurrent tasks. The controller takes the input time series xt and a list of model candidates xs. As the time series uses a single variable, the models are simply defined as one-variable functions
(x: Double) => f(x).

case class Launch()
 
final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
 (implicit timeout: Timeout) extends Actor {
   
    // Message handler, implemented as a partial function. pipeTo (from
    // akka.pattern.pipe) replies to the sender once the future completes
  override def receive = {
    case _: Launch => 
      processFutures(createFutures) pipeTo sender
  }
 
  def createFutures: List[Future[Double]]
  def processFutures(futures: List[Future[Double]]): Future[Double]
}

The event handler receive for the message Launch creates as many futures as there are models to evaluate by calling createFutures. The futures are then passed to the method processFutures, which handles the callback notifications.
Each future is created with its computation (or execution) unit. In this particular case, the task consists of computing the least squared error of each model relative to the input dataset: the square root of the sum of the squared differences between the observations xt and the model values.

def createFutures: List[Future[Double]] = 
  xs.map(f => Future[Double] { 
      // Model values f(0), f(1), ..., one per observation in xt
    val yt = Range(0, xt.size).map( f(_) )
    val r = (xt, yt).zipped.foldLeft(0.0)((sum, z) => {
      val diff = z._1 - z._2
      sum + diff*diff
    })
    Math.sqrt(r)
  })
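The body of each future can be checked on its own, outside any future. The sketch below restates the error measure as a plain function. The names ErrorSketch and error are hypothetical, and the sum is written with zipWithIndex instead of zipped and foldLeft, which is a stylistic substitution rather than the code of the controller.

```scala
object ErrorSketch {
  // Square root of the sum of squared differences between the series xt
  // and the values f(0), f(1), ... produced by the model f
  def error(xt: Array[Double], f: Double => Double): Double =
    math.sqrt(xt.zipWithIndex.map { case (x, t) =>
      val diff = x - f(t.toDouble)
      diff * diff
    }.sum)
}
```

For the series Array(0.0, 1.0, 4.0), the model t => t*t reproduces every observation and scores 0.0, while the model t => t misses only the last observation by 2.0 and scores 2.0.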

The method processFutures handles the notifications (success/failure) from all the futures. A callback returns Unit, so its result cannot be collected with map: the onFailure callback [1] is used only to report the exception, while recover marks the failed task with the value -1.0. The filter [2] removes the marked values, then the lowest least squared error is selected [3].
The method returns a Future[Double] that completes once all the tasks have completed, so the caller never blocks and the controller can pipe the result to the sender. If every task fails, the returned future fails as well.

def processFutures(futures: List[Future[Double]]): Future[Double] = {

  futures.foreach( _ onFailure {  // 1
    case e: Exception => println(e.toString)
  })
  Future.sequence(
    futures.map( _ recover { case e: Exception => -1.0 })
  ).map( _.filter( _ != -1.0) // 2
          .min )              // 3
}
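When the goal is to identify the winning model rather than only its error, the position of each future has to be recorded before the failures are dropped, otherwise the indices shift. The following sketch shows one way to do it with the standard library only. ModelSelection and best are hypothetical names, and failures are represented with Option instead of the -1.0 marker.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ModelSelection {
  // Index and error of the model with the lowest error, or None if every task failed.
  // Each future is paired with its position before failures are dropped,
  // so the index still refers to the original list of models.
  def best(errors: List[Future[Double]]): Future[Option[(Int, Double)]] = {
    val attempts: List[Future[Option[(Int, Double)]]] =
      errors.zipWithIndex.map { case (f, i) =>
        f.map(e => Option((i, e))).recover { case _: Exception => None }
      }
    Future.sequence(attempts).map { results =>
      val ok = results.flatten
      if (ok.isEmpty) None else Some(ok.minBy(_._2))
    }
  }
}
```

The result is itself a future, so the caller can attach a callback to it or, inside an actor, pipe it to the sender.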


Evaluation
Let's consider a very simple application which consists of distributing the following computation:
   s(x) = f(x) + g(x) + h(x)  // ff(0), ff(1), ff(2)
Each computation relies on a different polynomial function of the list ff, with an increasing order of complexity. Each of the three computations is wrapped in a future and executed on the thread pool of the implicit execution context.

val ff = List[Double => Double](
 (x: Double) => 3.0*x,
 (x: Double) => x*x + x,
 (x: Double) => x*x*x + x*x + x
)
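As a sanity check for the concurrent versions, the expected value can be computed sequentially first. This is only a sketch: SequentialCheck and s are hypothetical names, and the list ff is repeated so that the block is self-contained.

```scala
object SequentialCheck {
  // Same three polynomials as ff, repeated so the sketch is self-contained
  val ff = List[Double => Double](
    (x: Double) => 3.0*x,
    (x: Double) => x*x + x,
    (x: Double) => x*x*x + x*x + x
  )

  // Sequential reference value of s(x), with no future involved
  def s(x: Double): Double = ff.map(f => f(x)).sum
}
```

For x = 2.0 the three terms are 6.0, 6.0 and 14.0, so SequentialCheck.s(2.0) is 26.0.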

The compute method creates a list of futures from the list ff of computational functions, then processes the onSuccess and onFailure callbacks.
The first version is a detailed implementation of the creation and processing of futures.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def compute(x: Double): Future[Double] = {
    // Create a future for each of the components of ff
  val futures = ff.map(f => Future[Double] { f(x) } )
 
    // Callbacks run later on another thread and return Unit, so they
    // cannot update a result returned by compute: log the failures only
  futures.foreach( f => {
    f onFailure { case e: Exception => println(e.toString) }
  })
    // Sum the results without blocking; a failed task contributes 0.0
  Future.sequence(futures.map( _ recover { case e: Exception => 0.0 })).map( _.sum)
}
 
val x = 2.0
compute(x) onSuccess { case s: Double => println(s"f($x) = $s") }

The implicit execution context needs to be imported (scala.concurrent.ExecutionContext.Implicits.global). A future is created from each of the three polynomial functions of ff. Then the output of each future is aggregated. This version relies on a two-step process
  • Creation of futures (the value futures)
  • Computation and aggregation (the callbacks and the sum)
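Callbacks run on other threads after the calling method has moved on, so a value built by callbacks has to be published through something the caller can observe later. One possible approach, sketched below with the standard library only, is to complete a Promise from the last callback. The names CallbackSum, sum, total and pending are hypothetical. A single onComplete per future replaces the onSuccess/onFailure pair so that the counter is decremented exactly once, after the value has been added.

```scala
import java.util.concurrent.atomic.{AtomicInteger, DoubleAdder}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CallbackSum {
  // Sums the results of the futures with callbacks only. The returned future
  // completes when the last callback has run; failed tasks are logged and skipped.
  def sum(futures: List[Future[Double]]): Future[Double] = {
    val promise = Promise[Double]()
    val total = new DoubleAdder                    // thread-safe accumulator
    val pending = new AtomicInteger(futures.size)  // callbacks still to run

    if (futures.isEmpty) promise.success(0.0)
    futures.foreach( _.onComplete { result =>
      result match {
        case Success(y) => total.add(y)
        case Failure(e) => println(e.toString)
      }
      // Only the last callback to finish publishes the aggregate
      if (pending.decrementAndGet() == 0) promise.success(total.sum())
    })
    promise.future
  }
}
```

The caller receives a Future[Double] immediately and can register its own callback on it, for instance to print the sum once it is available.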
A more compact version relies on the foldLeft higher-order method to combine the creation and execution of futures.

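def compute(x: Double): Future[Double] = {
    // Fold the futures into a single future holding the running sum;
    // a failed future leaves the sum unchanged
  ff.map(f => Future[Double] { f(x) } )
    .foldLeft(Future.successful(0.0))((s, f) =>
      s.flatMap(acc => f.map(acc + _).recover { case e: Exception => acc })
    )
}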

