Wednesday, October 28, 2015

Recursive mean and standard deviation in Scala

The computation of the mean and standard deviation of a very large data set may cause overflow of the summation of values. Scala tail recursion is a very good alternative to compute mean and standard deviation for data set of unlimited size.

Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression consists of the sum the difference between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{0}^{n-1}(x-\mu )^{2}}{n}}\]
The second formula allows to update the mean and standard deviation with any new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{0}^{n-1}x^{2}-{\mu ^{2}}}\]
This second approach relies on the computation the sum of square values that can overflow

val x = Array[Double]( /* ... */ )
val mean = x.sum/x.length
val stdDev = Math.sqrt(( _ - mean).map(t => t*t).sum)/x.length)

Recursive computation
There is actually no need to compute the sum and the sum of squared values to compute the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n-1}-\mu _{n}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\]
Let's implement these two recursive formula in Scala using the tail recursion

def meanStd(x: Array[Double]): (Double, Double) ={
  def meanStd(x: Array[Double], mu: Double, Q: Double, count: Int): (Double, Double) = 
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count +1
      val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
  meanStd(x, 0.0, 0.0, 0)

This implementation update the mean and the standard deviation for each new data point simultaneously.

Friday, October 9, 2015

Spark Logistic Regression SGD vs. L-BFGS

Apache Spark 1.5.x MLlib library provides developers and data scientists alike, two well known optimizers for the binomial classification using the logistic regression.
  • Stochastic gradient descent (SGD)
  • Limited memory version of the Broyden-Fletcher-Goldfarb-Shanno (L-BFGS)
I thought it would be instructive to compare the two optimization methods on the accuracy of the logistic regression as a binomial classification.
Note: MLlib in Apache Spark 1.5.1 does not support multinomial classification. It may be added in the future versions.
NoteBackground information on the stochastic gradient descent can be found at

Logistic regression
The logistic regression is one of the most commonly used discriminative supervised learning model mainly because it is intuitive and simple. It relies on the logistic function (refer to An Introduction to Logistic and Probit Regression Models
In the case of the classification problem, the probability that on observation x belong to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w are the weights or model coefficients.
Apache Spark MLlib has two implementations of the logistic regression as a binary classifier

  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer
Background information on the stochastic gradient descent can be found at Comparing Stochastic Gradient Descent And Batch Gradient Descent
For those interested in inner workings of the limited memory Broyden-Fletcher-Goldfarb-Shanno algorithm, Limited Memory BFGS

Data generation
The scaladocs for Apache Spark API are available at Apache Spark API
Let's create a synthetic training set to evaluate and compare the two implementation of the logistic regression. The training set for the binomial classification consist of

  • Two data sets of observations with 3 features, each following a data distribution with same standard deviation and two different means.
  • Two labels (or expected values) {0, 1}, one for each Gaussian distribution
The following diagram illustrates the training set for a single feature.

The margin of separation between the two groups of observations of 3 dimension is computed as mean(first group) - mean (second group). As the margin increases the accuracy of the binomial classification is expected to increase.

final val SIGMA = 2.0
class DataGenerator(numTasks: Int)(implicit sc: SparkContext) {
  private def f(mean: Double): Double = mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(halfTrainSet: Int, mu: Double): Array[LabeledPoint] = {
    val trainObs =
      ArrayBuffer.fill(halfTrainSet)(Array[Double](f(1.0), f(1.0), f(1.0))) ++
        ArrayBuffer.fill(halfTrainSet)(Array[Double](f(mu), f(mu), f(mu)))

    val labels = ArrayBuffer.fill(halfTrainSet)(0.0) ++ 
ArrayBuffer.fill(halfTrainSet)(1.0){ case (y, ar) => 
          LabeledPoint(y, new DenseVector(ar)) }.toArray

The method apply generates the two groups of halfTrainSet observations following normal distribution of mean 1.0 and 1.0 + mu.
Apache Spark LogisticRegression classes process LabeledPoint instances which are generated in this particular case from DenseVector wrappers of the observations.

The first step consists of initializing the Apache spark environment.

val numTasks: Int = 64
val conf = new SparkConf().setAppName("LogisticRegr").setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)

  // Execution of Spark driver code here...


The next step is to generate the training and validation set. The validation set is used at a later stage for comparing the accuracy of the respective model.

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sc)
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)

It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.

val logRegrSGD = new LogisticRegressionWithSGD

val inputRDD = sc.makeRDD(trainingSet, numTasks)
val model =

Now it is time to use the validation set to compute the mean sum of square error and the accuracy of each predictor for different values of margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows
  • metrics produced by the Spark logistic regression
  • mSSE Mean sum of square errors
  • accuracy accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.
case class Quality(
   metrics: Array[(Double, Double)], 
   msse: Double, 
   accuracy: Double) {
 override def toString: String =
    s"Metrics: ${metrics.mkString(",")}\nmsse = ${Math.sqrt(msse)} accuracy = $accuracy"

Let's implement our validation class, BinomialValidation for the binomial classification. The validation is created using the spark context sc, the logistic regression model generated through training and the number of partitions or tasks used in the data nodes.

final class BinomialValidation(
   sc: SparkContext, 
   model: LogisticRegressionModel, 
   numTasks: Int) {

 def metrics(validationSet: Array[LabeledPoint]): Quality = {
   val featuresLabels = lbPt => (lbPt.label, lbPt.features)).unzip
   val predicted_rdd = model.predict(sc.makeRDD(featuresLabels._2, numTasks))

   val scoreAndLabels = sc.makeRDD(featuresLabels._1, numTasks).zip(predicted_rdd)
   val successes ={ case(e,p) => Math.abs(e-p) }.filter( _ < 0.1)
   val msse ={ case (e,p) => (e-p)*(e-p)}.sum

   val metrics = new BinaryClassificationMetrics(scoreAndLabels)

The method metrics converts the validation set, validationSet into a RDD after segregating the expected values from the observations (unzip). The results of the prediction, prediction_rdd is then zipped with the labeled values into the evaluation set, scoreAndLabels from which the different quality metrics are extracted.
Finally, the validation is applied on the logistic model with a convergence tolerance 0.1

val validator = new BinomialValidation(sc, model, numTasks)
val quality = validator.metrics(validationSet)

The fact that the L-BFGS optimizer provides a significant more accurate result (or lower mean sum of square errors) that the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merit further investigation.
Note: This post is a brief comparison of the two optimizer in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent has better performance overall than L-BFGS or any quasi-Newton method for that matter, because it does not require the estimation of the hessian metric (second order derivative).

An Introduction to Logistic and Probit Regression Models
Machine Learning: A probabilistic perspective Chapter 8 Logistic Regression" K. Murphy - MIT Press 2012

Tuesday, September 29, 2015

Histogram-based approximation using Apache Spark

The previous post introduces and describes a model to approximate or fit a function to a given data set, using an histogram implemented in Scala. (see Histogram-based approximation using Scala.
This post re-implements the dynamic resizable histogram using Apache Spark with limited changes in the original code base.


The Apache Spark version of histogram-based approximation reuses the classes Bin and Histogram described in the previous post. We made some minor changes to the constructors of the Histogram. As a reminder the Histogram class was defined as follows:

final class Histogram(var step: Double, 
     private var values: mutable.ArrayBuffer[Bin]) {
  var min = values.head._x - 0.5*step
  var max = values.last._x + 0.5*step

  private val maxNumBins = (values.length+1)<<RADIX

  def this(first: Double, step: Double, initNumBins: Int) = this(step, Range(0, initNumBins-1)
      .scanLeft(first)((s, n) => s + step)
      ./:(new mutable.ArrayBuffer[Bin])((vals, x) => vals += (new Bin(x)) ))

  def this(params: HistogramParams) = this(params.first, params.step, params.initNumBins)
  def train(x: Double, y: Double): Unit
  final def predict(x: Double): Double

Histogram has two constructors:
  • Instantiation of an histogram using an estimate of the _x value of the first bin, the width of the bins, step and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data.
  • Instantiation of an histogram using an initial given step and an existing array of bin of width step. This constructor is invoked for subsequent batch of data.
  • Instantiation of an histogram using its parameters.
The parameters for the histogram are encapsulated in the following HistogramParams class

case class HistogramParams(
   val first: Double,
   val step: Double,
   val initNumBins: Int,
   var values: ArrayBuffer[Bin] = ArrayBuffer.empty[Bin])

I added an implementation of the concatenation of array bins which is required by the reducer implemented by a fold action on Spark data nodes.

object Histogram {
  final val EPS = 1e-12
  final val RADIX = 8
  final def sqr(x: Double): Double = x*x

  def + (itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] = {
    val histBins = new mutable.ArrayBuffer[Bin]
    itBins1.scanLeft(histBins)((newBins, bin) => newBins += bin)
    itBins2.scanLeft(histBins)((newBins, bin) => newBins += bin)

Test data generation
For testing purpose, we need to generate a data set of very large (or even infinite) size. For this purpose we load an existing data set from a file which is duplicated then modified with a random uniform noise to create a large number of batch to be recursively processed by Spark data nodes. The noise level is significant enough to create different data point while preserving the original data pattern so the algorithm can be easily validated.

final class DataGenerator(sourceName: String, nTasks: Int) {
  private final val DELIM = " "
  private final val RATIO = 0.05
  var datasetSize: Int = _

    // Generate the RDD from an existing data set template and a noise function
  def apply(sc: SparkContext): RDD[(Float, Float)] = {
    val r = new Random(System.currentTimeMillis + Random.nextLong)
    val src = Source.fromFile(sourceName)
    val input =
      ./:(new mutable.ArrayBuffer[(Float, Float)])((buf, xy) => {
      val x = addNoise(xy(0).trim.toFloat, r)
      val y = addNoise(xy(1).trim.toFloat, r)
      buf += ((x, y))
    datasetSize = input.size
    val data_rdd = sc.makeRDD(input, nTasks)

  private def addNoise(value: Float, r: Random): Float = value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat

Apache Spark driver
The first step is to create a Spark configuration and context for a given number of concurrent tasks.

val numTasks: Int = 12

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

The main goal of the Spark driver is to apply a tail recursive function across all the batches of data. The histogram is computed/updated on the Spark data node, then reduced to a single histogram which is then broadcasted to all the nodes to be updated with the next batch of data.

The RDDs are generated by the DataGenerator instance as input(sc) and loaded recursively in partitions using the Spark method mapPartitionsWithIndex. Each partition generates a new array of histogram bins by instantiating the class Histogram and training on the batch of data. The data is then grouped by partition index. The fold generates the array of bins

val input = new DataGenerator("", numTasks)

def execute(iter: Int, initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin] = {
  val initParam = initParam_brdcast.value

  if( iter <= 0)
  else {
    val hist_rdd = input(sc).mapPartitionsWithIndex((idx: Int, it: Iterator[(Float, Float)]) => {
      // The first invocation used the broadcasted parameters for the histogram
      // and the subsequent invocation update the distribution of histogram bins
  val histogram =
  if (initParam.values.length == 0) new Histogram(initParam)
          else new Histogram(step, initParam.values)

     // Train the histogram with a new batch of data
  it.foreach { case (x, y) => histogram.train(x, y) }, _)).iterator
     .map { case (n, itBins) => }
     .fold(Array[Bin]())(Histogram +)

  execute(iter-1, sc.broadcast(
    HistogramParams(0.0, initParam.step, 0, 
        new mutable.ArrayBuffer[Bin] ++ hist_rdd))

The invocation of the recursive method execute on the driver broadcast the current (recently updated) parameters of the histogram initParms_brdcast to all the partitions. Upon receiving the broadcast values, the partitions load the next batch of data, generates and processes the RDD input(sc) and update the histogram parameters.

 val initNumBins = 2048
 val range = (-1.0, 2.0)
 var step = (range._2 - range._1)/initNumBins
 val maxRecursions = 4
 val hist = execute(maxRecursions, 
    sc.broadcast(HistogramParams(range._1 + 0.5 * step, step, initNumBins)))


Thursday, September 10, 2015

Histogram-based approximation in Scala

A typical function approximation consists of finding a model that fit a given data set. Let's consider the following data set {x, y} for which a simple model f: y = f(x) has to be approximated.

The black dots represent the data set and the red line the model y = f(x)
There are multiple approaches to approximate a model or a function to fit a given data set. The list includes

  • Splines
  • Least square regression
  • Levenberg-Marquardt
  • K-nearest neighbors
  • Histogram
  • Polynomial interpolation
  • Logistic regression
  • Neural Networks
  • Tensor productgs
  • ... and many more
Interpolation using dynamically resizable histograms is a very simple and effective method to approximate a data set.

An histogram consists of array or sequence of bins. A bin is defined with three parameters:
  • _x Center of the bin
  • _y Average value for the bin
  • count Number of data points in the bin
The distribution of histogram bins is illustrated in the following chart

Let's look at an implementation of the Bin class

class Bin(var _x: Double, var _y: Double =0.0) extends Serializable {
  var count: Int = 0

  def += (y: Double): Unit = {
    val newCount = count + 1
    _y = if(count == 0) y else (_y*count + y)/newCount
    count = newCount
  def + (next: Bin): this.type = {
    val newCount = count + next.count
    if(newCount > 0) _y = (_y*count + next._y*next.count)/newCount

  def + (next: Array[Bin]): this.type = {
    val newCount = next.aggregate(count)((s,nxt) => s +nxt.count, _ + _)
    if( newCount > 0) {
      _y = next./:(_y*count)((s, nxt) => s + nxt._y*nxt.count)/newCount
      count = newCount

The method += (y: Double): Unit adds a new value for this bin, and recomputes the average _y. The method + (next: Bin): this.type adds the content of a new bin to this bin. Finally, the method + (next: Array[Bin]): this.type merges an array of bins into this bin.
Next let's create a class, Histogram to manage the array of bins. The constructor for the histogram has four parameters
  • maxNumBins maximum number of bins
  • min expected minimum value in the data set
  • max expected maximum value in the data set
  • optional smoothing weights

final class Histogram(
    maxNumBins: Long, 
    var min: Double = -1.0, 
    var max: Double = 1.0, 
    weights: Array[Float] = Array.empty[Float]) {
  val initNumBins: Int = (maxNumBins>>RADIX).toInt
  private var step = (max-min)/initNumBins
  private[this] var values = Range(0, initNumBins-1)
        .scanLeft(min + 0.5*step)((s, n) => s + step)
        ./:(new ArrayBuffer[Bin])((vals, x) => vals += (new Bin(x)) )
  def train(x: Double, y: Double): Unit = {
    values(index(x)) += y
    if( values.size >= maxNumBins) >>
   final def predict(x: Double): Double = {
      if( x < min || x > max) Double.NaN 
      else if(weights.length > 0) smoothPredict(x)
      else values(index(x))._y
    // ... ancillary methods

The two main methods are
  • train which updates a model (histogram) with each new data point from the training set. The histogram expands when a new data point exceeds the current boundary (min, max) of the histogram.
  • predict which predicts the y value of the new observation x. The prediction relies on an interpolation (weighted moving average) is the user specifies an array of weights in the histogram constructor.
The method Histogram.index extracts the index of the bin containing a data point (x, y).

private def index(x: Double): Int = {
  val idx = ((x - min)/step).floor.toInt
  if( idx < 1) 0 else if (idx >= values.size) values.size -1 else idx

This implementation of the dynamically resizable histogram, the array of bins expends by adding new bins if the new data point from the training set has a x value that is either greater that the current maximum value or lower than the current minimum value. The width of the bins, step does not change, only the current number of bins. The number of bins expanded until the maximum number of bins, maxNumBins. The method Histogram.<< Implements the expansion of the histogram for a constant bin width.

private def >>(x: Double): Unit = 
  if(x > max) {
    values.appendAll(generate(max, x))
    max = values.last._x + step*0.5
  else if(x < min) {
    values.prependAll(generate(min, x))
    min = values.head._x - step*0.5
final private def generate(limit: Double, x: Double): Array[Bin] = 
  if( x > limit) {
     val nBins = ((x-limit)/step).ceil.toInt
     var z = limit - step*0.5
     Array.tabulate(nBins)(_ => {z += step; new Bin(z) } )
  else {
     val nBins = ((limit-x)/step).ceil.toInt
     var z = limit + step*0.5
     Array.tabulate(nBins)(_ => {z -= step; new Bin(z) } )

Once the maximum number of bins maxNumBins is reached, the histogram is resized by halving the current width of the bin step. The consolidation of the histogram bins is implemented by the method Histogram.>>

private def -- : Unit = 
  values = (0 until values.size-1 by 2)
     ./:(new ArrayBuffer[Bin])( (ab, n) => 
 ab += (values(n) + values(n+1)))

There is no guarantee that each bin has at least one data point count > 0. One workaround consist of interpolating over the values _y using the adjacent bins. In this specific implementation we uses a simple centered weighted moving average as implemented in the method Histogram.smoothPredict.

final private def smoothPredict(x: Double): Double = {  
  val idx = index(x)
  val window = if(idx == 0) (0, weights.length+1) 
  else if(idx == values.size-1) (values.size-weights.length, values.size)
  else (idx-1, idx+2)
  val smoothingWindow = values.slice(window._1, window._2)
         .filter( _._1.count > 0)
  val totalWeights =
  smoothingWindow.aggregate(0.0)((s, yw) => 
    s + yw._1._y*yw._2, _ + _)/totalWeights

The histogram class is tested by loading a data set from file. Loading the training set from file and generating the histogram from training is implemented by the following method.

def train(maxNumBins: Long, fileName: String): Try[Histogram] = Try {
  val src = Source.fromFile(fileName)
  val fields = _.split(DELIM))  

  val hist = new Histogram(maxNumBins, -1.0, -2.0)
  fields.foreach( xy =>  hist.train(xy(0).trim.toFloat, xy(1).trim.toFloat) )

The train method takes two arguments: the maximum number of bins for the histogram and the name of the source containing the training set.
Finally, the predictive method is used to compute the accuracy of the model, through the validate method.

def validate(hist: Histogram, fileName: String, eps: Double): Try[Double] = Try {
  val src = Source.fromFile(fileName)
  val fields = _.split(DELIM))
  val counts = fields./:((0L, 0L))((s, xy) => 
     if( Math.abs(hist.predict(xy(0).trim.toFloat) - xy(1).trim.toFloat) < eps) 
       (s._1 + 1L, s._2 + 1L) 
       (s._1, s._2 + 1L)
  val accuracy = counts._1.toDouble/counts._2

Introduction to Machine Learning Chap 8 Nonparametric methods / Nonparametric density estimation E. Alpaydin- MIT press 2004
Scala Cookbook A. Alexander O'Reilly 2013
Histogram-based approximation using Apache Spark Patrick Nicolas