Wednesday, April 15, 2015

Recursive Viterbi algorithm for hidden Markov model

A significant number of algorithms used in machine learning relies on dynamic programming and hidden Markov model (HMM) is no exception. Scala tail elimination represents an excellent alternative to the traditional iterative implementation of the 3 canonical forms of the HMM.

Introduction to HMM
Markov processes, and more specifically HMM, are commonly used in speech recognition, language translation, and text classification, document tagging, data compression and decoding.
A HMM algorithm uses 3 key components

  • A set of observations
  • A sequence of hidden states
  • A model that maximizes the joint probability of the observations and hidden states, known as the Lambda model
HMM are usually visualized as lattice of states with transition and observations.
There are 3 use cases (or canonical forms) of the HMM.
  • Evaluation: Evaluate the probability of a given sequence of observations, given a model
  • Training: Identify (or learn) a model given a set of observations
  • Decoding Estimate the state sequence with the highest probability to generate a given as set of observations and a model
The last use case, Decoding, is implemented numerically using the Viterbi algorithm.

Viterbi recursive implementation
Given a sequence of states {qt} and sequence of observations {oj}, the probability δt(i) for any sequence to have the highest probability path for the first T observations is defined for the state Si.
  • Delta: sequence to have the highest probability path for the first i observations is defined for a specific test δt(i)
  • Qstar: the optimum sequence q* of states Q0:T-1
The state of the Viterbi/HMM computation regarding the state transition and emission matrices is defined in the HMMState class

final protected class HMMState(val lambda: HMMLambda, val maxIters: Int) {
  // Matrix of elements (t, i) that defines the highest probability of 
  // a single path of t observations reaching state S(i)
  val delta = Matrix[Double](lambda.getT, lambda.getN)
  // Auxiliary matrix of indices that maximize the probability of a 
  //given sequence of states
  val psi = Matrix[Int](lambda.getT, lambda.getN)

  // Singleton to compute the sequence Q* of states with the highest 
  // probability given a sequence of observations.
  object QStar {
    private val qStar = Array.fill(lambda.getT)(0)

