Tuesday, January 5, 2016

Simple class versioning in Scala

Overview
Versioning is a common challenge in commercial software development. The most common technique to support multiple versions of a library, executable or framework in Java or Scala relies on the class loader.
A library can easily be versioned by creating multiple jar files, one for each version.

Let's consider a simple case in which a self-contained library is packaged as a jar file with two versions
  • ArrayIndex1.jar
  • ArrayIndex2.jar

import java.net.{URL, URLClassLoader}
import java.io.File
import java.lang.reflect.Method

// Each version of the library is loaded through its own class loader, using
// the jar file converted to a URL (File.toURI.toURL avoids the deprecated File.toURL)
val cl1 = new URLClassLoader( 
   Array[URL](new File("ArrayIndex1.jar").toURI.toURL),   
   Thread.currentThread().getContextClassLoader())
val cl2 = new URLClassLoader( 
   Array[URL](new File("ArrayIndex2.jar").toURI.toURL),  
   Thread.currentThread().getContextClassLoader())
  
// 'version' is an application-specific value that selects the library version
val compatibleClass: Class[_] = 
  if( version > 0.9) cl1.loadClass("ArrayIndex") else cl2.loadClass("ArrayIndex")

val obj = compatibleClass.newInstance
val methods: Array[Method] = compatibleClass.getMethods
println(s"methods ${methods.map( _.getName).mkString(",")}")

The first step consists of loading the two versions of the library by converting each jar file name into a URL. In this particular case, each class loader is defined by its URL with the type URLClassLoader.
The class to be used depends on the variable version. Once the appropriate class is loaded, its instances and methods are readily available to be invoked by the client code.
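A method of the dynamically loaded class can then be invoked through reflection. The sketch below assumes, purely for illustration, that the ArrayIndex class exposes a method apply taking an Int argument.

// Hypothetical invocation through reflection; assumes ArrayIndex
// declares a method 'apply(n: Int)' - adjust to the actual API
val applyMethod: Method = compatibleClass.getMethod("apply", classOf[Int])
val result = applyMethod.invoke(obj, Int.box(4))
println(s"result: $result")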

Wednesday, December 2, 2015

Kullback-Leibler Divergence on Apache Spark

Overview
The Kullback-Leibler divergence, also known as the relative entropy, measures the dissimilarity between distributions of random values (i.e. probability density functions). Let's consider two distributions P and Q with probability density functions p and q
\[D_{KL}(P||Q)= -\int_{-\infty }^{+\infty }p(x)\,log\frac{q(x)}{p(x)}\,dx\]
For large data sets, the computation of the Kullback-Leibler divergence benefits from a distributed environment such as Apache Spark. In this post, we compare multiple data sets generated from a variety of continuous probability density functions against the ubiquitous normal distribution.

Implementation of Kullback-Leibler
First let's implement the Kullback-Leibler formula for measuring the dissimilarity between an existing data set {x_i, y_i} (presumably generated by a specific random variable / distribution) and a given continuous distribution with a probability density function f

import Math._

object KullbackLeibler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

  // KL divergence of the data set {(x, y)} from the density f:
  // -sum of y.log(f(x)/y), skipping data points with a negligible y
  def execute( xy: DATASET, f: Double => Double): Double = {
    val z = xy.filter{ case(x, y) => abs(y) > EPS}
    -z./:(0.0){ case(s, (x, y)) => s + y*log(f(x)/y)}
  }

  // KL divergence against each of the densities fs; the iterator is
  // materialized first so every density is evaluated on the full data set
  def execute( xy: DATASET, fs: Iterable[Double=>Double]): Iterable[Double] = {
    val data = xy.toSeq
    fs.map(f => execute(data.iterator, f))
  }
}

The second execute method computes the KL divergence for a sequence of distributions defined by the list fs of their probability density functions.
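As a quick sanity check, the first execute method can be applied to a tiny, hand-built data set (the values below are illustrative only):

// Standard normal density used as the reference distribution
val normal = (x: Double) => Math.exp(-0.5*x*x)/Math.sqrt(2.0*Math.PI)
// Small (x, y) data set approximating the distribution to compare against f
val xy = Seq((-1.0, 0.24), (0.0, 0.40), (1.0, 0.24)).iterator
val kl = KullbackLeibler.execute(xy, normal)
println(s"KL divergence: $kl")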
Next let's define some of the continuous random distributions to be evaluated against the normal distribution

// Normalization constant 1/sqrt(2.PI) of the standard normal density;
// INV_PI is used but not defined in the post - this is the assumed value
final val INV_PI = 1.0/sqrt(2.0*PI)

def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

val gauss = (x: Double) => INV_PI*exp(-x*x/2.0)
val uniform = (x: Double) => x
  
val logNormal = (x: Double) => { 
  val lx = log(x)
  INV_PI/x*exp(-lx*lx) 
}

val gamma = (x: Double, n: Int) => exp(-x)*pow(x, n)/fact(n)

val logGamma = (x: Double, alpha: Int, beta: Int) =>
  exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))

val _beta = (n: Int, m: Int) => {
  val f = if(n < 2) 1 else fact(n-1)
  val g = if(m < 2) 1 else fact(m-1)
  f*g/fact(n+m -1).toDouble
}
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1-x, beta-1)/_beta(alpha, beta)

val chiSquare = (x: Double, k: Int) => {
  val k_2 = k >>1
  // Chi-square density for even k: Gamma(k/2) = (k/2 - 1)!
  pow(x, k_2-1)*exp(-0.5*x) /((1 << k_2)*fact(k_2 - 1))
}

Some of these probability density functions are parameterized (i.e. log gamma has alpha and beta parameters). The probability density functions are implemented as Scala partially applied functions with predefined parameter values. For example...

val gamma2 = gamma( _ : Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)


Spark to the rescue
Executing the Kullback-Leibler formula for a large data set against a significant number of random distributions may be a daunting computational task. A solution is to use the Apache Spark framework to parallelize the computation of the divergence. Here are the steps:
  • Partition the original data set
  • Broadcast the first sequence of probability density functions
  • Execute the Kullback-Leibler formula on each partition using mapPartitions
  • Collect then aggregate the divergence factor from each partition

final val NUM_DATA_POINTS = 10000000
val numTasks: Int = 128
  
val conf = new SparkConf().setAppName("Kullback-Leibler").setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

Let's simulate a data set using the normal distribution. The resulting resilient distributed dataset is cached.

val master_rdd = generateData(gauss, NUM_DATA_POINTS, sc)
master_rdd.cache   // cache is persist(MEMORY_ONLY); calling both persist and cache is redundant
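The helper generateData is not listed in the post. A minimal sketch, assuming the data set is the density sampled over an arbitrary interval with the y values normalized into a discrete distribution:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical implementation of generateData (not shown in the original post)
def generateData(f: Double => Double, numPoints: Int, sc: SparkContext): RDD[(Double, Double)] = {
  val xs = Array.tabulate(numPoints)(n => 10.0*n/numPoints - 5.0)  // x in [-5, 5)
  val norm = xs.map(f).sum
  sc.makeRDD(xs.map(x => (x, f(x)/norm)))
}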

Next we broadcast the keys of the map of probability density functions; each partition then looks up the functions themselves from the pdfs map.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 2 -> logNormal, 3 -> gamma1, 4 -> gamma2, 5 -> gamma4, 
  6 -> logGamma12, 7 -> logGamma22, 8 -> beta22, 9 -> beta25, 
  10 -> chiSquare2, 11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))

Finally, let's implement the mapPartitions transformation. It transforms each partition of (x, y) pairs into an array of pdfs.size values: the KL divergence of the partition against each distribution to be evaluated.

val kl_rdd: RDD[Double] = master_rdd.mapPartitions((it:DATASET) => {
  val pdfsList = pdfs_broadcast.value.map(n => pdfs.get(n).get)
  execute(it, pdfsList).iterator
} )

The divergence values from each partition are then collected and aggregated using the Scala fold operator /:.

val kl_master = kl_rdd.collect
val numBatches = kl_master.size/pdfs.size

val divergences = (0 until kl_master.size by pdfs.size)
  ./:(Array.fill(pdfs.size)(0.0))( (s, n) => {
     // accumulate the contribution of each batch for every distribution
     (0 until pdfs.size).foreach(j => s.update(j, s(j) + kl_master(n+j)))
     s
}).map( _ / numBatches)

Note: This particular implementation relies on the mapPartitions method to execute the KL divergence over each partition, then performs the aggregation on the driver. A more common approach would be to use the method RDD.aggregate.

val pdfsList = pdfs.values.toList
  
val values = master_rdd.aggregate( Array.fill(pdfs.size)(0.0))((s, xy) =>
  s.zipWithIndex.map{ case (_s, n) =>
      _s + xy._2*log( pdfsList(n)(xy._1)/xy._2 )
  }, 
  (xt: Array[Double], yt: Array[Double]) => xt.zip(yt)
     .map(z => z._1 + z._2))

Test results
The results of the evaluation of the dissimilarity between the data set and the given probability distributions are displayed as follows:
[Chart: KL divergence of the data set generated from the normal distribution against each probability density function]

References
Pattern Recognition and Machine Learning, 1.6.1 Relative entropy and mutual information - C. Bishop - Springer Science, 2006
Apache Spark documentation

Friday, November 20, 2015

Recursive Minimum Spanning Tree in Scala

Overview
Finding the optimum arrangement to connect nodes is a common problem in network design, transportation projects and electrical wiring. Each connection is assigned a weight (cost, length, time...). The purpose is to compute the layout that connects all the nodes while minimizing the total weight. This problem is known as the minimum spanning tree, or MST, of the nodes connected through an undirected graph.
Several algorithms have been developed over the last 70 years to extract the MST from a graph. This post focuses on the implementation of Prim's algorithm in Scala.

Prim's algorithm
There are many excellent tutorials on graph algorithms and more specifically on Prim's algorithm. I recommend Lecture 7: Minimum Spanning Trees and Prim's Algorithm - Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology.
Let PQ be a priority queue and G(V, E) a graph with n vertices V and edges E of weights w(u,v). A vertex v is defined by
  • An identifier
  • A load factor, load(v)
  • A parent tree(v)
  • The adjacent vertices adj(v)
Prim's algorithm can be expressed as a simple iterative process. It maintains a priority queue of all the vertices not yet in the tree and updates their loads to select the next vertex to add to the spanning tree. Each vertex is popped (and removed) from the priority queue before being inserted in the tree.
PQ <- V(G)
foreach u in PQ
  load(u) <- INFINITY
load(root) <- 0
 
while PQ nonEmpty
  u <- pop(PQ)
  foreach v in adj(u)
    if v in PQ && w(u,v) < load(v)
    then
      tree(v) <- u
      load(v) <- w(u,v)
The Scala implementation relies on tail recursion to transfer vertices from the priority queue to the spanning tree.

Scala implementation: graph definition
The first step is to define a graph structure with edges and vertices. The graph takes two arguments:
  • numVertices number of vertices
  • start index of the root of the minimum spanning tree
The vertex class has three attributes
  • id identifier (arbitrarily an integer)
  • load dynamic load (or key) on the vertex
  • tree reference to the minimum spanning tree
import scala.collection.mutable.HashMap

final class Graph(numVertices: Int, start: Int = 0) {
 
  // Vertex with a mutable load (key) and a tree flag (-1 if not yet in the tree)
  class Vertex(val id: Int, 
     var load: Int = Int.MaxValue, 
     var tree: Int = -1) 

  val vertices = List.tabulate(numVertices)(n => new Vertex(n))
  vertices(start).load = 0   // the root of the spanning tree carries a load of 0
  val edges = new HashMap[Vertex, HashMap[Vertex, Int]]

  def += (from: Int, to: Int, weight: Int): Unit = {
    val fromV = vertices(from)
    val toV = vertices(to)
    connect(fromV, toV, weight)
    connect(toV, fromV, weight)
  }
  def connect(from: Vertex, to: Vertex, weight: Int): Unit = {
    if( !edges.contains(from))
      edges.put(from, new HashMap[Vertex, Int])    
    edges.get(from).get.put(to, weight)
  }   
  ...
}


The vertices are initialized with a unique identifier id and a default load Int.MaxValue. In most cases, the identifier is a character string or a data structure. As described in the pseudo-code, the load for the root of the spanning tree is set to 0.
The load is defined as an integer for performance's sake. For the processing of very large graphs, it is recommended to convert (quantize) floating point values to integers, then convert the resulting minimum spanning tree back to the original format.
The edges are defined as a hash table with the source vertex as key and a hash table of destination vertices and edge weights as value.
The graph is undirected, therefore the connections initialized in the method += are bi-directional.
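A small usage sketch, with an arbitrary 4-vertex topology and weights:

// Build a 4-vertex undirected weighted graph
val g = new Graph(4)
g += (0, 1, 1)
g += (0, 2, 4)
g += (1, 2, 2)
g += (2, 3, 3)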

Scala implementation: priority queue
The priority queue is used to reorder the vertices and select the next vertex to be added to the spanning tree.
Note: There are many different implementations of priority queues in Scala and Java. Keep in mind that Prim's algorithm requires the queue to be reordered after a load is updated (see pseudo-code). The PriorityQueue classes in the Scala and Java libraries do not allow elements to be removed or explicitly re-ordered. An alternative is to use a binary tree or a red-black tree, from which elements can be removed and the tree re-balanced.
The implementation of the priority queue has an impact on the time complexity of the algorithm. The following implementation of the priority queue is provided only to illustrate Prim's algorithm.
import scala.collection.mutable.{ListBuffer, PriorityQueue}

class PQueue(vertices: List[Vertex]) {
   var queue = vertices./:(new PriorityQueue[Vertex])((pq, v) => pq += v)
    
   def += (vertex: Vertex): Unit = queue += vertex
   def pop: Vertex = queue.dequeue
   // No-op: scala.collection.mutable.PriorityQueue cannot be re-ordered in
   // place; a self-balancing tree would be needed for a correct implementation
   def sort: Unit = {}
   def push(vertex: Vertex): Unit = queue.enqueue(vertex)
   def nonEmpty: Boolean = queue.nonEmpty
}
  
type MST = ListBuffer[Int]
implicit def orderingByLoad[T <: Vertex]: Ordering[T] = Ordering.by( - _.load)  

The Scala PriorityQueue class requires an implicit ordering of vertices using their load. This is accomplished by defining an implicit conversion of a type T with upper bound Vertex to Ordering[T].
Note: The type T has to be a sub-class of Vertex. A direct conversion from the Vertex type to Ordering[Vertex] is not allowed in Scala.

Scala implementation: Prim
This implementation is a direct translation of the pseudo-code presented in the second paragraph. It relies on Scala's efficient tail recursion.
def prim: List[Int] = {
  val queue = new PQueue(vertices)
   
  @scala.annotation.tailrec
  def prim(parents: MST): Unit = {
    if( queue.nonEmpty ) {
      val head = queue.pop
      val candidates = edges.get(head).get
          .filter{ 
            case(vt,w) => vt.tree == -1 && w <= vt.load
          }
 
      if( candidates.nonEmpty ) {
        candidates.foreach {case (vt, w) => vt.load = w }
        queue.sort
      }
      parents.append(head.id)
      head.tree = 1
      prim(parents)
    }
  }
  val parents = new MST
  prim(parents)
  parents.toList
}
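Applied to the small graph built earlier, prim returns the vertex identifiers in the order they join the spanning tree:

// Extract the minimum spanning tree from the sample 4-vertex graph;
// the exact order depends on the priority queue behavior discussed above
val mst: List[Int] = g.prim
println(mst.mkString(" -> "))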

Time complexity
Minimum spanning tree with linear queue: V^2
Minimum spanning tree with binary heap: (E + V).log V
Minimum spanning tree with Fibonacci heap: V.log V
Note: See Summary of time complexity of algorithms for details.

References
  • Introduction to Algorithms Chapter 24 Minimum Spanning Trees - T. Cormen, C. Leiserson, R. Rivest - MIT Press 1989
  • Lecture 7: Minimum Spanning Trees and Prim’s Algorithm Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology
  • Graph Theory Chapter 4 Optimization Involving Tree - V.K. Balakrishnan - Schaum's Outlines Series, McGraw Hill, 1997

Monday, November 9, 2015

Time Complexity: Graph & Machine Learning Algorithms

Overview
Time complexity (the worst case scenario for the duration of execution given a number of elements) is commonly used in computer science. However, you will be hard pressed to find a comparison of machine learning algorithms using their asymptotic execution time.

Summary
The following summary lists the time complexity of some of the most common algorithms used in machine learning, including recursive computation for dynamic programming, linear algebra, optimization and discriminative supervised training.

Algorithm | Time complexity | Description
Recursion with one element reduction | N^2 | N: number of elements
Recursion with halving reduction | log N |
Recursion with halving reduction and processing | N |
Nearest neighbors search | M.log k.N.log N | M: number of features; k: number of neighbors; N: number of observations
Matrix multiplication (m,n)x(n,d) | m.n.d | m, n, d: matrix dimensions
Matrix multiplication (n,n) | n^3 | n: matrix dimension
Matrix multiplication (n,n), Strassen | n^2.8 | n: matrix dimension
Partial eigenvalues extraction (n,n) | e.N^2 | e: number of eigenvalues; N: number of observations
Complete eigenvalues extraction (n,n) | N^3 | N: number of observations
Minimum spanning tree, Prim linear queue | V^2 | V: number of vertices
Minimum spanning tree, Prim binary heap | (E + V).log V | E: number of edges; V: number of vertices
Minimum spanning tree, Prim Fibonacci heap | V.log V | E: number of edges; V: number of vertices
Shortest paths graph, Dijkstra linear sorting | V^2 | V: number of vertices
Shortest paths graph, Dijkstra binary heap | (E + V).log V | V: number of vertices
Shortest paths graph, Dijkstra Fibonacci heap | V.log V | E: number of edges; V: number of vertices
Shortest paths kNN, graph - Dijkstra | (k + log N).N^2 | k: number of neighbors; N: number of observations
Shortest paths kNN, graph - Floyd-Warshall | N^3 | N: number of observations
Fast Fourier transform | N.log N | N: number of observations
Batched gradient descent | N.P.I | N: number of observations; P: number of parameters; I: number of iterations
Stochastic gradient descent | N.P.I | N: number of observations; P: number of variables; I: number of epochs
Newton with Hessian computation | N^3.I | N: number of observations; I: number of iterations
Conjugate gradient | N.m.sqrt(k) | N: number of observations; m: number of non-zero entries; k: condition number of the matrix
L-BFGS | N.M.I | N: number of observations; M: estimated number of gradients; I: number of iterations
K-means (*) | C.N.M.I | C: number of clusters; M: dimension; N: number of observations; I: number of iterations
Lasso regularization - LARS (*) | N.M.min(N,M) | M: dimension; N: number of observations
Hidden Markov model, forward-backward pass | N^2.M | N: number of states; M: number of observations
Multilayer perceptron (*) | n.M.P.N.e | n: input variables; M: number of hidden neurons; P: number of output values; N: number of observations; e: number of epochs
Support vector machine (*), using Newton | N^3 | N: number of observations
Support vector machine (*), using Cholesky | N^2 | N: number of observations
Support vector machine (*), sequential minimal optimization | N^2 | N: number of observations
Principal components analysis (*) | M^2.N + N^3 | N: number of observations; M: number of features
Expectation-maximization (*) | M^2.N | N: number of observations; M: number of variables
Laplacian eigenmaps | M.log k.N.log N + m.N.k^2 + d.N^2 | N: number of observations; M: number of variables
Genetic algorithms | P.log P.I.C | C: number of genes/chromosome; P: population size; I: number of iterations
(*): Training

References
Introduction to Algorithms - T. Cormen, C. Leiserson, R. Rivest - MIT Press, 1990
Machine Learning: A Probabilistic Perspective - K. Murphy - MIT Press, 2012
Big O cheat sheet

Wednesday, October 28, 2015

Recursive Mean & Standard Deviation in Scala

Overview
The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala's tail recursion is a very good alternative for computing the mean and standard deviation of data sets of arbitrary size.

Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression consists of summing the squared differences between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{i=0}^{n-1}(x_{i}-\mu )^{2}}{n}}\]
The second formula allows the mean and standard deviation to be updated with each new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{i=0}^{n-1}x_{i}^{2}-{\mu ^{2}}}\]
This second approach relies on computing the sum of squared values, which can overflow for very large data sets.

val x = Array[Double]( /* ... */ )
val mean = x.sum/x.length
// Direct computation of the standard deviation from the sum of squared deviations
val stdDev = Math.sqrt(x.map( _ - mean).map(t => t*t).sum/x.length)

Recursive computation
There is actually no need to compute the sum and the sum of squared values to obtain the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n-1})(x_{n}-\mu _{n}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\]
Let's implement these two recursive formulas in Scala using tail recursion

def meanStd(x: Array[Double]): (Double, Double) ={
    
  @scala.annotation.tailrec
  def meanStd(x: Array[Double], mu: Double, Q: Double, count: Int): (Double, Double) = 
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count +1
      val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
    }
  
  meanStd(x, 0.0, 0.0, 0)
}

This implementation updates the mean and the standard deviation simultaneously with each new data point.
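As a quick sanity check, the sample below (arbitrary values) has a mean of 5.0 and a population standard deviation of 2.0:

val (mu, sigma) = meanStd(Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0))
println(s"mean = $mu, stdDev = $sigma")   // mean = 5.0, stdDev = 2.0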

Friday, October 9, 2015

Evaluation of Optimizers for Logistic Regression in Apache Spark

Overview
The Apache Spark 1.5.x MLlib library provides developers and data scientists alike with two well-known optimizers for binomial classification using logistic regression:
  • Stochastic gradient descent (SGD)
  • Limited memory version of the Broyden-Fletcher-Goldfarb-Shanno (L-BFGS)
I thought it would be instructive to compare the two optimization methods on the accuracy of the logistic regression as a binomial classifier.
Note: MLlib in Apache Spark 1.5.1 does not support multinomial classification. It may be added in future versions.

Logistic regression
The logistic regression is one of the most commonly used discriminative supervised learning models, mainly because it is intuitive and simple. It relies on the logistic function (refer to An Introduction to Logistic and Probit Regression Models).
In a classification problem, the probability that an observation x belongs to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w are the weights or model coefficients.
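The formula translates directly into Scala; in this sketch w0 stands for the intercept and w for the weight vector:

// Probability that observation x belongs to class C, per the formula above
def probability(w0: Double, w: Array[Double], x: Array[Double]): Double =
  1.0/(1.0 + Math.exp(-w0 - w.zip(x).map{ case (wi, xi) => wi*xi }.sum))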
Apache Spark MLlib has two implementations of the logistic regression as a binary classifier

  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer
Background information on the stochastic gradient descent can be found at Comparing Stochastic Gradient Descent And Batch Gradient Descent.
For those interested in the inner workings of the limited memory Broyden-Fletcher-Goldfarb-Shanno algorithm, see Limited Memory BFGS.

Data generation
The scaladocs for the Apache Spark API are available at Apache Spark API.
Let's create a synthetic training set to evaluate and compare the two implementations of the logistic regression. The training set for the binomial classification consists of:

  • Two data sets of observations with 3 features, each following a distribution with the same standard deviation and two different means.
  • Two labels (or expected values) {0, 1}, one for each Gaussian distribution
The following diagram illustrates the training set for a single feature.
[Diagram: distributions of the two labeled groups of observations for a single feature]

The margin of separation between the two groups of observations in 3 dimensions is computed as mean(first group) - mean(second group). As the margin increases, the accuracy of the binomial classification is expected to increase.


import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint

final val SIGMA = 2.0
class DataGenerator(numTasks: Int)(implicit sc: SparkContext) {
  // Uniform noise of width SIGMA centered on the mean
  private def f(mean: Double): Double = mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(halfTrainSet: Int, mu: Double): Array[LabeledPoint] = {
    val trainObs =
      ArrayBuffer.fill(halfTrainSet)(Array[Double](f(1.0), f(1.0), f(1.0))) ++
        ArrayBuffer.fill(halfTrainSet)(Array[Double](f(mu), f(mu), f(mu)))

    val labels = ArrayBuffer.fill(halfTrainSet)(0.0) ++ 
        ArrayBuffer.fill(halfTrainSet)(1.0)
    labels.zip(trainObs).map{ case (y, ar) => 
          LabeledPoint(y, new DenseVector(ar)) }.toArray
  }
}

The method apply generates two groups of halfTrainSet observations, randomly distributed around the means 1.0 and mu respectively.
The Apache Spark LogisticRegression classes process LabeledPoint instances, which are generated in this particular case from DenseVector wrappers of the observations.


Test
The first step consists of initializing the Apache Spark environment.

val numTasks: Int = 64
val conf = new SparkConf().setAppName("LogisticRegr").setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

  // Execution of Spark driver code here...

sc.stop

The next step is to generate the training and validation sets. The validation set is used at a later stage to compare the accuracy of the respective models.

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sc)
    
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)

It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.

val logRegrSGD = new LogisticRegressionWithSGD
logRegrSGD.optimizer.setNumIterations(1000)
logRegrSGD.optimizer.setConvergenceTol(0.02)

val inputRDD = sc.makeRDD(trainSet, numTasks)
logRegrSGD.setIntercept(true)
val model = logRegrSGD.run(inputRDD)
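The L-BFGS-based classifier would be configured the same way. A minimal sketch, assuming identical convergence parameters for a fair comparison:

// Counterpart model trained with the L-BFGS optimizer
val logRegrBFGS = new LogisticRegressionWithLBFGS
logRegrBFGS.optimizer.setNumIterations(1000)
logRegrBFGS.optimizer.setConvergenceTol(0.02)
logRegrBFGS.setIntercept(true)
val modelBFGS = logRegrBFGS.run(inputRDD)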

Validation
Now it is time to use the validation set to compute the mean sum of square errors and the accuracy of each predictor for different values of the margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows:
  • metrics produced by the Spark logistic regression
  • mSSE Mean sum of square errors
  • accuracy accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.
case class Quality(
   metrics: Array[(Double, Double)], 
   msse: Double, 
   accuracy: Double) {
 override def toString: String =
    s"Metrics: ${metrics.mkString(",")}\nrmse = ${Math.sqrt(msse)} accuracy = $accuracy"
}

Let's implement our validation class, BinomialValidation, for the binomial classification. The validator is created using the Spark context sc, the logistic regression model generated through training and the number of partitions or tasks used by the data nodes.

final class BinomialValidation(
   sc: SparkContext, 
   model: LogisticRegressionModel, 
   numTasks: Int) {

 def metrics(validationSet: Array[LabeledPoint]): Quality = {
   val featuresLabels = validationSet.map( lbPt => (lbPt.label, lbPt.features)).unzip
   val predicted_rdd = model.predict(sc.makeRDD(featuresLabels._2, numTasks))

   val scoreAndLabels = sc.makeRDD(featuresLabels._1, numTasks).zip(predicted_rdd)
  
   val successes = scoreAndLabels.map{ case(e,p) => Math.abs(e-p) }.filter( _ < 0.1)
   val msse = scoreAndLabels.map{ case (e,p) => (e-p)*(e-p)}.sum/validationSet.length

   val metrics = new BinaryClassificationMetrics(scoreAndLabels)
   Quality(metrics.fMeasureByThreshold().collect, 
        msse, 
        successes.count.toDouble/validationSet.length)
  }
}

The method metrics converts the validation set, validationSet, into an RDD after segregating the expected values from the observations (unzip). The result of the prediction, predicted_rdd, is then zipped with the labeled values into the evaluation set, scoreAndLabels, from which the different quality metrics are extracted.
Finally, the validation is applied on the logistic model with a classification threshold of 0.1

model.setThreshold(0.1)
val validator = new BinomialValidation(sc, model, numTasks)
val quality = validator.metrics(validationSet)

Results
The fact that the L-BFGS optimizer provides a significantly more accurate result (or lower mean sum of square errors) than the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merits further investigation.
Note: This post is a brief comparison of the two optimizers in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent has better overall performance than L-BFGS or any quasi-Newton method for that matter, because it does not require the estimation of the Hessian matrix (second order derivatives).
[Charts: mean sum of square errors and accuracy of the SGD and L-BFGS models for increasing margin values]

References
An Introduction to Logistic and Probit Regression Models
Machine Learning: A Probabilistic Perspective, Chapter 8 Logistic Regression - K. Murphy - MIT Press, 2012