Thursday, February 4, 2016

Newton-Raphson revisited

The Newton-Raphson method is a well-known technique to compute the root of a function, f(x) = 0, or the minimum/maximum of a function through the root of its derivative, f'(x) = 0.
The simplicity of the Newton-Raphson method, in terms of concept as well as implementation, makes it a very popular solution. We will look into two different implementations in Scala and conclude the post by evaluating the relative benefits and limitations of the Newton-Raphson method.

First implementation
Let's dive into the computation of a root s of a function f. The function is defined by its Taylor series around a point x as follows:
\[f(s)=\sum_{n=0}^{\infty }\frac{f^{(n)}(x)}{n!}(s-x)^{n}\] At a root, f(s) = 0, so truncating after the first order term gives
\[0=f(x)+f'(x)(s-x)+O\left ((s-x)^{2}f''(x)\right )\] resulting in the iteration
\[x_{n+1}=x_{n}- \frac{f(x_{n})}{f'(x_{n})}\]

final class NewtonRaphson(
  f:   Double => Double,
  df:  Double => Double,
  dff: Option[Double => Double] = None
) {
  final val EPS = 0.01

  @scala.annotation.tailrec
  def root(x: Double): Double = {
    val nextX = x - f(x) / df(x)
    if (Math.abs(nextX - x) < EPS) nextX else root(nextX)
  }
}

The constructor takes 3 arguments:
  • The function whose root is to be computed
  • Its first order derivative
  • An optional second order derivative, not used in this first implementation
Let's apply the root method to a concrete example:

val f = (x: Double) => Math.pow(x, 5.0) - 2.0 * x
val ff = (x: Double) => 5.0*Math.pow(x, 4.0) - 2.0
val nr1 = new NewtonRaphson(f, ff)
val z1 = nr1.root(7.9)
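As a sanity check, the iterate started at 7.9 should land near the positive non-zero root of x⁵ - 2x = 0, i.e. x = 2^0.25 ≈ 1.1892. The sketch below repeats the class so the snippet is self-contained:

```scala
// Newton-Raphson iteration, repeated here so the snippet stands alone
final class NewtonRaphson(f: Double => Double, df: Double => Double) {
  final val EPS = 0.01

  @scala.annotation.tailrec
  def root(x: Double): Double = {
    val nextX = x - f(x) / df(x)
    if (Math.abs(nextX - x) < EPS) nextX else root(nextX)
  }
}

val f = (x: Double) => Math.pow(x, 5.0) - 2.0 * x
val ff = (x: Double) => 5.0 * Math.pow(x, 4.0) - 2.0

// Converges toward the positive root x = 2^0.25 ~ 1.1892
val z1 = new NewtonRaphson(f, ff).root(7.9)
```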

So far so good. However, it is assumed that the function has a single root or, in the case of a maximum/minimum, that its derivative has a single root, at least in the vicinity of the initial data point. What about computing the root of a function that may have no root, or several?

val g = (x: Double) => Math.exp(0.5 * x)
val gg = (x: Double) => 0.5 * g(x)

val nr3 = new NewtonRaphson(g, gg)

The Newton-Raphson iteration diverges and, in this case, eventually generates Double.NaN indefinitely. There are several options to guarantee that the method exits properly when no root exists. Let's look at one solution that relies on the second derivative.
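A quick way to see why: for g(x) = exp(x/2), the Newton step x - g(x)/g'(x) always equals x - 2, so the iterate marches off toward minus infinity; once exp(x/2) underflows to 0.0, the update degenerates to 0.0/0.0 = NaN:

```scala
val g = (x: Double) => Math.exp(0.5 * x)
val gg = (x: Double) => 0.5 * g(x)

// The Newton step reduces to x - 2.0 for any x: no fixed point, no convergence
val step = (x: Double) => x - g(x) / gg(x)

// Far enough left, exp(x/2) underflows to 0.0 and the step becomes 0.0/0.0
val xTiny = -1500.0
val nanStep = g(xTiny) / gg(xTiny)
```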

Second derivative to the rescue
We need to go back to basics: in its simplest form, the Newton-Raphson method does not take into account derivatives of order higher than 1. The second order derivative, f'', can be used as a rough approximation of the error on the estimate of the root. The error can be evaluated at each iteration against a convergence criterion EPS.
\[\Delta \varepsilon = \varepsilon _{n+1}- \varepsilon _{n} \sim \frac{f'(x_{n})}{f''(x_{n})} < EPS\] \[x_{n+1}=x_{n}-\frac{f(x_{n})}{f'(x_{n})}(1 + \Delta \varepsilon)\] The approximation of the error is also used to detect whether the algorithm converges toward a solution (root) or starts to diverge. Let's implement this variant of the Newton-Raphson method, using the second order derivative as a convergence criterion.

def root(z: Double): Double = {

  @scala.annotation.tailrec
  def nr2ndOrder(xe: (Double, Double)): (Double, Double) = {
    val (x, eps) = xe
    val nextEps = Math.abs(df(x) / dff.get(x))
    val nextX = x - f(x) * (1.0 + nextEps) / df(x)

    // Rules to exit the recursion
    if (eps > 0.0 && nextEps >= eps) (-1.0, -1.0)            // error increases: diverging
    else if (Math.abs(nextEps - eps) < EPS) (nextX, nextEps) // converged
    else nr2ndOrder((nextX, nextEps))                        // recurse
  }

  nr2ndOrder((z, 0.0))._1
}

The convergence criteria are implemented as follows:
  • if the error eps increases, exit the recursion => Diverge
  • if eps decreases to a value within the convergence criterion => Converged
  • if eps decreases but remains higher than the convergence criterion => Recurse

Let's compare the convergence of the simplest Newton-Raphson method with the convergence of its variant using the 2nd order derivative, on the function f defined in the first paragraph.

Clearly, the adaptive step size using the second order derivative speeds up the convergence toward the root of the function f. The selection of the appropriate method comes down to the trade-off between the overhead of the computation of the second derivative and the larger number of iterations or recursions required to find the root within a predefined convergence criteria.

Tuesday, January 5, 2016

Simple class versioning in Scala

Versioning is a common challenge in commercial software development. The most common technique to support multiple versions of a library, executable or framework, in Java or Scala relies on the class loader.
A library can be easily versioned by creating multiple jar files, one for each version.

Let's select a simple case in which a library is self-contained in a jar file, with two versions:
  • ArrayIndex1.jar
  • ArrayIndex2.jar

import{File, URL, URLClassLoader}
import java.lang.reflect.Method

val cl1 = new URLClassLoader(
   Array[URL](new File("ArrayIndex1.jar").toURI.toURL),
   getClass.getClassLoader
)
val cl2 = new URLClassLoader(
   Array[URL](new File("ArrayIndex2.jar").toURI.toURL),
   getClass.getClassLoader
)
val compatibleClass: Class[_] =
  if (version > 0.9) cl1.loadClass("ArrayIndex")
  else cl2.loadClass("ArrayIndex")

val obj = compatibleClass.newInstance
val methods: Array[Method] = compatibleClass.getMethods
println(s"methods ${ }")

The first step consists of loading the two versions of the library through their URLs, by converting each jar file name to a URL. In this particular case, each class loader is defined by its URL, with the type URLClassLoader.
The class to be used depends on the variable version. Once the appropriate class is loaded, its instance and methods are readily available to be invoked by the client.
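The same load-then-reflect pattern can be exercised without the two jar files by loading a class from the JDK itself; java.lang.StringBuilder here is just a hypothetical stand-in for the versioned ArrayIndex class:

```scala
// Load a class dynamically through a class loader, then instantiate and
// inspect it through reflection, as done with the versioned jars above.
val loader = Thread.currentThread.getContextClassLoader
val clazz: Class[_] = loader.loadClass("java.lang.StringBuilder")

val obj = clazz.getDeclaredConstructor().newInstance()
val methodNames = =>
```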

Wednesday, December 2, 2015

Kullback-Leibler Divergence on Apache Spark

The Kullback-Leibler divergence, also known as relative entropy, measures the dissimilarity between the distributions of random values (i.e. probability density functions). Let's consider two distributions P and Q with probability density functions p and q.
\[KL(P||Q)= - \int_{-\infty }^{+\infty }p(x)\log\frac{q(x)}{p(x)}\,dx\]
In the case of large data sets, the computation of the Kullback-Leibler divergence benefits from a distributed environment such as Apache Spark. In this post, we will compare multiple data sets generated from a variety of continuous probability density functions against the ubiquitous normal distribution.

Implementation of Kullback-Leibler
First let's implement the Kullback-Leibler formula for measuring the dissimilarity between an existing data set {(xi, yi)} (presumably generated by a specific random variable / distribution) and a given continuous distribution with probability density function f.

object KullbackLeibler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

  def execute(xy: DATASET, f: Double => Double): Double = {
    val z = xy.filter { case (_, y) => Math.abs(y) > EPS }
    -z.foldLeft(0.0) { case (s, (x, y)) => s + y * Math.log(f(x) / y) }
  }

  def execute(xy: DATASET, fs: Iterable[Double => Double]): Iterable[Double] = {
    val data = xy.toSeq   // materialize: the iterator is traversed once per distribution, f))
  }
}
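As a sanity check, the divergence of a data set evaluated against the very density that generated it should be zero: for each pair, y = f(x), so log(f(x)/y) = 0. The snippet below restates the single-distribution execute so it runs standalone:

```scala
val EPS = 1e-10

// Kullback-Leibler divergence of a data set {(x, y)} against a density f
def execute(xy: Iterator[(Double, Double)], f: Double => Double): Double = {
  val z = xy.filter { case (_, y) => Math.abs(y) > EPS }
  -z.foldLeft(0.0) { case (s, (x, y)) => s + y * Math.log(f(x) / y) }
}

// Sample an exponential density and measure it against itself
val expPdf = (x: Double) => Math.exp(-x)
val sample = (1 to 1000) { n => val x = 0.01 * n; (x, expPdf(x)) }
val kl = execute(sample, expPdf)
```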

The second execute method computes the KL divergence for a sequence of distributions defined by the list fs of their probability density functions.
Next, let's define some of the continuous random distributions to be evaluated against the normal distribution.

import Math._

def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

val INV_SQRT_2PI = 1.0/sqrt(2.0*PI)

val gauss = (x: Double) => INV_SQRT_2PI*exp(-x*x/2.0)
val uniform = (x: Double) => x
val logNormal = (x: Double) => {
  val lx = log(x)
  INV_SQRT_2PI/x * exp(-lx*lx/2.0)
}
val gamma = (x: Double, n: Int) => exp(-x)*pow(x, n)/fact(n)

val logGamma = (x: Double, alpha: Int, beta: Int) =>
  exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))

val _beta = (n: Int, m: Int) => {
  val f = if(n < 2) 1 else fact(n-1)
  val g = if(m < 2) 1 else fact(m-1)
  f*g/fact(n+m -1).toDouble
}
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1.0 - x, beta-1)/_beta(alpha, beta)

val chiSquare = (x: Double, k: Int) => {   // k assumed even
  val k_2 = k >> 1
  pow(x, k_2-1)*exp(-0.5*x) /((1 << k_2)*fact(k_2 - 1))
}

Some of these probability density functions are parameterized (i.e. log gamma has alpha and beta parameters). The parameterized density functions are instantiated as Scala partially applied functions with predefined parameter values. For example...

val gamma2 = gamma( _ : Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)
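Partial application in action: gamma2 is a genuine probability density on x > 0 (gamma(x, n) = e^(-x) x^n / n! integrates to 1 over the positive axis), which a crude Riemann sum confirms:

```scala
import Math._

def fact(m: Int): Int = if (m < 2) 1 else m * fact(m - 1)
val gamma = (x: Double, n: Int) => exp(-x) * pow(x, n) / fact(n)

// Partially applied function with the shape parameter fixed to 2
val gamma2 = gamma(_: Double, 2)

// Riemann sum over [0, 40]: the density should integrate to ~1
val dx = 0.001
val area = (1 to 40000).map(n => gamma2(n * dx) * dx).sum
```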

Spark to the rescue
Executing the Kullback-Leibler formula on a large data set against a significant number of random distributions may be a daunting computational task. A solution is to use the Apache Spark framework to parallelize the computation of the divergence. Here are the steps:
  • Partition the original data set
  • Broadcast the first sequence of probability density functions
  • Execute the Kullback-Leibler formula on each partition using mapPartitions
  • Collect then aggregate the divergence factor from each partition

import org.apache.spark.{SparkConf, SparkContext}

final val NUM_DATA_POINTS = 10000000
val numTasks: Int = 128

val conf = new SparkConf()
  .setAppName("Kullback-Leibler")
  .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)

Let's simulate a data set using the normal distribution. The resulting resilient distributed dataset is cached.

val master_rdd = generateData(gauss, NUM_DATA_POINTS, sc).cache

Next we broadcast the keys identifying the batch of probability density functions.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 2 -> logNormal, 3 -> gamma1, 4 -> gamma2, 5 -> gamma4, 
  6 -> logGamma12, 7 -> logGamma22, 8 -> beta22, 9 -> beta25, 
  10 -> chiSquare2, 11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.keys)

Finally, let's implement the mapPartitions transformation. It transforms each partition, an iterator of (x, y) pairs, into an array of size pdfs.size containing the KL divergence value for each distribution to be evaluated.

val kl_rdd: RDD[Double] = master_rdd.mapPartitions((it: DATASET) => {
  val pdfsList = => pdfs(n))
  execute(it, pdfsList).iterator
})

Finally, the divergence values from each partition are collected, then aggregated using a Scala fold.

val kl_master = kl_rdd.collect
val numPartitions = kl_master.length / pdfs.size

val divergences = (0 until kl_master.length by pdfs.size)
  .foldLeft(Array.fill(pdfs.size)(0.0))((s, n) => {
     (0 until pdfs.size).foreach(j => s.update(j, s(j) + kl_master(n + j)))
     s
  }).map(_ / numPartitions)

Note: This particular implementation relies on the mapPartitions method to execute the KL divergence over each partition, then performs the aggregation on the driver. A more common approach would be to use the method RDD.aggregate.

val pdfsList = pdfs.values.toList

val values = master_rdd.aggregate(Array.fill(pdfs.size)(0.0))(
  (s, xy) => { case (_s, n) =>
      _s - xy._2 * log(pdfsList(n)(xy._1) / xy._2)   // accumulate -y.log(f(x)/y)
  },
  (xt: Array[Double], yt: Array[Double]) => { case (x, y) => x + y }
)

Test results
The results of the evaluation of the dissimilarity between the data set and the set of given probability distributions are displayed as follows:

Pattern Recognition and Machine Learning - 1.6.1 Relative entropy and mutual information C. Bishop - Springer Science - 2006
Apache Spark documentation

Friday, November 20, 2015

Recursive Minimum Spanning Tree in Scala

Finding the optimum arrangement to connect nodes is a common problem in network design, transportation projects and electrical wiring. Each connection is assigned a weight (cost, length, time...). The purpose is to compute the layout that connects all the nodes while minimizing the total weight. For nodes connected through an un-directed graph, this problem is known as the minimum spanning tree, or MST.
Several algorithms have been developed over the last 70 years to extract the MST from a graph. This post focuses on the implementation of Prim's algorithm in Scala.

Prim's algorithm
There are many excellent tutorials on graph algorithms and more specifically on Prim's algorithm. I recommend Lecture 7: Minimum Spanning Trees and Prim's Algorithm, Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology.
Let PQ be a priority queue over the vertices of a graph G(V, E), with edge weights w(u,v). A vertex v is defined by
  • An identifier
  • A load factor, load(v)
  • A parent tree(v)
  • The adjacent vertices adj(v)
Prim's algorithm can be expressed as a simple iterative process. It maintains a priority queue of the vertices not yet in the tree and updates their loads to select the next vertex to add to the spanning tree. Each vertex is popped (and removed) from the priority queue before being inserted in the tree.
PQ <- V(G)
foreach u in PQ
  load(u) <- INFINITY
load(root) <- 0
while PQ nonEmpty
  u <- pop(PQ)
  foreach v in adj(u)
    if v in PQ && w(u,v) < load(v)
      tree(v) <- u
      load(v) <- w(u,v)
The Scala implementation relies on tail recursion to transfer vertices from the priority queue to the spanning tree.

Scala implementation: graph definition
The first step is to define a graph structure with edges and vertices. The graph takes two arguments:
  • numVertices number of vertices
  • start index of the root of the minimum spanning tree
The vertex class has three attributes
  • id identifier (arbitrarily, an integer)
  • load dynamic load (or key) on the vertex
  • tree reference to the minimum spanning tree
import scala.collection.mutable.HashMap

final class Graph(numVertices: Int, start: Int = 0) {
  class Vertex(val id: Int, 
     var load: Int = Int.MaxValue, 
     var tree: Int = -1) 

  val vertices = List.tabulate(numVertices)(n => new Vertex(n))
  vertices(start).load = 0
  val edges = new HashMap[Vertex, HashMap[Vertex, Int]]

  def += (from: Int, to: Int, weight: Int): Unit = {
    val fromV = vertices(from)
    val toV = vertices(to)
    connect(fromV, toV, weight)
    connect(toV, fromV, weight)
  }

  def connect(from: Vertex, to: Vertex, weight: Int): Unit = {
    if( !edges.contains(from))
      edges.put(from, new HashMap[Vertex, Int])    
    edges(from).put(to, weight)
  }
}

The vertices are initialized with a unique identifier id and a default load Int.MaxValue. In most cases, the identifier is a character string or a data structure. As described in the pseudo-code, the load for the root of the spanning tree is set to 0.
The load is defined as an integer for performance's sake. For the processing of very large graphs, it is recommended to quantize floating point values into integers, then convert the results back to the original format on the resulting minimum spanning tree.
The edges are defined as a hash table with the source vertex as key and, as value, the hash table of destination vertices and edge weights.
The graph is un-directed, therefore the connections initialized in the method += are bi-directional.

Scala implementation: priority queue
The priority queue is used to reorder the vertices and select the next vertex to be added to the spanning tree.
Note: There are many different implementations of priority queues in Scala and Java. Keep in mind that Prim's algorithm requires the queue to be reordered after a vertex load is updated (see pseudo-code). The PriorityQueue classes in the Scala and Java libraries do not allow elements to be removed or explicitly re-ordered. An alternative is to use a binary tree or a red-black tree, from which elements can be removed with the tree re-balanced.
The implementation of the priority queue has an impact on the time complexity of the algorithm. The following implementation is provided only to illustrate Prim's algorithm.
class PQueue(vertices: List[Vertex]) {
   var queue = vertices.foldLeft(new PriorityQueue[Vertex])((pq, v) => pq += v)

   def += (vertex: Vertex): Unit = queue += vertex
   def pop: Vertex = queue.dequeue
   def push(vertex: Vertex): Unit = queue.enqueue(vertex)
   def nonEmpty: Boolean = queue.nonEmpty
}

type MST = ListBuffer[Int]
implicit def orderingByLoad[T <: Vertex]: Ordering[T] = - _.load)

The Scala PriorityQueue class requires an implicit ordering of the vertices by their load. This is accomplished by defining an implicit conversion of a type T, with upper bound Vertex, to Ordering[T].
Note: The type T has to be a sub-class of Vertex. A direct conversion from the Vertex type to Ordering[Vertex] is not allowed in Scala.

Scala implementation: Prim
This implementation is a direct translation of the pseudo-code presented in the second paragraph. It relies on Scala's efficient tail recursion.
def prim: List[Int] = {
  val queue = new PQueue(vertices)

  @scala.annotation.tailrec
  def prim(parents: MST): Unit =
    if( queue.nonEmpty ) {
      val head = queue.pop
      val candidates = edges(head).filter {
            case(vt, w) => vt.tree == -1 && w <= vt.load
      }
      if( candidates.nonEmpty )
        candidates.foreach { case (vt, w) => vt.load = w }
      head.tree = 1                 // mark the vertex as a member of the tree
      parents.append(
      prim(parents)
    }

  val parents = new MST
  prim(parents)
  parents.toList
}
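To check the expected result, here is a compact, array-based variant of the same algorithm (a hypothetical helper, independent of the Graph class above) that returns the total weight of the spanning tree of a small graph:

```scala
import scala.collection.mutable

// Prim's algorithm with a linear scan in place of the priority queue: O(V^2)
def primWeight(n: Int, edges: Seq[(Int, Int, Int)]): Int = {
  val adj = Array.fill(n)(mutable.Map.empty[Int, Int])
  edges.foreach { case (u, v, w) =>
    adj(u)(v) = math.min(adj(u).getOrElse(v, Int.MaxValue), w)
    adj(v)(u) = math.min(adj(v).getOrElse(u, Int.MaxValue), w)
  }

  val load = Array.fill(n)(Int.MaxValue)
  val inTree = Array.fill(n)(false)
  load(0) = 0
  var total = 0

  (0 until n).foreach { _ =>
    // Select the lightest vertex not yet in the tree
    val u = (0 until n).filterNot(inTree).minBy(load)
    inTree(u) = true
    total += load(u)
    // Relax the loads of the adjacent vertices still outside the tree
    adj(u).foreach { case (v, w) => if (!inTree(v) && w < load(v)) load(v) = w }
  }
  total
}

// Square 0-1-2-3 plus one diagonal; the MST keeps the edges of weight 1, 2, 1
val mst = primWeight(4, Seq((0, 1, 1), (1, 2, 2), (2, 3, 1), (3, 0, 2), (0, 2, 3)))
```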

Time complexity
Minimum spanning tree with linear queue: V^2
Minimum spanning tree with binary heap: (E + V).LogV
Minimum spanning tree with Fibonacci heap: E + V.LogV
Note: See Summary of time complexity of algorithms for details.

  • Introduction to Algorithms Chapter 24 Minimum Spanning Trees - T. Cormen, C. Leiserson, R. Rivest - MIT Press 1989
  • Lecture 7: Minimum Spanning Trees and Prim’s Algorithm Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology
  • Graph Theory Chapter 4 Optimization Involving Tree - V.K. Balakrishnan - Schaum's Outlines Series, McGraw Hill, 1997

Monday, November 9, 2015

Time Complexity: Graph & Machine Learning Algorithms

Time complexity (the worst case execution cost as a function of the number of elements) is commonly used in computer science. However, you will be hard pressed to find a comparison of machine learning algorithms using their asymptotic execution time.

The following summary lists the time complexity of some of the most common algorithms used in machine learning, including recursive computation for dynamic programming, linear algebra, optimization and discriminative supervised training.

Algorithm | Time complexity | Description
Recursion with one element reduction | N^2 | N: number of elements
Recursion with halving reduction | LogN |
Recursion with halving reduction and processing | |
Nearest neighbors search | M.logk.N.LogN | M: number of features, k: number of neighbors, N: number of observations
Matrix multiplication (m,n)x(n,d) | m.n.d | m, n, d: matrices dimensions
Matrix multiplication (n,n) | n^3 | n: matrix dimension
Matrix multiplication (n,n) | n^2.8 | n: matrix dimension
Partial eigenvalues extraction (n,n) | e.N^2 | e: number of eigenvalues, N: number of observations
Complete eigenvalues extraction (n,n) | N^3 | N: number of observations
Minimum spanning tree, Prim, linear queue | V^2 | V: number of vertices
Minimum spanning tree, Prim, binary heap | (E + V).LogV | E: number of edges, V: number of vertices
Minimum spanning tree, Prim, Fibonacci heap | E + V.LogV | E: number of edges, V: number of vertices
Shortest paths graph, Dijkstra, linear sorting | V^2 | V: number of vertices
Shortest paths graph, Dijkstra, binary heap | (E + V).LogV | V: number of vertices
Shortest paths graph, Dijkstra, Fibonacci heap | E + V.LogV | E: number of edges, V: number of vertices
Shortest paths kNN graph, Dijkstra | (k + LogN).N^2 | k: number of neighbors, N: number of observations
Shortest paths kNN graph, Floyd-Warshall | N^3 | N: number of observations
Fast Fourier transform | N.LogN | N: number of observations
Batched gradient descent | N.P.I | N: number of observations, P: number of parameters, I: number of iterations
Stochastic gradient descent | N.P.I | N: number of observations, P: number of variables, I: number of epochs
Newton with Hessian computation | N^3.I | N: number of observations, I: number of iterations
Conjugate gradient | N.m.sqrt(k) | N: number of observations, m: number of non-zero entries, k: condition number of the matrix
L-BFGS | N.M.I | N: number of observations, M: estimated number of gradients, I: number of iterations
K-means (*) | C.N.M.I | C: number of clusters, M: dimension, N: number of observations, I: number of iterations
Lasso regularization - LARS (*) | N.M.min(N,M) | M: dimension, N: number of observations
Hidden Markov model, forward-backward pass | N^2.M | N: number of states, M: number of observations
Multilayer Perceptron (*) | n.M.P.N.e | n: number of input variables, M: number of hidden neurons, P: number of output values, N: number of observations, e: number of epochs
Support vector machine (*), using Newton | N^3 | N: number of observations
Support vector machine (*), using Cholesky | N^2 | N: number of observations
Support vector machine (*), sequential minimal optimization | N^2 | N: number of observations
Principal Components Analysis (*) | M^2.N + N^3 | N: number of observations, M: number of features
Expectation-Maximization (*) | M^2.N | N: number of observations, M: number of variables
Laplacian Eigenmaps | M.logk.N.logN + m.N.k^2 + d.N^2 | N: number of observations, M: number of variables
Genetic algorithms | P.LogP.I.C | C: number of genes/chromosome, P: population size, I: number of iterations
(*): Training
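The first rows of the table can be checked directly by counting recursive calls: a halving reduction recurses ~log2(N) times, while a one-element reduction recurses N times (the N^2 entry assumes a linear amount of work at each of those N steps):

```scala
// Number of recursive calls for a halving reduction: ~log2(n)
def halvingSteps(n: Int): Int = if (n <= 1) 0 else 1 + halvingSteps(n / 2)

// Number of recursive calls for a one-element reduction: n
def linearSteps(n: Int): Int = if (n <= 0) 0 else 1 + linearSteps(n - 1)
```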

Introduction to Algorithms T. Cormen, C. Leiserson, R. Rivest - MIT Press 1990
Machine Learning: A Probabilistic Perspective K. Murphy - MIT Press, 2012
Big O cheat sheet

Wednesday, October 28, 2015

Recursive Mean & Standard Deviation in Scala

The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.

Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression consists of summing the squared differences between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{0}^{n-1}(x-\mu )^{2}}{n}}\]
The second formula allows us to update the mean and standard deviation with each new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{0}^{n-1}x^{2}-{\mu ^{2}}}\]
This second approach relies on the computation of a sum of squared values that can overflow.

val x = Array[Double]( /* ... */ )
val mean = x.sum / x.length
val stdDev = Math.sqrt( - mean).map(t => t * t).sum / x.length)

Recursive computation
There is actually no need to compute the sum and the sum of squared values to compute the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\]
Let's implement these two recursive formulas in Scala using tail recursion.

def meanStd(x: Array[Double]): (Double, Double) = {

  @scala.annotation.tailrec
  def meanStd(x: Array[Double], mu: Double, Q: Double, count: Int): (Double, Double) = 
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count + 1
      val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
    }

  meanStd(x, 0.0, 0.0, 0)
}

This implementation updates the mean and the standard deviation simultaneously for each new data point.
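The recursive implementation can be validated on a small data set with known mean 5 and population standard deviation 2:

```scala
// Recursive (Welford-style) mean / standard deviation, as implemented above
def meanStd(x: Array[Double]): (Double, Double) = {
  @scala.annotation.tailrec
  def loop(mu: Double, q: Double, count: Int): (Double, Double) =
    if (count >= x.length) (mu, Math.sqrt(q / x.length))
    else {
      val newCount = count + 1
      val newMu = x(count) / newCount + mu * (1.0 - 1.0 / newCount)
      val newQ = q + (x(count) - mu) * (x(count) - newMu)
      loop(newMu, newQ, newCount)
    }
  loop(0.0, 0.0, 0)
}

val sample = Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
val (mu, sigma) = meanStd(sample)
```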