    // Update Q* the optimum sequence of state using backtracking.. 
    def update(t: Int, index: Int): Unit

The algorithm is conveniently illustrated by the following diagram.

First, Let's create the key member and method for the Lambda model for HMM. The model is defined as a tuple of the transition probability matrix A, emission probability matrix B and the initial probability π

final protected class HMMLambda(
  val A: Matrix[Double], 
  val B: Matrix[Double], 
  var pi: Array[Double], 
  val numObs: Int) {

       // Number of observations
   @inline def getT: Int = numObs
       // Number of states for a sequence of observations
   @inline def getN: Int = A.nRows
       // Number of unique symbols (problem dimension) u
   @inline def getM: Int = B.nCols

      // Compute a new estimate of the log of the conditional 
      // probabilities for a given iteration. 
   def estimate(state: HMMState, obs: Array[Int]): Unit

      // Normalize the state transition matrix A, emission matrix B 
      // and the initial probabilities pi. 
   def normalize: Unit 

Then let's define the basic components for implementing the Viterbi algorithm. The class ViterbiPath encapsulates the member variables critical to the recursion.
The Viterbi algorithm is fully defined with

  • lambda: Lambda model defined above
  • state: State of the computation
  • obsIndx: Index of observed states

class ViterbiPath(lambda: HMMLambda, state: HMMState, obsIndx: Array[Int]) {
  val maxDelta = recurse(lambda.config._T, 0)

The recursive method, recurse that implements the formula or steps defined earlier. The method relies on the tail recursion. Tail recursion or tail elimination algorithm avoid the creation of a new stack frame for each invocation, improving the performance of the entire recursion.
private def recurse(t: Int): Double = {
  // Initialization of the delta value, return -1.0 in case of error
  if( t == 0)

   // then for the subsequent observations ...
  else { 
    // Update the maximum delta value and its state index for the observation t
    Range(0, lambda.getN).foreach( updateMaxDelta(t, _) )
     // If we reached the last observation... exit by backtracing the
     // computation of the 
    if( t ==  obs.size-1) {
      val idxMaxDelta = Range(0, lambda.getN).map(i => 
               (i,, i))).maxBy(_._2)

 // Update the Q* value with the index that maximize the delta.A
      state.QStar.update(t+1, idxMaxDelta._1)

Once initialized (1), the maximum value of delta, maxDelta, is computed recursively by applying the formula at each state, s using the Scala maxBy method. (2). Next, the index of the column of the transition matrix A corresponding to the maximum of delta is computed (3). The last step is to update the matrix psi (4) (resp. delta (5)). Once the step t reaches the maximum number of observation labels (6), the optimum sequence of state q* is computed (7).

Scala for machine learning Packt Publishing 2014
Pattern Recognition and Machine Learning; Chap 13 Sequential Data/Hidden Markov Models C. Bishop Springer 2009

Sunday, March 29, 2015

Contravariant functors for vector fields

Most of Scala developers have some experience with the core tenets of functional programming: monads, functors and applicatives. Those concepts are not specific to Scala or even functional programming at large. There are elements of a field in Mathematics known as topology or algebraic topology.
Differential geometry or differential topology makes heavy use of tensors that leverage covariant and contravariant functors.
This post introduces the concepts of

  • Contravariant functors applied to co-vectors and differential forms
  • Projection of higher kind

Vector fields 101
Let's consider a 3 dimension Euclidean space with basis vector {ei} and a vector field V (f1, f2, f3) [Note: we follow Einstein tensor indices convention]

The vector field at the point P(x,y,z) as the tuple (f1(x,y,z), f2(x,y,z), f3(x,y,z)).
The vector over a field of k dimension field can be formally. mathematically defined as
\[f: \boldsymbol{x} \,\, \epsilon \,\,\, \mathbb{R}^{k} \mapsto \mathbb{R} \\ f(\mathbf{x})=\sum_{i=1}^{n}{f^{i}}(\mathbf{x}).\mathbf{e}^{i}\] Example: \[f(x,y,z) = 2x+z^{3}\boldsymbol{\mathbf{\overrightarrow{i}}} + xy+e^{-y}-z^{2}\boldsymbol{\mathbf{\overrightarrow{j}}} + \frac{x^{3}}{y}\boldsymbol{\mathbf{\overrightarrow{k}}}\]
Now, let's consider the same vector V with a second reference (origin O' and basis vector e'i

The transformation matrix Sij convert the coordinates value functions fi and f'i. The tuple f =(fi) or more accurately defined as (fi) is the co-vector field for the vector field V
\[S_{ij}: \begin{Vmatrix} f^{1} \\ f^{2} \\ f^{3} \end{Vmatrix} \mapsto \begin{Vmatrix} f'^{1} \\ f'^{2} \\ f'^{3} \end{Vmatrix}\]
The scalar product of the co-vector f' and vector v(f) defined as is defined as
\[< f',v> = \sum f'_{i}.f^{i}\]
Given the scalar product we can define the co-vector field f' as a linear map
\[\alpha (v) = < f',v> (1) \]
Covariant functors
I assume the reader has basic understanding of Functor and Monads. Here is short overview:
A category C is composed of object x and morphism f defined as
\[C= \{ {x_{i}, f \in C | f: x_{i} \mapsto x_{j}} \}\] A functor F is a map between two categories C and D that preserves the mapping.
\[x\in C \Rightarrow F(x)\in D \\ x, y\in C \,\,\, F: x \mapsto y => F(x)\mapsto F(y)\]
Let's look at the definition of a functor in Scala with the "preserving" mapping method, map

trait Functor[M[_]] {
  def map[U, V](m: M[U])(f: U => V): M[V]

Let's define the functor for a vector (or tensor) field. A vector field is defined as a sequence or list of fields (i.e. values or function values).

type VField[U] = List[U]

trait VField_Ftor extends Functor[VField] {
  override def map[U, V](vu: VField[U])(f: U => V): VField[V] = 

This particular implementation relies on the fact that List is a category with its own functor. The next step is to define the implicit class conversion VField[U] => Functor[VField[U]] so the map method is automatically invoked for each VField instance.

implicit class vField2Functor[U](vu: VField[U]) extends VField_Ftor {
    final def map[V](f: U => V): VField[V] = 

By default Covariant Functors (which preserve mapping) are known simply as Functors. Let's look at the case of Covector fields.

Contravariant functors
A Contravariant functor is a map between two categories that reverses the mapping of morphisms.
\[x, y\in C \,\,\, F: x \mapsto y => F(y)\mapsto F(x)\]

trait CoFunctor[M[_]] {
  def map[U, V](m: M[U])(f: V => U): M[V]

The map method of the Cofunctor implements the relation M[V->U] => M[U]->M[V]
Let's implement a co-vector field using a contravariant functor. The definition (1) describes a linear map between a vector V over a field X to the scalar product V*: V => T.
A morphism on the category V* consists of a morphism of V => T or V => _ where V is a vector field and T or _ is a scalar function value.

type CoField[V, T] = Function1[V, T]

The co-vector field type, CoField is parameterized on the vector field type V which is a input or function parameter. Therefore the functor has to be contravariant.
The higher kind type M[_] takes a single type as parameter (i.e. M[V]) but a co-vector field requires two types:
  • V: Vector field
  • T: The scalar function is that the result of the inner product <.>
Fortunately the contravariant functor CoField_Ftor associated with the co-vector needs to be parameterized only with the vector field V. The solution is to pre-defined (or 'fix') the scalar type T using a higher kind projector for the type L[V] => CoField[V, T]
T => ({type L[X] = CoField[X,T]})#L

trait CoField_Ftor[T] extends CoFunctor[({type L[X] = CoField[X,T]})#L ] {
  override def map[U,V](vu: CoField[U,T])(f: V => U): CoField[V,T] = 
       (v: V) => vu(f(v))

As you can see the morphism over the type V on the category CoField is defined as f: V => U instead of f: U => V. A kind parameterized on the return type (Function1) would require the 'default' (covariant) functor. Once again, we define an implicit class to convert a co-vector field, of type CoField to its functor, CoField2Ftor

implicit class CoField2Ftor[U,T](vu: CoField[U,T]) extends CoField_Ftor[T] {
   final def map[V](f: V => U): CoField[V,T] = 

Let's consider a field of function values FuncD of two dimension: v(x,y) = f1(x,y).i + f2(x,y.j. The Vector field VField is defined as a list of two function values.
type DVector = Array[Double]
type FuncD = Function1[DVector, Double]
type VFieldD = VField[FuncD]

The vector is computed by assigning a vector field to a specific point (P(1.5, 2.0). The functor is applied to the vector field, vField to generate a new vector field vField2

val f1: FuncD = new FuncD((x: DVector) => x(0)*x(1))
val f2: FuncD = new FuncD((x: DVector) => -2.0*x(1)*x(1))
val vfield: VFieldD = List[FuncD](f1, f2)
val value: DVector = Array[Double](1.5, 2.0)  
val vField2: VFieldD = _*(4.0))

A co-vector field, coField is computed as the sum of the fields (function values). The product of co-vector field and vector field (scalar field product) is simply computed by applying the co-vector (linear map) to the vector field. Once defined, the morphism _morphism is used to generate a new co-vector field through the contravariant function,

val coField: CoField[VFieldD, FuncD] = (vf: VFieldD) => vf(0) + vf(1)
val contraFtor: CoField2Functor[VFieldD, FuncD] = coField
val product = coField(vField)
val _morphism: VFieldD => VFieldD = (vf: VFieldD) => _*(3.0))
val coField2 = _morphism )
val newProduct: FuncD = coField2(coField)

  • Tensor Analysis on Manifolds - R. Bishop, S. Goldberg - Dover publication 1980
  • Differential Geometric Structures - W. Poor - Dover publications 2007
  • Functors and Natural Transformationsv- A. Tarlecki - Category Theory 2014

Wednesday, March 4, 2015

F-bounded type polymorphism

F-Bounded Type polymorphism or Bounded Quantification Polymorphism is parametric type polymorphism that constrains the subtypes to themselves using bounds.
Let's consider the following "classification" problem:

How can we guarantee that the SharpPencils bucket contains only sharp pencils, not small pencils or erasers?

Type Polymorphism
The first attempt to solve the problem is to rely on parametric type polymorphism. To this purpose, we create a Pencils trait sub-classed by as a bucket for sharp pencils, SharpPencils and a bucket for small pencils, SmallPencils.
For the sake of simplification, we assume that Pencils defines only 2 methods: add and pop pencils.

trait Pencils[T] {
  private lazy val content = new mutable.ListBuffer[T]
  def add(t: T): Unit = content.append(t)
  def pop: List[T] = (content -= data.head).toList
class SharpPencils extends Pencils[SharpPencils]
class SmallPencils extends Pencils[SmallPencils]

This implementation does not guarantee that SharpPencils is the only bucket that contains the sharp pencils. Another bucket can be created with sharp pencils, too.

class OtherPencils extends Bucket[SharpPencils]

The solution is to specify constraints (or bounds) on the type of elements contained in each bucket.

Bounded Polymorphism
The goal is to make sure that the bucket of specific type (or containing a specific type of pencils). The first step is to make sure that a bucket does not contain other items than a Pencils

trait Pencils[T <: Pencils[T]] {
  private lazy val content = new mutable.ListBuffer[T]
  def add(t: T): Unit = content.append(t)
  def pop: List[T] = (content -= data.head).toList

This implementation resolve the limitation raised in the previous paragraph. However there is nothing that prevent SharpPencils to contain small pencils and SmallPencils to contain sharp pencils, as illustrated in the next code snippet.

 // Won't compile!
class SharpPencils extends Pencils[Eraser]
 // The following code compiles!
class SharpPencils extends Pencils[SmallPencils]
class SmallPencils extends Pencils[SharpPencils]

Self-referential Polymorphism
As a matter of fact, the most effective constraint on a inherited type is the self reference that list the type allows for the method to execute.

trait Pencils[T <: Pencils[T]] {
  self: =>
    private lazy val content = new mutable.ListBuffer[T]
    def add(t: T): Unit = content.append(t)
    def pop: List[T] = (content -= data.head).toList
   // The following code fails to compile!
class SharpPencils extends Pencils[SmallPencils]

The declaration of SharpPencils as containing small pencils fails to compile because it violates the self-referential type restriction.

Getting F-Bounded Polymorphism into Shape B. Greenman, F. Muehlboeck, R. Tate - Cornell University

Thursday, February 26, 2015

Function vectors using andThen


Space of functions are commonly used in machine learning (kernel functions) and differential geometry (tensors and manifolds).The most common technique to create a functions space is to define the space as a category and assign monadic operations to it.
This post discusses an alternative, less elaborate approach to define a functions space as a container. The key operator on function is the composition as defined in the Function1[-T, +R] class:
def andThen[A](g: (R) ⇒ A): (T) ⇒ A
def compose[A](g: (A) ⇒ T): (A) ⇒ R

Let's use andThen as the basic block to build our functions space.

The andThen is a member method of some of the Scala standard library collections such as List and HashMap.

val xsInt = List[Int](4, 0, 9, 56, 11)
xsInt.andThen((n: Int) => n*n).mkString(",")
   // print 16,0,81,3136,121

As mentioned in the introduction, the Function1 implements andThen to compose two functions as this o otherFunction where the o composition operator is defined as
f o g: (t: T) => f(g(t))
The method is implemented using the andThen method

val fg = Math.sin andThen (x: Double) => x*x
Console.println(s"fg(5.0): ${fg(5.0)}")

class FunctionVector[T](fVec: List[T=>T]) {
  def andThen(op: T => T): T=>T
  def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T
  def map(op: T => T): List[T=>T]

The dot product is defined as the product of two vectors functions. For example in a function vector spaces of dimension 3, v(f,g,h) and v'(f',g',h')
dot: (x,y,z) -> f o f'(x,y,z) + g o g'(x,y,z) + h o h'(x,y,z)
The method andThen composes this function vector with a function op and generates a 'cumulative' composed function as f o g o h o op.
The iteration along the function vector is implemented as a tail recursion. The method relies on the List[T=>T].andThen method.

def andThen(g: T => T): T => T = {

  def andThen(xsf: List[T=>T])(op: T => T): T => T =
   if(xsf.size == 0) op 
   else andThen(xsf.drop(1))(xsf.head andThen op)

The 'dot' product takes another function vector of same size as argument. The two vectors are zipped to generate a Iterable[T=>T] vector of composed function elements using a map. Finally the resulting dot function is computed by generating a new function with summation of the elements of the composed functions vector. The summation requires the instance of Numeric to be declared as an implicit argument.

def dot(gVec: List[T=>T])(implicit num: Numeric[T]): T => T = {  
  val composed = fg => fg._1 andThen fg._2)
  t: T) => _(t)).sum

Contrary to the andThen method, the map convert a function vector to another function vector by applying a natural transformation.

def map(op: T => T): List[T=>T] = _ andThen op)

Finally, the definition of the dot product can be extended to any aggregation method aggr

def dot(opVec: List[T=>T], aggr: (T =>T, T=>T) =>  (T=>T)): T => T = {
  val composed = fg => fg._1 andThen fg._2)
 (t: T) => composed.foldLeft((t: T) => t)((h,f) =>aggr(h,f))(t)

The class FunctionVector is now ready for testing.

val functionsList = List[Function1[Double,Double]](
   Math.sin, Math.sqrt
val vec = new FunctionVector[Double](functionsList)
val output1 = vec.andThen((x: Double) => x*x)
val opVec = List[Double=>Double](
 (x: Double)=> x+ 1.0, 
 (x: Double)=> Math.exp(-x)
val output2 =

Scala By Example - M. Odersky - June 2014

Thursday, February 5, 2015

Akka mailbox back pressure


Akka is a very reliable and effective library to deploy tasks over multiple cores and over multiple hosts. Fault-tolerance is implemented through a hierarchy of supervising actors, not that different from the Zookeeper framework.
But what about controlling the message flows between actors or clusters of actors? How can we avoid messages backing up in mailboxes for slow, unavailable actors or lengthy computational tasks?
Typesafe has introduced Akka reactive streams and Flow materialization to control TCP back-pressure and manage data flows between actors and cluster of actors. TCP back pressure is a subject for a future post. In the meantime let's design the poor's man back-pressure handler using bounded mail boxes.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Simple workflow with back pressure control
Let's look at a simple computational workflow with the following components:
  • Workers: These actors process and transform data sets. They start a computation task upon receiving a Compute message that contain the next data set to process. Each worker actor returns the results of the computation (transformed data set) back to the controller using the Completed message.
  • Controller is responsible for loading batch of data through a Load message and forward it to the workers in a Compute message. The Controller request the status of the bounded mail box for each worker by sending a GetStatus to the watcher.
  • Watcher monitor the state of the workers' mail box and return the current load (as percentage of the mail box is currently used) to the controller with a Status message.
The following diagram describe the minimum set of messages required to execute the workflow.

The worker actor is responsible for processing a slice of data using a function forwarded by the controller.

final class Worker(id: Int) extends Actor {
  override def receive = {
      // Sent by master/controller to initiate msg.fct computation
    case msg: Compute => {
      val output = msg.fct(msg.xt)
      sender ! Completed(id, output,
    case Cleanup => sender ! Done

The worker receives the input to the computation (slice of time series msg.xt and the data transformation msg.fct through the Compute message sent by the controller. Note that the function fct cannot be defined as a closure as the context of the function is unknown to the worker.
The actor return the result output of the computation through the Completed message. Finally all the workers notify the controller that their tasks is completed by responding to the Cleanup message.

type DblSeries = List[Array[Double]]
sealed abstract class Message(val id: Int)

  // Sent by worker back to controller
case class Completed(i: Int, xt: DblSeries, nIter: Int) 
   extends Message(i)

  // Sent by controller to workers
case class Compute(i: Int, xt: DblSeries, fct: DblSeries => DblSeries) 
   extends Message(i)

case class Cleanup() extends Message(-1)

The worker actor is responsible for processing a slice of data using a function forwarded by the controller. It takes three parameters.

  • numWorkers: Number of worker actors to create
  • watermark: Define the utilization of the worker mail box which trigger a throttle action. If the utilization of the mailbox is below the watermark, the controller throttles up ( increases the pressure) on the actor; If the utilization of the mail box exceeds 1.0 - watermark the controller decreases the pressure on the actor, otherwise the controller does not interfere with the data flow.
  • mailboxCapacity:Capacity of the mailboxes for the worker actors (maximum number of messages). It is assumed that all mailboxes have the same capacity

class Controller(numWorkers: Int, watermark: Double, capacity: Int) 
    extends Actor {
  var id: Int = 0
  var batchSize = capacity>>1

    // Create a set of worker actors
  val workers = List.tabulate(numWorkers)(n => 
    context.actorOf(Props(new Worker(n)), name = s"worker$n"))
  val pushTimeout = new FiniteDuration(10, MILLISECONDS)
  val msgQueues = => 
     (new BoundedMailbox(capacity, pushTimeout)).create(Some(w), Some(context.system))
  val watcher = context.actorOf(Props(new Watcher(msgQueues)))

The set of workers are created using the tabulate higher order method. A message queue (mailbox) has to be created for each actor. The mailboxes are bounded in order to avoid buffer overflow. Finally a watch dog actor of type Watcher is created through the Akka context to monitor the mailboxes for the worker. The watcher actor is described in the next sub paragraph.
Let's look at the Controller message loop.

override def receive = {
    // Loads chunk of stream or input data set
  case input: Load => load(input.strm)

    // processes results from workers
  case msg: Completed => if( == -1) kill else process(msg)

    // Status on mail boxes utilization sent by the watchdog
  case status: Status => throttle(status.load)

The controller performs 3 distinct functions:
  • load: Load a variable number of data points and partition them for each worker
  • process: Aggregate the results of computation for all the workers
  • throttle: Increase or decrease the number of data points loaded at each iteration.
Let's look at these methods.

  // Load, partition input stream then 
  // distribute the partitions across the workers
def load(strm: InputStream): Unit =
  while( strm.hasNext ) {
     val nextBatch =
            .foreach(w => w._2 ! Compute(id, w._1, strm.fct) )
     id += 1
  // Process, aggregate results from all the workers
def process(msg: Completed): Unit = {
   ..  // aggregation
  watcher ! GetStatus
  // Simple throttle function that increases or decreases the 
  // size of the next batch of data to be processed by 
  // workers according to the current load on mail boxes
def throttle(load: Double): Unit = {
  if(load < watermark) 
    batchSize += (batchSize*(load-watermark)).floor.toInt
  else if(load > 1.0 - watermark) 
    batchSize -= (batchSize*(load-1.0 + watermark)).floor.toInt
  if( batchSize > (mailboxCapacity>>1) )
    batchSize = (mailboxCapacity>>1)

load extracts the next batch of data, partitions it then send each partition to a worker actor along with the data transformation fct
process aggregates the result (Completed) from the transformation on each worker. Then the controller requests a status on the mail box to the watcher
throttle recompute the size of the next batch, batchSize using the load information provided by the watcher relative to the watermark.

The watcher has a simple task: compute the average load of the mailbox of all workers. The computation is done upon reception of the GetStatus message from the controller.

class Watcher(queue: Iterable[MessageQueue]) extends Actor {
  def load = _.numberOfMessages)
  override def receive = { 
    case GetStatus => sender ! Status(load) 

Memory consumption profile
The bottom graph displays the action of the controller (throttling). The Y-axis display the intensity of the throttle from -6 for significant decrease in load (size of batches) to +6 for significant increase in load/pressure on the workers. A throttle index of 0 means that no action is taken by the controller.
The top graph displays the average size of the worker's mailbox, in response of action taken by the controller.

Important notes
This implementation of a feedback controller loop is rather crude and mainly described as an introduction to the concept of back pressure control. Production quality implementation relies on:
  • TCP-back pressure using reactive streams
  • A more sophisticated throttle algorithm to avoid significant adjustment or resonance
  • Handling dead letters in case the throttle algorithm fails and the mailbox overflows