Thursday, February 26, 2015

Scala implementation of Function Vectors

This post describes the definition and an implementation of a function vector.

Overview
Spaces of functions are commonly used in machine learning (kernel functions) and differential geometry (tensors and manifolds). A common technique to create a function space is to define the space as a category and assign monadic operations to it.
This post discusses an alternative, less elaborate approach: defining a function space as a container. The key operator on functions is composition, as defined in the Function1[-T, +R] class:
    def andThen[A](g: (R) ⇒ A): (T) ⇒ A
    def compose[A](g: (A) ⇒ T): (A) ⇒ R
Let's use andThen as the basic building block for our function space.


andThen
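Some Scala standard library collections such as List and HashMap expose andThen because they extend PartialFunction: a List[A] is a function from an index to an element. Composing a list with a function therefore yields a function of the index, not a new list; map is the method that transforms every element.

val xsInt = List[Int](4, 0, 9, 56, 11)
val squareAt = xsInt.andThen((n: Int) => n*n)
squareAt(2)
   // returns 81, the square of the element at index 2
xsInt.map((n: Int) => n*n).mkString(",")
   // returns 16,0,81,3136,121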

As mentioned in the introduction, Function1 implements andThen to compose two functions: f andThen g applies f first and then g, which corresponds to g o f, where the composition operator o is defined as
     f o g:  (t: T) => f(g(t))
The compose method implements f o g directly. The following example chains a sine and a square using andThen:

val fg = (Math.sin _) andThen ((x: Double) => x*x)
Console.println(s"fg(5.0): ${fg(5.0)}")
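
The order of application is the part that is easiest to get wrong. The following minimal sketch contrasts andThen and compose; the object CompositionOrder and the helper functions inc and dbl are hypothetical names introduced for illustration only.

```scala
object CompositionOrder {
  val inc: Int => Int = _ + 1   // hypothetical helper
  val dbl: Int => Int = _ * 2   // hypothetical helper

  // inc andThen dbl applies inc first: x => dbl(inc(x))
  val incThenDbl: Int => Int = inc andThen dbl

  // inc compose dbl applies dbl first: x => inc(dbl(x))
  val incAfterDbl: Int => Int = inc compose dbl
}
```

With an input of 3, incThenDbl computes (3 + 1) * 2 = 8 while incAfterDbl computes (3 * 2) + 1 = 7.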

Function vectors space
A function space is defined as a space of function vectors, similar to a Euclidean space in which a variable is defined as a vector or array of real values. A function vector is commonly used to map a vector or tensor from one space to another, possibly non-Euclidean, space by transforming the original coordinates (x,y,z) into coordinates (f(x,y,z), ..) in the target space.

Let's implement the function vector as the class FunctionVector with its two most important methods: the composition, andThen, and the dot product (tensor product), dot.

class FunctionVector[T](fVec: List[T=>T]) {
  def andThen(op: T => T): T => T
  def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T
  def map(op: T => T): List[T=>T]
   ...
}

The dot product is defined as the sum of the element-wise compositions of two function vectors. For example, in a function vector space of dimension 3, with v(f,g,h) and v'(f',g',h'):
    dot: t -> (f' o f)(t) + (g' o g)(t) + (h' o h)(t)
The method andThen composes the elements of this function vector, in order, with a function op and generates a 'cumulative' composed function
    f andThen g andThen h andThen op, that is op o h o g o f
The iteration along the function vector is implemented as a tail recursion. Each step relies on the Function1.andThen method to prepend the current element to the accumulated composition.

def andThen(op: T => T): T => T = {

    // Walks the reversed vector and prepends each
    // element to the accumulated composition
  @scala.annotation.tailrec
  def loop(xsf: List[T=>T], acc: T => T): T => T = xsf match {
    case Nil => acc
    case f :: tail => loop(tail, f andThen acc)
  }

  loop(fVec.reverse, op)
}
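
The same cumulative composition can also be expressed as a right fold. The sketch below is an illustration rather than the original code: the object ComposeAll and the method names composeRec and composeFold are hypothetical, and the functions take the vector as an explicit argument so that the block is self-contained.

```scala
object ComposeAll {
  // Tail-recursive version: walks the reversed list and prepends
  // each function to the accumulated composition.
  def composeRec[T](fVec: List[T => T], op: T => T): T => T = {
    @scala.annotation.tailrec
    def loop(xsf: List[T => T], acc: T => T): T => T = xsf match {
      case Nil       => acc
      case f :: tail => loop(tail, f andThen acc)
    }
    loop(fVec.reverse, op)
  }

  // Equivalent formulation with foldRight:
  // f andThen (g andThen (h andThen op))
  def composeFold[T](fVec: List[T => T], op: T => T): T => T =
    fVec.foldRight(op)((f, acc) => f andThen acc)
}
```

For the vector (x + 1, x * 2, x - 3) and op = x * x, an input of 4 goes through 5, 10 and 7 before being squared, so both versions return 49.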

The dot product takes another function vector of the same size as argument. The two vectors are zipped and mapped to generate a List[T=>T] of composed functions. The resulting dot function then sums the values returned by the composed functions for a given input. The summation requires an instance of Numeric to be declared as an implicit argument.

def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T = {

  val composed = fVec.zip(opVec).map(fg => fg._1.andThen(fg._2))
  (t: T) => composed.map( _(t)).sum
}

Contrary to the andThen method, map converts a function vector into another function vector by composing each of its elements with op.

def map(op: T => T): List[T=>T] = fVec.map( _ andThen op)

Finally, the definition of the dot product can be extended to any aggregation method aggr:

def dot(
  opVec: List[T=>T],
  aggr: (T=>T, T=>T) => (T=>T)): T => T = {

  val composed = fVec.zip(opVec).map(
    fg => fg._1 andThen fg._2
  )

    // Combine the composed functions pairwise;
    // requires non-empty vectors
  composed.reduceLeft(aggr)
}

The aggregation method aggr, passed as the second argument of dot, is a simplified version of the aggregation methods defined in the Scala standard library. It combines the composed functions into the single function returned by dot.
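
To make the role of the aggregation concrete, here is a minimal sketch with two aggregators. It is written as a standalone function over Double for simplicity; the object GeneralDot, the alias Fn and the aggregators sum and product are hypothetical names, and the use of reduceLeft assumes that both vectors are non-empty.

```scala
object GeneralDot {
  type Fn = Double => Double

  // Hypothetical standalone version of the generalized dot product.
  // Assumes both vectors are non-empty and have the same size.
  def dot(fVec: List[Fn], gVec: List[Fn])(aggr: (Fn, Fn) => Fn): Fn = {
    val composed = fVec.zip(gVec).map { case (f, g) => f andThen g }
    composed.reduceLeft(aggr)
  }

  // Point-wise sum: reproduces the Numeric-based dot product
  val sum: (Fn, Fn) => Fn = (h, f) => (t: Double) => h(t) + f(t)

  // Point-wise product: a different aggregation over the same vectors
  val product: (Fn, Fn) => Fn = (h, f) => (t: Double) => h(t) * f(t)
}
```

With fVec = (x + 1, 2x) and gVec = (x * x, x - 1), the composed functions evaluate to 16 and 5 at x = 3, so the sum aggregator returns 21 and the product aggregator returns 80.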
Let's apply our newfound knowledge about FunctionVector:

val functionsList = List[Function1[Double,Double]](
   Math.sin, Math.sqrt
)

val vec = new FunctionVector[Double](functionsList)
val output1 = vec.andThen((x: Double) => x*x)
val opVec = List[Double=>Double](
 (x: Double) => x + 1.0,
 (x: Double) => Math.exp(-x)
)
val output2 = vec.dot(opVec)

To evaluate the FunctionVector class, we use a list of functions of type Double => Double: a sine and a square root.
Once the function vector vec is created, its elements are composed with the function x => x*x through andThen (output1), then combined with a second list of functions, opVec, through dot (output2).
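
Putting the pieces together, here is a minimal self-contained sketch of the class and of the evaluation above. It is not the original implementation: andThen is written with foldRight instead of the explicit tail recursion, and the object FunctionVectorDemo with its values squared and dotFn are hypothetical names added for illustration.

```scala
class FunctionVector[T](fVec: List[T => T]) {

  // Composes all elements in order, then op: op(h(g(f(t))))
  def andThen(op: T => T): T => T =
    fVec.foldRight(op)((f, acc) => f andThen acc)

  // Sum of the element-wise compositions f_i andThen g_i
  def dot(opVec: List[T => T])(implicit num: Numeric[T]): T => T = {
    val composed = fVec.zip(opVec).map { case (f, g) => f andThen g }
    (t: T) => composed.map(_(t)).sum
  }

  // Composes each element with op, yielding a new list of functions
  def map(op: T => T): List[T => T] = fVec.map(_ andThen op)
}

object FunctionVectorDemo {
  val vec = new FunctionVector[Double](
    List[Double => Double](Math.sin, Math.sqrt)
  )

  // t => sqrt(sin(t)) squared
  val squared: Double => Double = vec.andThen((x: Double) => x * x)

  // t => (sin(t) + 1) + exp(-sqrt(t))
  val dotFn: Double => Double = vec.dot(List[Double => Double](
    (x: Double) => x + 1.0,
    (x: Double) => Math.exp(-x)
  ))
}
```

At t = 0 the dot function evaluates to (sin(0) + 1) + exp(-sqrt(0)) = 2, and at t = Pi/2 the cumulative composition evaluates to approximately 1.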



Thursday, February 5, 2015

Back-pressure Strategy using Akka Mailboxes

Overview

Akka is a reliable and effective library for distributing tasks over multiple cores and multiple hosts. Fault-tolerance is implemented through a hierarchy of supervising actors, not that different from the Zookeeper framework.
But what about controlling the message flows between actors or clusters of actors? How can we avoid messages backing up in mailboxes for slow, unavailable actors or lengthy computational tasks?
Typesafe has introduced Akka reactive streams and Flow materialization to control TCP back-pressure and manage data flows between actors and clusters of actors. TCP back-pressure is a subject for a future post. In the meantime, let's design a poor man's back-pressure handler using bounded mailboxes.


Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Simple workflow with back pressure control
Let's look at a simple computational workflow with the following components:
  • Workers: These actors process and transform data sets. They start a computation task upon receiving a Compute message that contains the next data set to process. Each worker actor returns the result of the computation (the transformed data set) to the controller in a Completed message.
  • Controller: This actor is responsible for loading batches of data upon receiving a Load message and forwarding them to the workers in Compute messages. The controller requests the status of the bounded mailbox of each worker by sending a GetStatus message to the watcher.
  • Watcher: This actor monitors the state of the workers' mailboxes and returns the current load (the percentage of the mailbox capacity currently used) to the controller in a Status message.
The following diagram describes the minimum set of messages required to execute the workflow.


Workers
The worker actor is responsible for processing a slice of data using a function forwarded by the controller.

final class Worker(id: Int) extends Actor {

  override def receive = {
    // Sent by master/controller to initiate
    // the computation with a msg.fct invocation
    case msg: Compute => {
      val output = msg.fct(msg.xt)
      sender ! Completed(id, output, msg.id+1)
    }
      // last request; Cleanup is a case class, so
      // match on its type, not on its companion object
    case _: Cleanup => sender ! Done
  }
}


The worker receives the input to the computation (the slice of time series msg.xt) and the data transformation msg.fct through the Compute message sent by the controller. Note that the function fct cannot be defined as a closure because the context of the function is unknown to the worker.
The actor returns the result of the computation, output, through the Completed message. Finally, all the workers notify the controller that their tasks are completed by responding to the Cleanup message.

type DblSeries = List[Array[Double]]

sealed abstract class Message(val id: Int)

  // Sent by worker back to controller
case class Completed(
  i: Int,
  xt: DblSeries,
  nIter: Int
  ) extends Message(i)

  // Sent by controller to workers
case class Compute(
  i: Int,
  xt: DblSeries,
  fct: DblSeries => DblSeries
  )  extends Message(i)

case class Cleanup() extends Message(-1)
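
The listing above does not show the Load, GetStatus and Status messages used later by the controller and the watcher. The following sketch gives one possible shape for them. Everything in it is an assumption: the object Protocol and the trait BatchStream are hypothetical, and BatchStream only mirrors the members the controller is seen using (hasNext, next and fct).

```scala
object Protocol {
  type DblSeries = List[Array[Double]]

  // Hypothetical source of batches; the members mirror the calls
  // made by the controller's load method.
  trait BatchStream {
    def hasNext: Boolean
    def next(batchSize: Int): DblSeries
    def fct: DblSeries => DblSeries
  }

  // Assumed shapes for the messages not listed in the post
  case class Load(strm: BatchStream)  // sent to the controller
  case object GetStatus               // controller to watcher
  case class Status(load: Double)     // watcher to controller
}
```

GetStatus carries no data, so a case object is enough; Status wraps the average load computed by the watcher.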


Controller
The controller actor loads the data, distributes it across the workers and adjusts the data flow according to the load reported by the watcher. It takes three parameters.

  • numWorkers: Number of worker actors to create
  • watermark: Defines the utilization of the worker mailboxes that triggers a throttle action. If the utilization of the mailboxes is below the watermark, the controller throttles up (increases the pressure on the actors); if the utilization exceeds 1.0 - watermark, the controller decreases the pressure on the actors; otherwise the controller does not interfere with the data flow.
  • mailboxCapacity: Capacity of the mailboxes for the worker actors (maximum number of messages). It is assumed that all mailboxes have the same capacity.

class Controller(
   numWorkers: Int,
   watermark: Double,
   mailboxCapacity: Int
   )  extends Actor {

  var id: Int = 0
    // Initial batch size: half the mailbox capacity
  var batchSize = mailboxCapacity>>1

    // Create a set of worker actors
  val workers = List.tabulate(numWorkers)(n =>
    context.actorOf(Props(new Worker(n)), name = s"worker$n")
   )

    // One bounded message queue per worker
  val pushTimeout = new FiniteDuration(10, MILLISECONDS)
  val msgQueues = workers.map(w =>
     (new BoundedMailbox(mailboxCapacity, pushTimeout))
       .create(Some(w), Some(context.system))
   )

  val watcher = context.actorOf(Props(new Watcher(msgQueues)))
   ...
}


The set of workers is created using the higher-order method tabulate. A bounded message queue (mailbox) is created for each worker in order to cap the number of pending messages. Note that in a deployed system Akka usually attaches a mailbox to an actor through its configuration or Props.withMailbox; the queues are created explicitly here so that they can be handed to the watcher. Finally, a watchdog actor of type Watcher is created through the Akka context to monitor the mailboxes of the workers. The watcher actor is described in the Watcher section below.
Let's look at the Controller message loop.

override def receive = {
    // Loads chunk of stream or input data set
  case input: Load => load(input.strm)

    // processes results from workers
  case msg: Completed =>
    if(msg.id == -1) kill else process(msg)

    // Status on mailbox utilization sent by the watchdog
  case status: Status => throttle(status.load)
}


The controller performs 3 distinct functions:
  • load: Load a variable number of data points and partition them for each worker
  • process: Aggregate the results of computation for all the workers
  • throttle: Increase or decrease the number of data points loaded at each iteration.
Let's look at these methods.

  // Load, partition input stream then
  // distribute the partitions across the workers
def load(strm: InputStream): Unit =

  while( strm.hasNext ) {
    val nextBatch = strm.next(batchSize)
    partition(nextBatch)
       .zip(workers)
       .foreach(w => w._2 ! Compute(id, w._1, strm.fct) )
    id += 1
  }

  // Process, aggregate results from all the workers
def process(msg: Completed): Unit = {
   ..  // aggregation
  watcher ! GetStatus
}

  // Simple throttle function that increases or decreases the
  // size of the next batch of data to be processed by
  // workers according to the current load on mailboxes
def throttle(load: Double): Unit = {
    // Low utilization: increase the batch size
  if(load < watermark)
    batchSize += (batchSize*(watermark-load)).floor.toInt
    // High utilization: decrease the batch size
  else if(load > 1.0 - watermark)
    batchSize -= (batchSize*(load-1.0 + watermark)).floor.toInt

    // Never exceed half the mailbox capacity
  if( batchSize > (mailboxCapacity>>1) )
    batchSize = (mailboxCapacity>>1)
}


  • load extracts the next batch of data, partitions it, then sends each partition to a worker actor along with the data transformation fct.
  • process aggregates the results (Completed) of the transformation on each worker. The controller then requests the status of the mailboxes from the watcher.
  • throttle recomputes the size of the next batch, batchSize, using the load information provided by the watcher relative to the watermark.
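
The throttle arithmetic is easier to check when it is isolated from the actor. The following sketch restates it as a pure function; the object Throttle and the function nextBatchSize are hypothetical names, and the sketch assumes that load and watermark are ratios between 0 and 1, with watermark below 0.5 so that the two thresholds do not overlap.

```scala
object Throttle {
  // Returns the next batch size given the current mailbox utilization.
  // load and watermark are ratios in [0, 1]; capacity is the mailbox capacity.
  def nextBatchSize(
    batchSize: Int,
    load: Double,
    watermark: Double,
    capacity: Int): Int = {

    val adjusted =
      if (load < watermark)
        // Mailboxes are mostly empty: push more data
        batchSize + (batchSize * (watermark - load)).floor.toInt
      else if (load > 1.0 - watermark)
        // Mailboxes are close to full: push less data
        batchSize - (batchSize * (load - 1.0 + watermark)).floor.toInt
      else
        batchSize

    // Never exceed half the mailbox capacity
    math.min(adjusted, capacity >> 1)
  }
}
```

With a watermark of 0.25, a capacity of 512 and a batch of 64 data points, a load of 0.125 raises the batch size to 72, a load of 0.875 lowers it to 56, and a load of 0.5 leaves it unchanged.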

Watcher
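The watcher has a simple task: compute the average load of the mailboxes of all workers. The computation is done upon reception of the GetStatus message from the controller. Note that load, as written, returns an average number of messages; it has to be divided by the mailbox capacity to obtain the utilization ratio that the controller compares against the watermark.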

class Watcher(
   queue: Iterable[MessageQueue]
  ) extends Actor {

    // Average number of pending messages per mailbox
  def load = queue.map( _.numberOfMessages)
                  .sum.toDouble/queue.size

  override def receive = {
    case GetStatus => sender ! Status(load)
  }
}
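
The normalization by capacity can be sketched outside the actor as follows. The object MailboxLoad and the function averageUtilization are hypothetical names; the sketch assumes that all mailboxes share the same capacity and takes the queue sizes as plain integers instead of Akka MessageQueue instances.

```scala
object MailboxLoad {
  // Average utilization (0.0 to 1.0) of mailboxes sharing one capacity.
  // Returns 0.0 when there is no mailbox to monitor.
  def averageUtilization(queueSizes: Iterable[Int], capacity: Int): Double =
    if (queueSizes.isEmpty) 0.0
    else queueSizes.sum.toDouble / (queueSizes.size * capacity)
}
```

Three mailboxes of capacity 8 holding 2, 4 and 6 messages give 12 / 24 = 0.5, a value that can be compared directly with the watermark.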

Memory consumption profile
The bottom graph displays the action of the controller (throttling). The Y-axis displays the intensity of the throttle, from -6 for a significant decrease in load (size of batches) to +6 for a significant increase in load/pressure on the workers. A throttle index of 0 means that no action is taken by the controller.
The top graph displays the average size of the workers' mailboxes in response to the actions taken by the controller.

Important notes
This implementation of a feedback control loop is rather crude and is mainly described as an introduction to the concept of back-pressure control. A production-quality implementation would rely on:
  • TCP-back pressure using reactive streams
  • A more sophisticated throttle algorithm to avoid significant adjustment or resonance
  • Handling dead letters in case the throttle algorithm fails and the mailbox overflows
