## Tuesday, January 5, 2016

### Simple class versioning in Scala

Overview
Versioning is a common challenge in commercial software development. In Java or Scala, the most common technique to support multiple versions of a library, executable or framework relies on the class loader: a library can be versioned simply by creating one jar file per version.

Let's consider a simple case for which a library is self-contained in a jar file, with two versions:
• ArrayIndex1.jar
• ArrayIndex2.jar
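The reflective half of this technique can be sketched without the two jar files. The snippet below loads a class by name and invokes one of its methods through reflection, exactly as a client would invoke a versioned class loaded from ArrayIndex1.jar or ArrayIndex2.jar; java.lang.StringBuilder merely stands in for the versioned class, since the jars are not available here.

```scala
// Load a class dynamically by name, mirroring the jar-based scenario.
// java.lang.StringBuilder stands in for a class loaded from a versioned jar.
val clazz: Class[_] = Class.forName("java.lang.StringBuilder")

// Instantiate through the no-argument constructor
val obj = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]

// Look up and invoke a method reflectively, as a client of the
// versioned library would
val append = clazz.getMethod("append", classOf[String])
append.invoke(obj, "version-1")

assert(obj.toString == "version-1")
```

The same three steps (load, instantiate, invoke) apply unchanged when the class comes from a URLClassLoader built over a versioned jar.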

import java.net.{URL, URLClassLoader}
import java.io.File
import java.lang.reflect.Method

val urls1 = Array[URL](new File("ArrayIndex1.jar").toURI.toURL)
val urls2 = Array[URL](new File("ArrayIndex2.jar").toURI.toURL)

val loader1 = new URLClassLoader(urls1)
val loader2 = new URLClassLoader(urls2)

// Select the class loader according to the target version;
// className is the fully qualified name of the versioned class
val compatibleClass: Class[_] =
  (if (version == 1) loader1 else loader2).loadClass(className)

val obj = compatibleClass.newInstance
val methods: Array[Method] = compatibleClass.getMethods
println(s"methods ${methods.map(_.getName).mkString(",")}")

The first step consists of loading the two versions of the library by converting each jar file name to a URL and creating a class loader of type URLClassLoader for each. The class to be used depends on the variable version. Once the appropriate class is loaded, its instance and methods are readily available to be invoked by the client.

## Wednesday, December 2, 2015

### Kullback-Leibler Divergence on Apache Spark

Overview
The Kullback-Leibler divergence, also known as the relative entropy, measures the dissimilarity of two distributions of random values (i.e. probability density functions). Let's consider two distributions P and Q with probability density functions p and q:
$KL(P||Q)= -\int_{-\infty }^{+\infty }p(x)\log\frac{q(x)}{p(x)}dx$
In the case of large data sets, the computation of the Kullback-Leibler divergence benefits from a distributed environment such as Apache Spark. In this post, we will compare multiple data sets generated from a variety of continuous probability density functions against the ubiquitous normal distribution.

Implementation of Kullback-Leibler
First let's implement the Kullback-Leibler formula for measuring the dissimilarity between an existing data set {xi, yi} (presumably generated by a specific random variable/distribution) and a given continuous distribution with a probability density function f.

import scala.math.{abs, log}

object KullbackLiebler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

  def execute(xy: DATASET, f: Double => Double): Double = {
    val z = xy.filter{ case(x, y) => abs(y) > EPS }
    - z./:(0.0){ case(s, (x, y)) => s + y*log(f(x)/y) }
  }

  def execute(xy: DATASET, fs: Iterable[Double => Double]): Iterable[Double] =
    fs.map(execute(xy, _))
}

The second execute method computes the KL divergence values for a sequence of distributions defined by the list fs of their probability density functions.
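As a quick sanity check of the formula, the divergence of a data set against the very density that generated it should be zero. The self-contained sketch below (plain Scala, no Spark) re-implements the execute logic with foldLeft for a small sample of the standard normal density; the grid of sample points is arbitrary.

```scala
import scala.math.{abs, exp, log, sqrt, Pi}

// Kullback-Leibler divergence of a data set {(x, y)} against a density f,
// following the execute method above
def kl(xy: Iterator[(Double, Double)], f: Double => Double): Double = {
  val EPS = 1e-10
  val z = xy.filter { case (_, y) => abs(y) > EPS }
  val s = z.foldLeft(0.0) { case (acc, (x, y)) => acc + y * log(f(x) / y) }
  -s
}

val gauss = (x: Double) => exp(-x * x / 2.0) / sqrt(2.0 * Pi)

// Sample the Gaussian density on a grid: KL(gauss || gauss) should be ~0
val xs = (-40 to 40).map(_ * 0.1)
val samples = xs.map(x => (x, gauss(x)))

val divergence = kl(samples.iterator, gauss)
assert(abs(divergence) < 1e-9)
```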
Next let's define some of the continuous random distributions to be evaluated against the normal distribution.

import scala.math.{exp, log, pow, sqrt, Pi}

final val INV_PI = 1.0/sqrt(2.0*Pi)  // normalization of the standard normal density

def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

val gauss = (x: Double) => INV_PI*exp(-x*x/2.0)
val uniform = (x: Double) => x
val logNormal = (x: Double) => {
  val lx = log(x)
  INV_PI/x*exp(-lx*lx)
}
val gamma = (x: Double, n: Int) => exp(-x)*pow(x, n)/fact(n)
val logGamma = (x: Double, alpha: Int, beta: Int) =>
  exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))
val _beta = (n: Int, m: Int) => {
  val f = if(n < 2) 1 else fact(n-1)
  val g = if(m < 2) 1 else fact(m-1)
  f*g/fact(n+m-1).toDouble
}
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1-x, beta-1)/_beta(alpha, beta)
val chiSquare = (x: Double, k: Int) => {
  val k_2 = k >> 1
  pow(x, k_2-1)*exp(-0.5*x)/((1 << k_2)*fact(k_2))
}

Some of these probability density functions are parameterized (e.g. the log gamma has alpha and beta parameters). The parameterized density functions are invoked as Scala partially applied functions with predefined parameter values. For example:

val gamma2 = gamma(_: Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)

Spark to the rescue
Executing the Kullback-Leibler formula on a large data set against a significant number of random distributions may be a daunting computational task. A solution is to use the Apache Spark framework to parallelize the computation of the divergence. Here are the steps:
• Partition the original data set
• Broadcast the sequence of probability density functions
• Execute the Kullback-Leibler formula on each partition using mapPartitions
• Collect then aggregate the divergence factor from each partition

final val NUM_DATA_POINTS = 10000000
val numTasks: Int = 128

val conf = new SparkConf().setAppName("Kullback-Liebler")
  .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")


Let's simulate a data set using the normal distribution. The resulting resilient distributed dataset is cached.

val master_rdd = generateData(gauss, NUM_DATA_POINTS, sc)
master_rdd.persist


Next we broadcast a batch or sequence of probability density functions.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 2 -> logNormal, 3 -> gamma1, 4 -> gamma2, 5 -> gamma4,
  6 -> logGamma12, 7 -> logGamma22, 8 -> beta22, 9 -> beta25,
  10 -> chiSquare2, 11 -> chiSquare4
)
val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))



Finally, let's implement the mapPartitions transformation. It transforms each partition of {x, y} pairs into an array of size pdfs.size containing the KL divergence value for each distribution to be evaluated.

val kl_rdd: RDD[Double] = master_rdd.mapPartitions((it: DATASET) => {
  val pdfsList = pdfs_broadcast.value.map(n => pdfs.get(n).get)
  execute(it, pdfsList).iterator
})


Finally, the divergence values from each partition are collected then aggregated using the Scala fold operator /:.

val kl_master = kl_rdd.collect
val numPartitions = kl_master.size/pdfs.size

val divergences = (0 until kl_master.size by pdfs.size)
  ./:(Array.fill(pdfs.size)(0.0))((s, n) => {
    (0 until pdfs.size).foreach(j => s.update(j, s(j) + kl_master(n+j)))
    s
  }).map(_ / numPartitions)
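The collect-then-fold step can be checked without Spark: given a flat array that interleaves the per-partition divergence values (pdfs.size values per partition), the fold below sums them per distribution and averages over the partitions. The numbers are made up purely for illustration.

```scala
// Simulated output of collect: 2 partitions x 3 distributions
val pdfsSize = 3
val klMaster = Array(1.0, 2.0, 3.0,   // partition 0
                     3.0, 4.0, 5.0)   // partition 1
val numPartitions = klMaster.length / pdfsSize

// Sum the divergence values per distribution, then average over partitions
val divergences = (0 until klMaster.length by pdfsSize)
  .foldLeft(Array.fill(pdfsSize)(0.0)) { (s, n) =>
    (0 until pdfsSize).foreach(j => s(j) += klMaster(n + j))
    s
  }.map(_ / numPartitions)

assert(divergences.sameElements(Array(2.0, 3.0, 4.0)))
```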


Note: This particular implementation relies on the mapPartitions method to execute the KL divergence over each partition, then executes the aggregation on the driver. A more common approach would be to use the method RDD.aggregate.

val pdfsList = pdfs.values.toList

val values = master_rdd.aggregate(Array.fill(pdfs.size)(0.0))(
  (s, xy) => s.zipWithIndex.map { case (_s, n) =>
    _s + xy._2*log(pdfsList(n)(xy._1)/xy._2)
  },
  (xt: Array[Double], yt: Array[Double]) =>
    xt.zip(yt).map(z => z._1 + z._2)
)
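The seqOp/combOp pair passed to RDD.aggregate has the same shape as a local fold-and-merge. The Spark-free sketch below applies the same accumulator and element-wise combiner to two hand-made "partitions" of a tiny data set; the two sample densities are arbitrary choices for the illustration.

```scala
// Two toy densities standing in for pdfsList
val pdfsList: List[Double => Double] = List(x => 1.0, x => math.exp(-x))

// seqOp: fold one (x, y) observation into the accumulator of each density
def seqOp(s: Array[Double], xy: (Double, Double)): Array[Double] =
  s.zipWithIndex.map { case (_s, n) => _s + xy._2 * math.log(pdfsList(n)(xy._1) / xy._2) }

// combOp: merge two partial accumulators element-wise
def combOp(xt: Array[Double], yt: Array[Double]): Array[Double] =
  xt.zip(yt).map { case (a, b) => a + b }

// Two "partitions" of a tiny data set
val part1 = Seq((0.0, 1.0))
val part2 = Seq((1.0, 1.0))
val zero = Array.fill(pdfsList.size)(0.0)
val merged = combOp(part1.foldLeft(zero.clone)(seqOp), part2.foldLeft(zero.clone)(seqOp))

// For f = 1: log(1/1) = 0 on both points; for f = exp(-x): 0 + log(e^-1) = -1
assert(math.abs(merged(0)) < 1e-12)
assert(math.abs(merged(1) + 1.0) < 1e-12)
```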


Test results
The results of the evaluation of the dissimilarity between the data sets and the set of given probability distributions are displayed below.

References
Pattern Recognition and Machine Learning - 1.6.1 Relative entropy and mutual information C. Bishop - Springer Science - 2006
Apache Spark documentation

## Friday, November 20, 2015

### Recursive Minimum Spanning Tree in Scala

Overview
Finding the optimum arrangement to connect nodes is a common problem in network design, transportation projects and electrical wiring. Each connection is assigned a weight (cost, length, time...). The goal is to compute the layout that connects all the nodes while minimizing the total weight. For nodes connected through an undirected graph, this problem is known as the minimum spanning tree or MST.
Several algorithms have been developed over the last 70 years to extract the MST from a graph. This post focuses on the implementation of Prim's algorithm in Scala.

Prim's algorithm
There are many excellent tutorials on graph algorithms and on Prim's algorithm in particular. I recommend Lecture 7: Minimum Spanning Trees and Prim's Algorithm, Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology.
Let PQ be a priority queue and G(V, E) a graph with n vertices V and edges of weight w(u,v). A vertex v is defined by
• An identifier
• A load, load(v)
• A parent in the tree, tree(v)
Prim's algorithm can be expressed as a simple iterative process. It maintains a priority queue of all the vertices not yet in the tree and updates their load to select the next vertex to add to the spanning tree. Each vertex is popped (and removed) from the priority queue before being inserted in the tree.
PQ <- V(G)
foreach u in PQ
   load(u) <- infinity
load(start) <- 0

while PQ nonEmpty
   u <- pop PQ
   foreach v in adj(u)
      if v in PQ && w(u,v) < load(v)
      then
         tree(v) <- u
         load(v) <- w(u,v)

The Scala implementation relies on a tail recursion to transfer vertices from the priority queue to the spanning tree

Scala implementation: graph definition
The first step is to define a graph structure with edges and vertices. The graph takes two arguments:
• numVertices number of vertices
• start index of the root of the minimum spanning tree
The vertex class has three attributes:
• id identifier (an arbitrary integer)
• load the load (or key) used to order the vertices in the priority queue
• tree reference to the minimum spanning tree
import scala.collection.mutable.HashMap

final class Graph(numVertices: Int, start: Int = 0) {

  class Vertex(val id: Int,
      var load: Int = Int.MaxValue,
      var tree: Int = -1)

  val vertices = List.tabulate(numVertices)(n => new Vertex(n))
  vertices(start).load = 0
  val edges = new HashMap[Vertex, HashMap[Vertex, Int]]

  def += (from: Int, to: Int, weight: Int): Unit = {
    val fromV = vertices(from)
    val toV = vertices(to)
    connect(fromV, toV, weight)
    connect(toV, fromV, weight)
  }

  def connect(from: Vertex, to: Vertex, weight: Int): Unit = {
    if( !edges.contains(from) )
      edges.put(from, new HashMap[Vertex, Int])
    edges(from).put(to, weight)
  }
  ...
}



The vertices are initialized with a unique identifier id and a default load Int.MaxValue. In most cases, the identifier is a character string or a data structure. As described in the pseudo-code, the load for the root of the spanning tree is set to 0.
The load is defined as an integer for performance's sake. For the processing of very large graphs, it is recommended to quantize floating point weights into integers, then convert them back to the original format on the resulting minimum spanning tree.
The edges are defined as a hash table with the source vertex as key and a hash table of destination vertices and edge weights as value.
The graph is undirected, therefore the connections initialized in the method += are bi-directional.

Scala implementation: priority queue
The priority queue is used to reorder the vertices and select the next vertex to be added to the spanning tree.
Note: There are many different implementations of priority queues in Scala and Java. Keep in mind that Prim's algorithm requires the queue to be reordered after a vertex's load is updated (see pseudo-code). The PriorityQueue classes in the Scala and Java libraries do not allow elements to be removed or explicitly re-ordered. An alternative is to use a binary tree or a red-black tree, for which elements can be removed and the tree re-balanced.
The implementation of the priority queue has an impact on the time complexity of the algorithm. The following implementation is provided only to illustrate Prim's algorithm.
class PQueue(vertices: List[Vertex]) {
  // Order vertices by their load (negated: PriorityQueue dequeues the maximum)
  implicit def orderingByLoad[T <: Vertex]: Ordering[T] = Ordering.by((v: T) => -v.load)

  var queue = vertices./:(new PriorityQueue[Vertex])((pq, v) => pq += v)

  def += (vertex: Vertex): Unit = queue += vertex
  def pop: Vertex = queue.dequeue
  def sort: Unit = queue = new PriorityQueue[Vertex] ++= queue  // re-heapify after load updates
  def push(vertex: Vertex): Unit = queue.enqueue(vertex)
  def nonEmpty: Boolean = queue.nonEmpty
}

type MST = ListBuffer[Int]


The Scala PriorityQueue class requires an implicit ordering of the vertices by their load. This is accomplished by defining an implicit conversion of a type T with upper-bound type Vertex to Ordering[T].
Note: The type T has to be a sub-class of Vertex; a direct conversion from the Vertex type to Ordering[Vertex] is not allowed in Scala.

Scala implementation: Prim
This implementation is a direct translation of the pseudo-code presented above. It relies on Scala's efficient tail recursion.
def prim: List[Int] = {
  val queue = new PQueue(vertices)

  @scala.annotation.tailrec
  def prim(parents: MST): Unit =
    if( queue.nonEmpty ) {
      // Pop the vertex with the lowest load and add it to the tree
      val head = queue.pop
      parents.append(head.id)

      // Neighbors not yet in the tree whose load can be lowered
      val candidates = edges(head).filter {
        case(vt, w) => vt.tree == -1 && w <= vt.load
      }

      if( candidates.nonEmpty ) {
        candidates.foreach { case (vt, w) => { vt.load = w; vt.tree = head.id } }
        queue.sort
      }
      prim(parents)
    }

  val parents = new MST
  prim(parents)
  parents.toList
}
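As a sanity check of the algorithm itself (not of the class above), here is a compact, self-contained Prim implementation over a plain adjacency map, run on a 4-vertex graph whose minimum spanning tree has total weight 6. The vertex identifiers and weights are made up for the illustration.

```scala
import scala.collection.mutable

// Compact Prim: returns the total weight of the minimum spanning tree.
// graph: adjacency map vertex -> List((neighbor, weight)); undirected.
def mstWeight(graph: Map[Int, List[(Int, Int)]], start: Int): Int = {
  val visited = mutable.Set[Int]()
  // Max-heap on the negated weight behaves as the min-priority queue of the pseudo-code
  val pq = mutable.PriorityQueue[(Int, Int)]()(Ordering.by((t: (Int, Int)) => -t._1))
  var total = 0
  pq.enqueue((0, start))
  while (pq.nonEmpty) {
    val (w, u) = pq.dequeue()
    if (!visited(u)) {
      visited += u
      total += w
      graph(u).foreach { case (v, wv) => if (!visited(v)) pq.enqueue((wv, v)) }
    }
  }
  total
}

// 4 vertices; the MST uses the edges of weight 1, 2 and 3
val graph = Map(
  0 -> List((1, 1), (2, 4), (3, 3)),
  1 -> List((0, 1), (2, 2)),
  2 -> List((1, 2), (0, 4), (3, 5)),
  3 -> List((0, 3), (2, 5))
)
assert(mstWeight(graph, 0) == 6)
```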


Time complexity
• Minimum spanning tree with linear queue: V²
• Minimum spanning tree with binary heap: (E + V)·log V
• Minimum spanning tree with Fibonacci heap: V·log V
Note: See Summary of time complexity of algorithms for details.

References
• Introduction to Algorithms Chapter 24 Minimum Spanning Trees - T. Cormen, C. Leiserson, R. Rivest - MIT Press 1989
• Lecture 7: Minimum Spanning Trees and Prim’s Algorithm Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology
• Graph Theory Chapter 4 Optimization Involving Tree - V.K. Balakrishnan - Schaum's Outlines Series, McGraw Hill, 1997

## Monday, November 9, 2015

### Time Complexity: Graph & Machine Learning Algorithms

Overview
Time complexity (the worst-case execution time as a function of the number of elements) is commonly used in computer science. However, you will be hard pressed to find a comparison of machine learning algorithms using their asymptotic execution time.

Summary
The following summary lists the time complexity of some of the most common algorithms used in machine learning, including recursive computation for dynamic programming, linear algebra, optimization and discriminative supervised training.

| Algorithm | Time complexity | Description |
|---|---|---|
| Recursion with one element reduction | N² | N: number of elements |
| Recursion with halving reduction | log N | |
| Recursion with halving reduction and processing | N | |
| Nearest neighbors search | M·log k·N·log N | M: number of features, k: number of neighbors, N: number of observations |
| Matrix multiplication (m,n)x(n,d) | m·n·d | m, n, d: matrices dimensions |
| Matrix multiplication (n,n) | n³ | n: matrix dimension |
| Matrix multiplication (n,n), Strassen | n^2.8 | n: matrix dimension |
| Partial eigenvalues extraction (n,n) | e·N² | e: number of eigenvalues, N: number of observations |
| Complete eigenvalues extraction (n,n) | N³ | N: number of observations |
| Minimum spanning tree, Prim, linear queue | V² | V: number of vertices |
| Minimum spanning tree, Prim, binary heap | (E + V)·log V | E: number of edges, V: number of vertices |
| Minimum spanning tree, Prim, Fibonacci heap | V·log V | E: number of edges, V: number of vertices |
| Shortest paths graph, Dijkstra, linear sorting | V² | V: number of vertices |
| Shortest paths graph, Dijkstra, binary heap | (E + V)·log V | V: number of vertices |
| Shortest paths graph, Dijkstra, Fibonacci heap | E + V·log V | E: number of edges, V: number of vertices |
| Shortest paths kNN graph, Dijkstra | (k + log N)·N² | k: number of neighbors, N: number of observations |
| Shortest paths kNN graph, Floyd-Warshall | N³ | N: number of observations |
| Fast Fourier transform | N·log N | N: number of observations |
| Batched gradient descent | N·P·I | N: number of observations, P: number of parameters, I: number of iterations |
| Stochastic gradient descent | N·P·I | N: number of observations, P: number of variables, I: number of epochs |
| Newton with Hessian computation | N³·I | N: number of observations, I: number of iterations |
| Conjugate gradient | N·m·sqrt(k) | N: number of observations, m: number of non-zeros, k: condition of the matrix |
| L-BFGS | N·M·I | N: number of observations, M: estimated number of gradients, I: number of iterations |
| K-means (*) | C·N·M·I | C: number of clusters, M: dimension, N: number of observations, I: number of iterations |
| Lasso regularization, LARS (*) | N·M·min(N,M) | M: dimension, N: number of observations |
| Hidden Markov model, forward-backward pass | N²·M | N: number of states, M: number of observations |
| Multilayer perceptron (*) | n·M·P·N·e | n: input variables, M: number of hidden neurons, P: number of output values, N: number of observations, e: number of epochs |
| Support vector machine (*), using Newton | N³ | N: number of observations |
| Support vector machine (*), using Cholesky | N² | N: number of observations |
| Support vector machine (*), sequential minimal optimization | N² | N: number of observations |
| Principal components analysis (*) | M²·N + N³ | N: number of observations, M: number of features |
| Expectation-maximization (*) | M²·N | N: number of observations, M: number of variables |
| Laplacian Eigenmaps | M·log k·N·log N + m·N·k² + d·N² | N: number of observations, M: number of variables |
| Genetic algorithms | P·log P·I·C | C: number of genes/chromosome, P: population size, I: number of iterations |
(*): Training
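The "recursion with halving reduction: log N" entry can be verified empirically: counting the calls of a halving recursion on N elements yields log2(N) steps.

```scala
// Count the recursive steps of a halving reduction
def halvingSteps(n: Int): Int = if (n <= 1) 0 else 1 + halvingSteps(n / 2)

// For N = 1024 = 2^10, exactly 10 halving steps: log2(N)
assert(halvingSteps(1024) == 10)
assert(halvingSteps(1) == 0)
```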

References
Introduction to Algorithms T. Cormen, C. Leiserson, R. Rivest - MIT Press 1990
Machine Learning: A probabilistic Perspective K. Murphy - MIT Press, 2012
Big O cheat sheet

## Wednesday, October 28, 2015

### Recursive Mean & Standard Deviation in Scala

Overview
The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala's tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.

Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression sums the squared differences between each data point and the mean.
$\sigma =\sqrt{\frac{\sum_{0}^{n-1}(x-\mu )^{2}}{n}}$
The second formula allows the mean and standard deviation to be updated as each new data point arrives (online computation).
$\sigma =\sqrt{\frac{1}{n}\sum_{0}^{n-1}x^{2}-{\mu ^{2}}}$
This second approach relies on the sum of squared values, which can overflow for large data sets.

val x = Array[Double]( /* ... */ )
val mean = x.sum/x.length
val stdDev = Math.sqrt((x.map( _ - mean).map(t => t*t).sum)/x.length)


Recursive computation
There is actually no need to compute the sum and the sum of squared values to compute the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
$\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}$ The recursive formula for the standard deviation is
$\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}$
Let's implement these two recursive formulas in Scala using tail recursion.

def meanStd(x: Array[Double]): (Double, Double) = {

  @scala.annotation.tailrec
  def meanStd(x: Array[Double], mu: Double, Q: Double, count: Int): (Double, Double) =
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count + 1
      val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
    }

  meanStd(x, 0.0, 0.0, 0)
}


This implementation updates the mean and the standard deviation simultaneously for each new data point.
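The recursive implementation can be validated against the direct two-pass computation from the previous section; the data values below are arbitrary.

```scala
import scala.annotation.tailrec

// Tail-recursive mean/standard deviation, as implemented above
def meanStd(x: Array[Double]): (Double, Double) = {
  @tailrec
  def loop(mu: Double, q: Double, count: Int): (Double, Double) =
    if (count >= x.length) (mu, math.sqrt(q / x.length))
    else {
      val n = count + 1
      val newMu = x(count) / n + mu * (1.0 - 1.0 / n)
      loop(newMu, q + (x(count) - mu) * (x(count) - newMu), n)
    }
  loop(0.0, 0.0, 0)
}

val x = Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
val (mu, sigma) = meanStd(x)

// Direct, two-pass computation for comparison
val directMu = x.sum / x.length
val directSigma = math.sqrt(x.map(v => (v - directMu) * (v - directMu)).sum / x.length)

assert(math.abs(mu - directMu) < 1e-9)
assert(math.abs(sigma - directSigma) < 1e-9)
```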

## Friday, October 9, 2015

### Evaluation of Optimizers for Logistic Regression in Apache Spark

Overview
The Apache Spark 1.5.x MLlib library provides developers and data scientists alike with two well-known optimizers for the binomial classification using the logistic regression:
• Stochastic gradient descent (SGD)
• Limited memory version of the Broyden-Fletcher-Goldfarb-Shanno algorithm (L-BFGS)
I thought it would be instructive to compare the two optimization methods on the accuracy of the logistic regression as a binomial classifier.
Note: MLlib in Apache Spark 1.5.1 does not support multinomial classification. It may be added in future versions.

Logistic regression
The logistic regression is one of the most commonly used discriminative supervised learning models, mainly because it is intuitive and simple. It relies on the logistic function (refer to An Introduction to Logistic and Probit Regression Models).
For the classification problem, the probability that an observation x belongs to a class C is computed as $p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}$ where w are the weights or model coefficients.
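A minimal illustration of the logistic function: with weights w and intercept w0, the class probability is bounded in (0, 1) and equals exactly 0.5 on the decision boundary. The weight values below are arbitrary.

```scala
// p(C|x) = 1 / (1 + exp(-w0 - w.x))
def logistic(w0: Double, w: Array[Double], x: Array[Double]): Double =
  1.0 / (1.0 + math.exp(-w0 - w.zip(x).map { case (wi, xi) => wi * xi }.sum))

val w = Array(0.8, -0.4, 0.2)

// On the decision boundary (w0 + w.x = 0) the probability is exactly 0.5
assert(math.abs(logistic(0.0, w, Array(0.0, 0.0, 0.0)) - 0.5) < 1e-12)

// Probabilities always lie in (0, 1)
val p = logistic(1.0, w, Array(1.0, 2.0, 3.0))
assert(p > 0.0 && p < 1.0)
```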
Apache Spark MLlib has two implementations of the logistic regression as a binary classifier

• org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
• org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer
Background information on the stochastic gradient descent can be found at Comparing Stochastic Gradient Descent And Batch Gradient Descent
For those interested in the inner workings of the limited memory Broyden-Fletcher-Goldfarb-Shanno algorithm, see Limited Memory BFGS.

Data generation
The scaladocs for Apache Spark API are available at Apache Spark API
Let's create a synthetic training set to evaluate and compare the two implementations of the logistic regression. The training set for the binomial classification consists of:

• Two sets of observations with 3 features each, following distributions with the same standard deviation and two different means
• Two labels (or expected values) {0, 1}, one for each Gaussian distribution
The following diagram illustrates the training set for a single feature.

The margin of separation between the two groups of 3-dimensional observations is computed as mean(first group) - mean(second group). As the margin increases, the accuracy of the binomial classification is expected to increase.

final val SIGMA = 2.0

class DataGenerator(numTasks: Int)(implicit sc: SparkContext) {
  private def f(mean: Double): Double = mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(halfTrainSet: Int, mu: Double): Array[LabeledPoint] = {
    val trainObs =
      ArrayBuffer.fill(halfTrainSet)(Array[Double](f(1.0), f(1.0), f(1.0))) ++
      ArrayBuffer.fill(halfTrainSet)(Array[Double](f(mu), f(mu), f(mu)))

    val labels = ArrayBuffer.fill(halfTrainSet)(0.0) ++
      ArrayBuffer.fill(halfTrainSet)(1.0)

    labels.zip(trainObs).map { case (y, ar) =>
      LabeledPoint(y, new DenseVector(ar))
    }.toArray
  }
}


The method apply generates two groups of halfTrainSet observations each, drawn from distributions with means 1.0 and 1.0 + mu respectively.
The Apache Spark LogisticRegression classes process LabeledPoint instances, which are generated in this particular case from DenseVector wrappers of the observations.
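The effect of the margin can be observed without Spark by reproducing the noise function f locally: the empirical means of the two groups differ by approximately mu - 1.0. A fixed seed keeps the sketch deterministic; the constants mirror those of the class above and mu = 2.5 is an arbitrary choice.

```scala
import scala.util.Random

val SIGMA = 2.0
val rand = new Random(42L)  // fixed seed for a deterministic sketch
def f(mean: Double): Double = mean + SIGMA * (rand.nextDouble() - 0.5)

val n = 20000
val group0 = Array.fill(n)(f(1.0))
val group1 = Array.fill(n)(f(2.5))   // mu = 2.5

val margin = group1.sum / n - group0.sum / n
// Empirical margin should be close to mu - 1.0 = 1.5
assert(math.abs(margin - 1.5) < 0.05)
```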

Test
The first step consists of initializing the Apache Spark environment.

val numTasks: Int = 64
val conf = new SparkConf().setAppName("LogisticRegr")
  .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// Execution of Spark driver code here...
sc.stop

The next step is to generate the training and validation sets. The validation set is used at a later stage to compare the accuracy of the respective models.

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sc)
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)

It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.

val logRegrSGD = new LogisticRegressionWithSGD
logRegrSGD.optimizer.setNumIterations(1000)
logRegrSGD.optimizer.setConvergenceTol(0.02)

val inputRDD = sc.makeRDD(trainSet, numTasks)
logRegrSGD.setIntercept(true)
val model = logRegrSGD.run(inputRDD)

Validation
Now it is time to use the validation set to compute the mean sum of square errors and the accuracy of each predictor for different values of the margin. We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows:
• metrics metrics produced by the Spark logistic regression
• msse mean sum of square errors
• accuracy accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.

case class Quality(
    metrics: Array[(Double, Double)],
    msse: Double,
    accuracy: Double) {
  override def toString: String =
    s"Metrics: ${metrics.mkString(",")}\nmsse = ${Math.sqrt(msse)} accuracy = $accuracy"
}


Let's implement our validation class BinomialValidation for the binomial classification. The validation instance is created using the Spark context sc, the logistic regression model generated through training, and the number of partitions or tasks used by the data nodes.

final class BinomialValidation(
    sc: SparkContext,
    model: LogisticRegressionModel,
    numTasks: Int) {

  def metrics(validationSet: Array[LabeledPoint]): Quality = {
    val featuresLabels = validationSet.map(lbPt => (lbPt.label, lbPt.features)).unzip
    // Predict on the observations, then pair each prediction with its label
    val prediction_rdd = model.predict(sc.makeRDD(featuresLabels._2, numTasks))
    val scoreAndLabels = prediction_rdd.zip(sc.makeRDD(featuresLabels._1, numTasks))

    val successes = scoreAndLabels.map { case (e, p) => Math.abs(e-p) }.filter(_ < 0.1)
    val msse = scoreAndLabels.map { case (e, p) => (e-p)*(e-p) }.sum

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    Quality(metrics.fMeasureByThreshold().collect,
        msse,
        successes.count.toDouble/validationSet.length)
  }
}


The method metrics converts the validation set validationSet into an RDD after segregating the expected values from the observations (unzip). The result of the prediction, prediction_rdd, is then zipped with the labeled values into the evaluation set scoreAndLabels, from which the different quality metrics are extracted.
Finally, the validation is applied to the logistic model with a threshold of 0.1.

model.setThreshold(0.1)
val validator = new BinomialValidation(sc, model, numTasks)
val quality = validator.metrics(validationSet)


Results
The fact that the L-BFGS optimizer provides significantly more accurate results (i.e. a lower mean sum of square errors) than the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merits further investigation.
Note: This post is a brief comparison of the two optimizers in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent has better performance overall than L-BFGS, or any quasi-Newton method for that matter, because it does not require the estimation of the Hessian matrix (second order derivative).

References
An Introduction to Logistic and Probit Regression Models
Machine Learning: A Probabilistic Perspective Chapter 8 Logistic Regression - K. Murphy - MIT Press 2012