Wednesday, March 4, 2015

F-bounded type polymorphism

Overview
F-bounded type polymorphism (F-bounded quantification) is a form of bounded parametric type polymorphism in which the bound on a type parameter refers to the parameter itself, thereby constraining subtypes to themselves.
Let's consider the following "classification" problem:

How can we guarantee that the SharpPencils bucket contains only sharp pencils, not small pencils or erasers?

Type Polymorphism
The first attempt to solve the problem relies on parametric type polymorphism. To this end, we create a trait Pencils that is subclassed by a bucket for sharp pencils, SharpPencils, and a bucket for small pencils, SmallPencils.
For the sake of simplicity, we assume that Pencils defines only two methods, to add and pop pencils.

import scala.collection.mutable

trait Pencils[T] {
  private lazy val content = new mutable.ListBuffer[T]

  def add(t: T): Unit = content.append(t)
  def pop: T = content.remove(0)   // removes and returns the first pencil
}
 
class SharpPencils extends Pencils[SharpPencils]
class SmallPencils extends Pencils[SmallPencils]

This implementation does not guarantee that SharpPencils is the only bucket that contains the sharp pencils. Another bucket can be created with sharp pencils, too.

class OtherPencils extends Pencils[SharpPencils]

The solution is to specify constraints (or bounds) on the type of elements contained in each bucket.

Bounded Polymorphism
The goal is to make sure that a bucket of a specific type contains only a specific type of pencils. The first step is to make sure that a bucket does not contain items other than Pencils.

trait Pencils[T <: Pencils[T]] {
  private lazy val content = new mutable.ListBuffer[T]
 
  def add(t: T): Unit = content.append(t)
  def pop: T = content.remove(0)   // removes and returns the first pencil
}

This implementation addresses part of the limitation raised in the previous paragraph: a bucket can only be parameterized with a type of Pencils, so items such as an Eraser are rejected. However, nothing prevents SharpPencils from containing small pencils and SmallPencils from containing sharp pencils, as illustrated in the next code snippet.

// Won't compile: Eraser is not a subtype of Pencils[Eraser]
class SharpPencils extends Pencils[Eraser]

// The following code compiles!
class SharpPencils extends Pencils[SmallPencils]
class SmallPencils extends Pencils[SharpPencils]

Self-referential Polymorphism
As a matter of fact, the most effective constraint on an inherited type is a self-type annotation, which declares the type that this must conform to for the methods of the trait to execute.

trait Pencils[T <: Pencils[T]] {
  self: T =>
    private lazy val content = new mutable.ListBuffer[T]
    def add(t: T): Unit = content.append(t)
    def pop: T = content.remove(0)   // removes and returns the first pencil
}

// The following code fails to compile!
class SharpPencils extends Pencils[SmallPencils]

The declaration of SharpPencils as containing small pencils fails to compile because it violates the self-type restriction: SharpPencils does not conform to the self type SmallPencils required by Pencils[SmallPencils].
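A self-contained sketch of the self-typed version follows, assuming Scala 2.12 or later. The size and addSelf members are hypothetical additions, not part of the original trait: addSelf only shows that, thanks to the self type, the bucket itself can be used wherever a T is expected.

```scala
import scala.collection.mutable

// F-bounded trait with a self type: a concrete bucket must itself be the T it stores
trait Pencils[T <: Pencils[T]] { self: T =>
  private val content = mutable.ListBuffer.empty[T]

  def add(t: T): Unit = content.append(t)

  // Removes and returns the first element (throws if the bucket is empty)
  def pop: T = content.remove(0)

  // Hypothetical helper: number of elements in the bucket
  def size: Int = content.size

  // Hypothetical helper: the self type lets `self` be passed where a T is expected
  def addSelf(): Unit = add(self)
}

class SharpPencils extends Pencils[SharpPencils]
class SmallPencils extends Pencils[SmallPencils]

// Does not compile: WrongPencils does not conform to the self type SmallPencils
// class WrongPencils extends Pencils[SmallPencils]

val bucket = new SharpPencils
bucket.add(new SharpPencils)
bucket.addSelf()
```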

Reference
Getting F-Bounded Polymorphism into Shape - B. Greenman, F. Muehlboeck, R. Tate - Cornell University

Thursday, February 26, 2015

Function vectors using andThen

Overview

Function spaces are commonly used in machine learning (kernel functions) and differential geometry (tensors and manifolds). The most common technique to create a function space is to define the space as a category and assign monadic operations to it.
This post discusses an alternative, less elaborate approach that defines a function space as a container. The key operator on functions is composition, as defined in the Function1[-T, +R] class:
def andThen[A](g: (R) ⇒ A): (T) ⇒ A
def compose[A](g: (A) ⇒ T): (A) ⇒ R

Let's use andThen as the basic building block of our function space.


andThen
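andThen is also a member method of some Scala standard library collections, such as List and HashMap, because these collections are partial functions: a List maps an index to its element and a HashMap maps a key to its value. Composing a List with andThen therefore creates a function of the index.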

val xsInt = List[Int](4, 0, 9, 56, 11)
val square = xsInt.andThen((n: Int) => n*n)   // index => squared element
println((0 until xsInt.size).map(square).mkString(","))
   // print 16,0,81,3136,121

As mentioned in the introduction, Function1 implements andThen to compose two functions: f andThen g applies f first, then g. With the composition operator o defined as
g o f: (t: T) => g(f(t))
f andThen g is equivalent to g o f, whereas f compose g is equivalent to f o g. The following snippet composes Math.sin with a square function using andThen:

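  // Assign Math.sin to a function value so that andThen can be called on it
val sin: Double => Double = Math.sin
val fg = sin andThen ((x: Double) => x*x)
Console.println(s"fg(5.0): ${fg(5.0)}")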
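Since the order of evaluation is easy to get wrong, the following sketch contrasts andThen and compose on two arbitrary functions f and g chosen for illustration:

```scala
// Two simple functions on Double
val f: Double => Double = x => x + 1.0
val g: Double => Double = x => 2.0 * x

// f andThen g applies f first: x => g(f(x))
val fThenG = f andThen g
// f compose g applies g first: x => f(g(x))
val fAfterG = f compose g

println(fThenG(3.0))   // g(f(3.0)) = 2.0 * 4.0 = 8.0
println(fAfterG(3.0))  // f(g(3.0)) = 6.0 + 1.0 = 7.0
```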

class FunctionVector[T](fVec: List[T=>T]) {
  def andThen(op: T => T): T=>T
  def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T
  def map(op: T => T): List[T=>T]
   ...
}

The dot product of two function vectors is defined as the sum of the compositions of their respective elements. For example, in a function vector space of dimension 3, for v(f,g,h) and v'(f',g',h'):
dot: x -> f'(f(x)) + g'(g(x)) + h'(h(x))
The method andThen composes this function vector with a function op and generates a 'cumulative' composed function that applies f, then g, then h and finally op: x -> op(h(g(f(x)))).
The iteration along the function vector is implemented as a tail recursion that relies on the andThen method of each Function1 element.

def andThen(op: T => T): T => T = {

    // Prepends each function of the (reversed) vector to the accumulated composition
  @scala.annotation.tailrec
  def chain(xsf: List[T=>T])(acc: T => T): T => T =
    if(xsf.isEmpty) acc
    else chain(xsf.tail)(xsf.head andThen acc)

  chain(fVec.reverse)(op)
}

The 'dot' product takes another function vector of the same size as its argument. The two vectors are zipped, and each pair of functions is composed with andThen through map, which produces a List[T=>T] of composed functions. Finally, the resulting dot function is generated as a new function that sums the values of the composed functions for a given argument. The summation requires an instance of Numeric to be declared as an implicit argument.

def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T = {
  val composed = fVec.zip(opVec).map( fg => fg._1 andThen fg._2)
  (t: T) => composed.map( _(t)).sum
}

Unlike the andThen method, map converts a function vector into another function vector by composing each of its elements with op.

def map(op: T => T): List[T=>T] = fVec.map( _ andThen op)

Finally, the definition of the dot product can be extended to any aggregation method aggr

def dot(opVec: List[T=>T], aggr: (T=>T, T=>T) => (T=>T)): T => T = {
  val composed = fVec.zip(opVec).map( fg => fg._1 andThen fg._2)
    // Combine the composed functions pairwise (assumes non-empty vectors)
  composed.reduce(aggr)
}

The class FunctionVector is now ready for testing.

val functionsList = List[Function1[Double,Double]](
   Math.sin, Math.sqrt
)
   
val vec = new FunctionVector[Double](functionsList)
val output1 = vec.andThen((x: Double) => x*x)
val opVec = List[Double=>Double](
 (x: Double)=> x+ 1.0, 
 (x: Double)=> Math.exp(-x)
)
val output2 = vec.dot(opVec)
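Putting the pieces together, a minimal self-contained sketch of FunctionVector could look like the following. Two implementation choices are ours rather than the post's: andThen uses foldRight instead of an explicit tail recursion (for [f, g, h] both build f andThen g andThen h andThen op), and the generic dot uses reduce, so the aggregation does not start from an extra identity term. The sample functions and the product aggregation are arbitrary.

```scala
// Sketch of a vector of functions T => T
class FunctionVector[T](fVec: List[T => T]) {

  // Applies every function of the vector in order, then op
  def andThen(op: T => T): T => T =
    fVec.foldRight(op)((f, acc) => f andThen acc)

  // x => sum over i of opVec(i)(fVec(i)(x)); the sum requires a Numeric[T]
  def dot(opVec: List[T => T])(implicit num: Numeric[T]): T => T = {
    val composed = fVec.zip(opVec).map { case (f, g) => f andThen g }
    (t: T) => composed.map(_(t)).sum
  }

  // Generic aggregation of the composed functions (assumes non-empty vectors)
  def dot(opVec: List[T => T], aggr: (T => T, T => T) => (T => T)): T => T = {
    val composed = fVec.zip(opVec).map { case (f, g) => f andThen g }
    composed.reduce(aggr)
  }

  // Composes each element of the vector with op
  def map(op: T => T): List[T => T] = fVec.map(_ andThen op)
}

// Usage with arbitrary functions
val vec = new FunctionVector[Double](List[Double => Double](x => x + 1.0, x => 2.0 * x))
val opVec = List[Double => Double](x => x * x, x => x + 1.0)

val chained = vec.andThen(x => x * x)   // x => (2.0 * (x + 1.0))^2
val summed = vec.dot(opVec)             // x => (x + 1.0)^2 + (2.0 * x + 1.0)

// Aggregates the composed functions by pointwise multiplication
val product: (Double => Double, Double => Double) => (Double => Double) =
  (h, f) => x => h(x) * f(x)
val multiplied = vec.dot(opVec, product)
```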


References
Scala By Example - M. Odersky - June 2014

Thursday, February 5, 2015

Akka mailbox back pressure

Overview

Akka is a very reliable and effective library to deploy tasks over multiple cores and over multiple hosts. Fault-tolerance is implemented through a hierarchy of supervising actors, not that different from the Zookeeper framework.
But what about controlling the message flows between actors or clusters of actors? How can we avoid messages backing up in mailboxes for slow, unavailable actors or lengthy computational tasks?
Typesafe has introduced Akka reactive streams and flow materialization to control TCP back pressure and manage data flows between actors and clusters of actors. TCP back pressure is a subject for a future post. In the meantime, let's design a poor man's back-pressure handler using bounded mailboxes.


Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Simple workflow with back pressure control
Let's look at a simple computational workflow with the following components:
  • Workers: These actors process and transform data sets. They start a computation task upon receiving a Compute message that contains the next data set to process. Each worker actor returns the results of the computation (transformed data set) back to the controller using the Completed message.
  • Controller: The controller is responsible for loading batches of data through a Load message and forwarding them to the workers in a Compute message. It requests the status of the bounded mailbox of each worker by sending a GetStatus message to the watcher.
  • Watcher: The watcher monitors the state of the workers' mailboxes and returns the current load (as the percentage of the mailbox currently used) to the controller with a Status message.
The following diagram describes the minimum set of messages required to execute the workflow.


Workers
The worker actor is responsible for processing a slice of data using a function forwarded by the controller.


final class Worker(id: Int) extends Actor {
  override def receive = {
      // Sent by master/controller to initiate msg.fct computation
    case msg: Compute => {
      val output = msg.fct(msg.xt)
      sender ! Completed(id, output, msg.id+1)
    }
    case Cleanup => sender ! Done
  }
}


The worker receives the input to the computation (slice of time series msg.xt) and the data transformation msg.fct through the Compute message sent by the controller. Note that the function fct cannot be defined as a closure, as the context of the function is unknown to the worker.
The actor returns the output of the computation through the Completed message. Finally, each worker notifies the controller that its task is completed by responding to the Cleanup message.


type DblSeries = List[Array[Double]]
 
sealed abstract class Message(val id: Int)

  // Sent by worker back to controller
case class Completed(i: Int, xt: DblSeries, nIter: Int) 
   extends Message(i)

  // Sent by controller to workers
case class Compute(i: Int, xt: DblSeries, fct: DblSeries => DblSeries) 
   extends Message(i)

  // Sent by controller to workers; a case object so that "case Cleanup =>" matches it
case object Cleanup extends Message(-1)


Controller
The controller is responsible for loading the data, distributing it across the workers and throttling the data flow according to the load of their mailboxes. It takes three parameters:

  • numWorkers: Number of worker actors to create
  • watermark: Utilization of the worker mailbox that triggers a throttle action. If the utilization of the mailbox is below the watermark, the controller throttles up (increases the pressure on) the actor; if the utilization exceeds 1.0 - watermark, the controller decreases the pressure on the actor; otherwise the controller does not interfere with the data flow.
  • mailboxCapacity: Capacity of the mailboxes of the worker actors (maximum number of messages). It is assumed that all mailboxes have the same capacity.


class Controller(numWorkers: Int, watermark: Double, mailboxCapacity: Int)
    extends Actor {

  var id: Int = 0
  var batchSize = mailboxCapacity>>1

    // Create a set of worker actors
  val workers = List.tabulate(numWorkers)(n =>
    context.actorOf(Props(new Worker(n)), name = s"worker$n"))

  val pushTimeout = new FiniteDuration(10, MILLISECONDS)
  val msgQueues = workers.map(w =>
     (new BoundedMailbox(mailboxCapacity, pushTimeout)).create(Some(w), Some(context.system))
   )
 
  val watcher = context.actorOf(Props(new Watcher(msgQueues)))
   ...
}


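The set of workers is created using the tabulate higher-order method. A message queue (mailbox) is created for each actor; the mailboxes are bounded in order to avoid buffer overflow. Note that a queue created directly through BoundedMailbox.create is a standalone MessageQueue: to be the actual mailbox of a worker, the mailbox type would normally be configured through the worker's Props or dispatcher configuration. Finally, a watchdog actor of type Watcher is created through the Akka context to monitor the mailboxes of the workers. The watcher actor is described in the next subsection.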
Let's look at the Controller message loop.


override def receive = {
    // Loads chunk of stream or input data set
  case input: Load => load(input.strm)

    // processes results from workers
  case msg: Completed => if(msg.id == -1) kill else process(msg)

    // Status on mail boxes utilization sent by the watchdog
  case status: Status => throttle(status.load)
}


The controller performs 3 distinct functions:
  • load: Load a variable number of data points and partition them for each worker
  • process: Aggregate the results of computation for all the workers
  • throttle: Increase or decrease the number of data points loaded at each iteration.
Let's look at these methods.


  // Load, partition input stream then 
  // distribute the partitions across the workers
def load(strm: InputStream): Unit =
  while( strm.hasNext ) {
     val nextBatch = strm.next(batchSize)
     partition(nextBatch).zip(workers)
            .foreach(w => w._2 ! Compute(id, w._1, strm.fct) )
     id += 1
  }
 
  // Process, aggregate results from all the workers
def process(msg: Completed): Unit = {
   ..  // aggregation
  watcher ! GetStatus
}
  
  // Simple throttle function that increases or decreases the 
  // size of the next batch of data to be processed by 
  // workers according to the current load on mail boxes
def throttle(load: Double): Unit = {
  if(load < watermark) 
    batchSize += (batchSize*(watermark - load)).floor.toInt
  else if(load > 1.0 - watermark) 
    batchSize -= (batchSize*(load-1.0 + watermark)).floor.toInt
   
  if( batchSize > (mailboxCapacity>>1) )
    batchSize = (mailboxCapacity>>1)
}


  • load extracts the next batch of data, partitions it, then sends each partition to a worker actor along with the data transformation fct.
  • process aggregates the results (Completed) of the transformation on each worker, then requests the status of the mailboxes from the watcher.
  • throttle recomputes the size of the next batch, batchSize, using the load information provided by the watcher relative to the watermark.
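Because the throttle rule is the core of the feedback loop, it may be worth isolating it as a pure function that can be unit tested without an actor system. The sketch below uses a hypothetical function nextBatchSize that takes the controller's state as arguments; when the load is below the watermark, the increment is computed from watermark - load so that the batch grows, as described for the watermark parameter.

```scala
// Hypothetical pure version of the controller's throttle rule:
// returns the size of the next batch given the current mailbox load in [0, 1]
def nextBatchSize(batchSize: Int, load: Double, watermark: Double, mailboxCapacity: Int): Int = {
  val adjusted =
    if (load < watermark)
      // Light load: grow the batch in proportion to the gap below the watermark
      batchSize + (batchSize * (watermark - load)).floor.toInt
    else if (load > 1.0 - watermark)
      // Heavy load: shrink the batch in proportion to the excess above 1 - watermark
      batchSize - (batchSize * (load - 1.0 + watermark)).floor.toInt
    else
      batchSize
  // Never exceed half of the mailbox capacity
  math.min(adjusted, mailboxCapacity >> 1)
}
```

With a watermark of 0.25 and a capacity of 1000, an idle mailbox grows a batch of 100 to 125, a full mailbox shrinks it to 75, and a half-full mailbox leaves it unchanged.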

Watcher
The watcher has a simple task: compute the average load of the mailboxes of all workers. The computation is done upon receipt of the GetStatus message from the controller.


class Watcher(queue: Iterable[MessageQueue]) extends Actor {
  def load = queue.map( _.numberOfMessages)
                  .sum.toDouble/queue.size
  override def receive = { 
    case GetStatus => sender ! Status(load) 
  }
}
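The watermark is a fraction of the mailbox capacity, so the load compared against it presumably needs to be normalized by that capacity as well. A minimal sketch of such a normalization, using a hypothetical helper utilization that receives the queue sizes as plain integers (in the Watcher they would come from numberOfMessages):

```scala
// Hypothetical helper: average mailbox utilization across workers, in [0, 1]
def utilization(queueSizes: Iterable[Int], mailboxCapacity: Int): Double =
  if (queueSizes.isEmpty) 0.0
  else queueSizes.map(_.toDouble / mailboxCapacity).sum / queueSizes.size
```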

Memory consumption profile
The bottom graph displays the action of the controller (throttling). The Y-axis displays the intensity of the throttle, from -6 for a significant decrease in load (size of batches) to +6 for a significant increase in load (pressure on the workers). A throttle index of 0 means that the controller takes no action.
The top graph displays the average size of the workers' mailboxes in response to the actions taken by the controller.

Important notes
This implementation of a feedback control loop is rather crude and is mainly described as an introduction to the concept of back-pressure control. A production-quality implementation would rely on:
  • TCP back pressure using reactive streams
  • A more sophisticated throttle algorithm to avoid large adjustments or resonance
  • Handling dead letters in case the throttle algorithm fails and the mailbox overflows

Thursday, January 8, 2015

Implicit classes to extend libraries

Overview
Implicit methods are quite useful for defining global type conversions (as long as their semantics are clearly understood). But what about implicit classes?
Implicit classes can be used to extend existing Scala standard library classes. Many Scala classes are declared final or implement a sealed trait, so they cannot be extended through inheritance. Composition is a viable alternative to inheritance in terms of reusability: the class to extend is wrapped into a helper or utility class. However, a helper/container class adds an indirection and "pollutes" the namespace.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.
Let's look at an example of extension of standard library.

Example
The use case consists of extending the Try class, scala.util.Try, with an Either semantic: execute one function if the code throws an exception, and another function if the computation succeeds. Such a simple design allows computation flows to be routed depending on an unexpected state.

The main construct is the declaration of an implicit class that wraps Try. A recovery function rec is called if an exception is thrown; a computation f is performed otherwise.


import scala.util._
 
implicit class Try2Either[T](_try: Try[T]) {
    
  def toEither[U](rec: ()=>U)(f: T=>T): Either[U,T] = _try match {
    case Success(res) => Right(f(res))
    case Failure(e) => println(e.toString); Left(rec())  
  }
}


You may notice that the method toEither is curried to segregate the two type parameters T and U. It also complies with the semantics of Either: the Left element is used for the error (and recovery) and the Right element is dedicated to the successful outcome.

Let's take the example of the normalization of a large array of floating point values by their sum. A sum that is too small would amplify rounding errors during the normalization, so an exception is thrown. However, we do not want to burden the client code with handling the exception (the client method may not know the context of the exception, after all). In this example, the recovery function rec instantiates the class Recover, which is responsible for a retry, potentially from a different state.


Implicit classes have an important limitation in terms of reusability: you cannot override a default method without subclassing the original Scala standard library class, which is not an option in our example because Try is a sealed abstract class.
As with implicit methods, the name of the class is never actually used in the code but needs to reflect the intention of the developer. Let's apply this implicit class:

type DVector = Array[Double]

  // Arbitrary recovery class
class Recover {
  def reTry(x: DVector): DVector  = Array[Double](0.0)
}
  
  // Normalization of a vector. Proceeds to the
  // next computation (log) if the normalization succeeds
  // or invoke the recovery procedure otherwise
def normalize(x: DVector ): Either[Recover, DVector] = Try {
  val sum = x.sum 
  if(sum < 0.01) 
    throw new IllegalStateException(s"sum $sum")
  x.map( _ / sum) 
}
.toEither(() => new Recover)((v: DVector) => v.map( Math.log(_)))

The implementation is very elegant: there is no need for new semantics or naming conventions, and the return type Either is well understood in the Scala development community.
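A self-contained version of the example follows, assuming Scala 2.12 or later. Since Scala 2.12, Try already has a parameterless toEither method that returns Either[Throwable, T], so the sketch uses the hypothetical name recoverOrMap to avoid any ambiguity with it; the implicit class is also wrapped in an object (TryOps, another hypothetical name) because implicit classes cannot be top-level definitions in Scala 2.

```scala
import scala.util.{Failure, Success, Try}

object TryOps {
  // Adds an Either-based routing method to scala.util.Try
  implicit class Try2Either[T](_try: Try[T]) {
    // Left(rec()) if the computation failed, Right(f(result)) otherwise
    def recoverOrMap[U](rec: () => U)(f: T => T): Either[U, T] = _try match {
      case Success(res) => Right(f(res))
      case Failure(_)   => Left(rec())
    }
  }
}

import TryOps._

type DVector = Array[Double]

// Arbitrary recovery class
class Recover {
  def reTry(x: DVector): DVector = Array(0.0)
}

// Normalizes x by its sum, then applies log; routes to Recover if the sum is too small
def normalize(x: DVector): Either[Recover, DVector] =
  Try {
    val sum = x.sum
    if (sum < 0.01) throw new IllegalStateException(s"sum $sum")
    x.map(_ / sum)
  }.recoverOrMap(() => new Recover)((v: DVector) => v.map(d => math.log(d)))
```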

Sunday, December 14, 2014

Example of Scala Style Guide

Overview
There are many posts and articles dedicated to Scala coding style. The following style guide borrows heavily from Effective Scala by M. Eriksen (Twitter).

Source code style and format
Editor
* Indentation: 2 blank spaces (no tab characters)
* Margin: 100 characters
* Line wrap: 2 indentations

Scaladoc
Source comments comply with the Scaladoc tag annotation guidelines.

Organization of imports
Imports are declared at the top of the source file and grouped in the following order: Scala standard library, third-party libraries, then the imports of your own library (Scala for Machine Learning in this case).

import scala.xxx       // Scala standard library
import org.xxx         // Third party libraries
import my_package.xxx  // Import related to your library

Collections
Some collections, such as Set or Map, are defined as both mutable and immutable classes in the Scala standard library. These classes are differentiated in the code by their package prefix.

import scala.collection._
  ..
val myMap = new mutable.HashMap[T, U]
def process(values: immutable.Set[Double]) ...

Pipelining
Long pipelines of data transformations, operations and higher-order methods are written with one method call per line.

val lsp = builder.model(lrJacobian)
     .weight(createDiagonalMatrix(1.0)) 
     .target(labels)
     .checkerPair(exitCheck)
     .maxEvaluations(optimizer.maxEvals)
     .start(weights0)
     .maxIterations(optimizer.maxIters)
     .build

Constructors
Most classes are declared protected, with the package as scope. The constructors are defined in the companion object of the class, through the apply method.

protected class HMM[@specialized T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)(implicit f: DblVector => T) { }
 
object HMM {
  def apply[T <% Array[Int]](
      lambda: HMMLambda, 
      form: HMMForm, 
      maxIters: Int)(implicit f: DblVector => T): HMM[T] =
         new HMM[T](lambda, form, maxIters)
}

Lengthy parameters declaration
When the declaration of a constructor or method exceeds 100 characters, it is written with one argument per line.

def apply[T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)
   (implicit f: DblVector => T): HMM[T]

Null references vs. options
Null objects are avoided as much as possible: Option should be used instead of null references

  // Assumes xt: List[Double] and Stats.compute: Double => Double
def normalized(stats: Option[Stats]): Option[List[Double]] =
  stats.map(s => xt.map(s.compute))
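A runnable sketch of the same idea, assuming a hypothetical Stats case class whose compute method standardizes a value. The result is defined only when statistics are available, and the caller never has to check for null:

```scala
// Hypothetical statistics holder: compute standardizes a value
case class Stats(mean: Double, stdDev: Double) {
  def compute(x: Double): Double = (x - mean) / stdDev
}

// Returns None when no statistics are available, instead of a null reference
def normalized(xt: List[Double], stats: Option[Stats]): Option[List[Double]] =
  stats.map(s => xt.map(x => s.compute(x)))
```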

Null references and Empty collections
Null collections should be avoided. Empty containers such as List.empty or Array.empty should be used instead.

def test: List[T] = {
  ...
  List.empty[T]
}
if( !test.isEmpty)
  ...

Class parameter validation
The parameters of a class are validated in the companion object through a private method check

protected class LogisticRegression[T <% Double](
    xt: XTSeries[Array[T]], 
    labels: Array[Int], 
    optimizer: LogisticRegressionOptimizer) {
 
  import LogisticRegression._
  check(xt, labels)
  ...
}
 
object LogisticRegression {
  private def check[T <% Double](xt: Array[Array[T]], labels: Array[Int]): Unit = {
    require( !xt.isEmpty,"Cannot compute the logistic regression of undefined time series")
    require(xt.size == labels.size, 
      s"Size of input data ${xt.size} is different from size of labels ${labels.size}")
  }
  ...
}
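A self-contained sketch of the same pattern, using a hypothetical Classifier class with plain arrays instead of XTSeries. Combined with a private constructor, the companion apply is the only way to create an instance, and invalid arguments are rejected as soon as the instance is created:

```scala
import scala.util.Try

// Hypothetical classifier: the constructor is private, instances come from the companion apply
class Classifier private (xt: Array[Array[Double]], labels: Array[Int]) {
  import Classifier._
  check(xt, labels)   // throws IllegalArgumentException on invalid arguments

  def numObservations: Int = xt.length
}

object Classifier {
  def apply(xt: Array[Array[Double]], labels: Array[Int]): Classifier = new Classifier(xt, labels)

  private def check(xt: Array[Array[Double]], labels: Array[Int]): Unit = {
    require(xt.nonEmpty, "Cannot train a classifier on an empty data set")
    require(xt.length == labels.length,
      s"Size of input data ${xt.length} is different from size of labels ${labels.length}")
  }
}

// Failure(IllegalArgumentException): one observation but two labels
val invalid = Try(Classifier(Array(Array(1.0)), Array(1, 0)))
```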

Exceptions
Scala 2.10+ exception handling with scala.util.{Try, Success, Failure} is preferred to Java-style try/catch exception handling. The type of failure can be extracted by matching the type of the exception, if necessary and critical to the understanding of the code.

Try(process(args)) match {
  case Success(results) =>
  case Failure(e) => e match {
    case e: MathRuntimeError =>
      ..
  }
}
   // Computes sqrt(log(x)), falling back to NaN if an exception is thrown
def transform(x: Double): Double =
  Try( Math.log(x)).map( Math.sqrt( _)).getOrElse(Double.NaN)
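The sketch below, built around a hypothetical sqrtOf function, shows the typed matching on Failure end to end. Note that math.sqrt returns NaN for a negative input instead of throwing, so that case ends up in Success:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical example: square root of a numeric string, with typed failure handling
def sqrtOf(s: String): String = Try(s.trim.toDouble).map(x => math.sqrt(x)) match {
  case Success(v)                        => v.toString
  case Failure(_: NumberFormatException) => "not a number"
  case Failure(e)                        => s"unexpected failure: ${e.getMessage}"
}
```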

View bounds for built-in type
Parameterized types with a view bound to a primitive type are usually preferred to parameterized types with an upper or lower type bound.

class MultiLinearRegression[T <% Double](xt: List[Array[T]], y: Array[Double])

Enumeration and case classes
As a general rule, an enumeration is used only when the type has a single parameter convertible to an Int. Structures that require specific attributes are implemented as case classes.

object YahooFinancials extends Enumeration {
   type YahooFinancials = Value
   val DATE, OPEN, HIGH, LOW, CLOSE, VOLUME = Value
}

import YahooFinancials._

def getData(entries: List[String], fin: YahooFinancials): String = fin match {
  case DATE => entries(YahooFinancials.DATE.id)
  case OPEN => entries(YahooFinancials.OPEN.id)
}

Case classes or case objects should be preferred to enumerations.

sealed abstract class YahooFinancials(val id: Int) 
case object DATE extends YahooFinancials(0)
case object OPEN extends YahooFinancials(1)
case object HIGH extends YahooFinancials(2)
  
def getData(entries: List[String], fin: YahooFinancials): String = fin match {
  case DATE => entries(DATE.id)
  case OPEN => entries(OPEN.id)
}
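A runnable sketch of the case-object version, assuming only the three objects above. Because each object carries its own id, the lookup needs no match at all, and because the hierarchy is sealed, the compiler warns when a remaining match misses a case. The label function is a hypothetical addition.

```scala
// Sealed hierarchy: the compiler knows every possible case
sealed abstract class YahooFinancials(val id: Int)
case object DATE extends YahooFinancials(0)
case object OPEN extends YahooFinancials(1)
case object HIGH extends YahooFinancials(2)

// Each case object carries its own column index, so no match is needed
def getData(entries: List[String], fin: YahooFinancials): String = entries(fin.id)

// Exhaustive match: removing a case triggers a "match may not be exhaustive" warning
def label(fin: YahooFinancials): String = fin match {
  case DATE => "date"
  case OPEN => "open"
  case HIGH => "high"
}
```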

References
Effective Scala - M. Eriksen, Twitter 2012
Scala by Example - M. Odersky
Scala for Machine Learning - P. Nicolas - Packt Publishing 2014