Wednesday, December 2, 2015

Kullback-Leibler Divergence on Apache Spark

Implementation of the Kullback-Leibler statistical divergence using Scala and Apache Spark resilient distributed datasets.


Overview
The Kullback-Leibler divergence is a concept borrowed from information theory and commonly associated with information gain. It is also known as the relative entropy between two distributions. It measures the dissimilarity between two distributions of random values (i.e. their probability density functions). Let's consider two distributions P and Q with probability density functions p and q.
\[D_{KL}(P||Q)= \int_{-\infty }^{+\infty }p(x)\log\frac{p(x)}{q(x)}dx\]
The formula can be interpreted as the information lost when the distribution Q is used to approximate the distribution P.
The computation is clearly not symmetric: \[D_{KL}(P||Q)\neq D_{KL}(Q||P)\]
The Kullback-Leibler divergence is also used to compute mutual information, one of the feature extraction methods. It belongs to the family of f-divergences.
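For intuition, the divergence can be checked on a pair of discrete distributions. The following sketch (two arbitrary two-point distributions, names are mine) also illustrates the asymmetry:

```scala
// Discrete Kullback-Leibler divergence: sum of p(i).log(p(i)/q(i))
def kl(p: Seq[Double], q: Seq[Double]): Double =
  p.zip(q).collect { case (pi, qi) if pi > 0.0 => pi*math.log(pi/qi) }.sum

val p = Seq(0.5, 0.5)   // fair coin
val q = Seq(0.9, 0.1)   // biased coin
println(kl(p, q))   // ~0.511
println(kl(q, p))   // ~0.368, confirming the asymmetry
```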

The objective of this post is to implement the Kullback-Leibler divergence in Scala, with the computation distributed through the Apache Spark framework:
  • Implementation of the Kullback-Leibler divergence using Apache Spark
  • Illustration of the Kullback-Leibler divergence in comparing multiple continuous probability density functions

Implementation of the Kullback-Leibler Divergence
First let's implement the Kullback-Leibler formula to measure the dissimilarity between an existing data set {xi, yi} (presumably generated by a specific random variable / distribution) and a given continuous distribution with probability density function f.

import Math.{abs, log}

object KullbackLeibler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

    // KL divergence between the distribution that generated the
    // dataset xy and the distribution with probability density f
  def execute(
      xy: DATASET,
      f: Double => Double): Double = {

    val z = xy.filter { case (_, y) => abs(y) > EPS }
    z./:(0.0) { case (s, (x, y)) =>
      val px = f(x)
      s + px*log(px/y)
    }
  }

    // KL divergence between the dataset xy and a sequence of
    // distributions with probability densities fs. The iterator is
    // materialized first so it can be traversed once per distribution
  def execute(
      xy: DATASET,
      fs: Iterable[Double => Double]): Iterable[Double] = {
    val data = xy.toSeq
    fs.map(execute(data.iterator, _))
  }
}

The divergence methods execute are defined as static (object) methods.
The first execute method computes the dissimilarity between the distribution that generates the dataset xy and the distribution with probability density f.
The second execute method computes the KL divergence between the distribution that generates the dataset xy and a sequence of distributions with probability densities fs.
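As a self-contained sanity check of the fold pattern used by execute (the dataset and the Gaussian density below are illustrative assumptions; foldLeft replaces the /: operator):

```scala
val EPS = 1e-10

// Same accumulation as the first execute method above
def klApprox(xy: Iterator[(Double, Double)], f: Double => Double): Double =
  xy.filter { case (_, y) => math.abs(y) > EPS }
    .foldLeft(0.0) { case (s, (x, y)) =>
      val px = f(x)
      s + px*math.log(px/y)
    }

// Standard Gaussian density
val g = (x: Double) => math.exp(-x*x/2.0)/math.sqrt(2.0*math.Pi)

// A dataset generated by the same density yields a zero divergence
val xy = Seq(-1.0, 0.0, 1.0).map(x => (x, g(x)))
println(klApprox(xy.iterator, g))   // 0.0
```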

Next let's define some of the continuous random distributions used to evaluate the distribution represented by the dataset xy:
Normal, Beta, log-normal, Gamma, log-Gamma and Chi-Square.

import Math._

  // Normalization constant for the Gaussian and log-normal densities
final val INV_SQRT_2PI = 1.0/sqrt(2.0*PI)

  // Standard Gaussian distribution N(0, 1)
val gauss = (x: Double) => INV_SQRT_2PI*exp(-x*x/2.0)

  // Uniform distribution over [0, 1]
val uniform = (x: Double) => 1.0

  // Log-normal distribution (mu = 0, sigma = 1)
val logNormal = (x: Double) => {
  val lx = log(x)
  INV_SQRT_2PI/x*exp(-lx*lx/2.0)
}

  // Gamma distribution with an integer shape parameter
val gamma = (x: Double, n: Int) =>
  exp(-x)*pow(x, n)/fact(n)

  // Log-Gamma distribution
val logGamma = (x: Double, alpha: Int, beta: Int) =>
  exp(beta*x)*exp(-exp(x)/alpha)/
    (pow(alpha, beta)*fact(beta-1))

  // Simple computation of m!
def fact(m: Int): Int = if (m < 2) 1 else m*fact(m-1)

  // Normalization factor for the Beta distribution
val cBeta = (n: Int, m: Int) => {
  val f = if (n < 2) 1 else fact(n-1)
  val g = if (m < 2) 1 else fact(m-1)
  f*g/fact(n+m-1).toDouble
}

  // Beta distribution
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1.0-x, beta-1)/cBeta(alpha, beta)

  // Chi-Square distribution for an even number of degrees of freedom k
val chiSquare = (x: Double, k: Int) => {
  val k_2 = k >> 1
  pow(x, k_2-1)*exp(-0.5*x)/((1 << k_2)*fact(k_2-1))
}

Some of these probability density functions are parameterized (i.e. the log-Gamma has alpha and beta parameters). The probability density functions are implemented as Scala partially applied functions with predefined parameter values. For example...

val gamma2 = gamma( _ : Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)

It is important to mention that the implementations of the probability density functions in the code snippet are not optimized for performance. They are included merely to illustrate the application of the Kullback-Leibler divergence. Please refer to your favorite statistics handbook to learn more about these probability distributions.
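To make the partial application concrete, here is a minimal self-contained sketch (gammaPdf is a local copy of the gamma density above, with the shape parameter fixed to 2):

```scala
def fact(m: Int): Int = if (m < 2) 1 else m*fact(m-1)

// Local copy of the gamma density defined above
val gammaPdf = (x: Double, n: Int) => math.exp(-x)*math.pow(x, n)/fact(n)

// Partially applied function: the shape parameter is fixed, only x remains free
val gamma2 = gammaPdf(_: Double, 2)
println(gamma2(1.0))   // exp(-1)/2 ~ 0.184
```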

Spark to the rescue
Executing the Kullback-Leibler formula for a large data set with a significant number of random distributions may be a daunting task. A solution is to use the Apache Spark framework to parallelize the computation of the divergence. Here are the steps to follow:
  • Partition the original data set
  • Broadcast the first sequence of probability density functions
  • Execute the Kullback-Leibler formula on each partition using mapPartitions
  • Collect then aggregate the divergence factor from each partition

final val NUM_DATA_POINTS = 10000000
val numTasks: Int = 128

val conf = new SparkConf().setAppName("Kullback-Leibler")
    .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

Once we define the number of input data points (line 1) and the number of concurrent tasks (line 2), the Apache Spark context sc is configured with an instance of SparkConf (line 4).
Next let's implement the broadcasting mechanism for the batch or sequence of probability density functions.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 
  2 -> logNormal, 
  3 -> gamma1, 
  4 -> gamma2, 
  5 -> gamma4, 
  6 -> logGamma12, 
  7 -> logGamma22, 
  8 -> beta22, 
  9 -> beta25, 
  10 -> chiSquare2, 
  11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))

The broadcast variable pdfs_broadcast alerts the Apache Spark framework that the keys of the probability density functions pdfs are to be distributed over all the partitions of the worker (or data) nodes.
Value broadcasting is heavily used in the machine learning library MLlib to dispatch model coefficients (weights) to the data node for the computation of gradient and loss function.

Finally, let's implement the mapPartitions transformation. The method mapPartitions transforms an iterator of pairs {x, y} into an array of size pdfs.size containing the KL divergence value for each distribution to be evaluated.

val kl_rdd = master_rdd.mapPartitions((it: DATASET) => {
    // Retrieves the probability density functions from the broadcast keys
  val pdfsList = pdfs_broadcast.value.map(n => pdfs(n))
  execute(it, pdfsList).iterator
})

Finally, the divergence values from each partition are collected then aggregated using the Scala fold operator /:.

val kl_master = kl_rdd.collect

  // Sums the divergence values for each distribution across the
  // partitions, then averages over the number of partitions
val numPartitions = kl_master.size/pdfs.size
val divergences = (0 until kl_master.size by pdfs.size)
  ./:(Array.fill(pdfs.size)(0.0))((s, n) => {
    (0 until pdfs.size).foreach(j =>
        s.update(j, s(j) + kl_master(n + j)))
    s
  }).map(_ / numPartitions)

Test results
The purpose of the test was to understand the potential of the Kullback-Leibler divergence to evaluate the dissimilarity between probability distributions.
We selected the Gaussian distribution as the benchmark against which the other distributions are compared. The comparative results are displayed below:

As expected, the Gaussian and Normal distributions are very similar. However, the divergence between the Gaussian and the highest orders of the Gamma and Beta distributions is quite significant. This can easily be confirmed by comparing the graphs of their associated probability density functions.


References
Pattern Recognition and Machine Learning, §1.6.1 Relative entropy and mutual information, C. Bishop, Springer Science, 2006
Apache Spark documentation

Monday, November 9, 2015

Time Complexity: Graph & Machine Learning Algorithms

Summary of the time complexity of some common optimization and machine learning algorithms

Overview
Time complexity (the worst-case execution time as a function of the number of elements) is commonly used in computer science. However, you will be hard pressed to find a comparison of machine learning algorithms using their asymptotic execution time.

Summary
The following summary lists the time complexity of some of the most common algorithms used in machine learning, including recursive computation for dynamic programming, linear algebra, optimization and discriminative supervised training.

Algorithm | Time complexity | Description
Recursion with one element reduction | N^2 | N: number of elements
Recursion with halving reduction | log N |
Recursion with halving reduction and processing | N |
Nearest neighbors search | M.log k.N.log N | M: number of features; k: number of neighbors; N: number of observations
Matrix multiplication (m,n)x(n,d) | m.n.d | m, n, d: matrices dimensions
Matrix multiplication (n,n) | n^3 | n: matrix dimension
Matrix multiplication (n,n), Strassen | n^2.8 | n: matrix dimension
Partial eigenvalues extraction (n,n) | e.N^2 | e: number of eigenvalues; N: number of observations
Complete eigenvalues extraction (n,n) | N^3 | N: number of observations
Minimum spanning tree, Prim linear queue | V^2 | V: number of vertices
Minimum spanning tree, Prim binary heap | (E + V).log V | E: number of edges; V: number of vertices
Minimum spanning tree, Prim Fibonacci heap | V.log V | E: number of edges; V: number of vertices
Shortest paths graph, Dijkstra linear sorting | V^2 | V: number of vertices
Shortest paths graph, Dijkstra binary heap | (E + V).log V | V: number of vertices
Shortest paths graph, Dijkstra Fibonacci heap | V.log V | E: number of edges; V: number of vertices
Shortest paths kNN graph, Dijkstra | (k + log N).N^2 | k: number of neighbors; N: number of observations
Shortest paths kNN graph, Floyd-Warshall | N^3 | N: number of observations
Fast Fourier transform | N.log N | N: number of observations
Batched gradient descent | N.P.I | N: number of observations; P: number of parameters; I: number of iterations
Stochastic gradient descent | N.P.I | N: number of observations; P: number of variables; I: number of epochs
Newton with Hessian computation | N^3.I | N: number of observations; I: number of iterations
Conjugate gradient | N.m.sqrt(k) | N: number of observations; m: number of non-zero entries; k: condition number of the matrix
L-BFGS | N.M.I | N: number of observations; M: estimated number of gradients; I: number of iterations
K-means (*) | C.N.M.I | C: number of clusters; M: dimension; N: number of observations; I: number of iterations
Lasso regularization, LARS (*) | N.M.min(N,M) | M: dimension; N: number of observations
Hidden Markov model, forward-backward pass | N^2.M | N: number of states; M: number of observations
Multilayer perceptron (*) | n.M.P.N.e | n: input variables; M: number of hidden neurons; P: number of output values; N: number of observations; e: number of epochs
Support vector machine (*), Newton | N^3 | N: number of observations
Support vector machine (*), Cholesky | N^2 | N: number of observations
Support vector machine (*), sequential minimal optimization | N^2 | N: number of observations
Principal components analysis (*) | M^2.N + N^3 | N: number of observations; M: number of features
Expectation-maximization (*) | M^2.N | N: number of observations; M: number of variables
Laplacian eigenmaps | M.log k.N.log N + m.N.k^2 + d.N^2 | N: number of observations; M: number of variables
Genetic algorithms | P.log P.I.C | C: number of genes/chromosome; P: population size; I: number of iterations
(*): Training
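The first rows of the table can be verified empirically by counting recursive calls; a minimal sketch (the function names are mine):

```scala
// Recursion with one-element reduction: O(N) calls
def linearSteps(n: Int): Int = if (n <= 1) 0 else 1 + linearSteps(n - 1)

// Recursion with halving reduction: O(log N) calls
def halvingSteps(n: Int): Int = if (n <= 1) 0 else 1 + halvingSteps(n/2)

println(linearSteps(1024))    // 1023
println(halvingSteps(1024))   // 10
```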

References
Introduction to Algorithms T. Cormen, C. Leiserson, R. Rivest - MIT Press 1990
Machine Learning: A probabilistic Perspective K. Murphy - MIT Press, 2012
Big O cheat sheet

Friday, October 9, 2015

Evaluation of Optimizers for Logistic Regression in Apache Spark

Simple comparison of the stochastic gradient descent with the quasi-Newton Limited memory BFGS optimizer for the binomial classification using the logistic regression in Apache Spark MLlib


Overview
The Apache Spark 1.5.x MLlib library provides developers and data scientists alike with two well-known optimizers for the binomial classification using the logistic regression:
  • Stochastic gradient descent (SGD)
  • Limited memory version of the Broyden-Fletcher-Goldfarb-Shanno (L-BFGS)
I thought it would be instructive to compare the two optimization methods on the accuracy of the logistic regression as a binomial classification.
Note: MLlib in Apache Spark 1.5.1 does not support multinomial classification; it may be added in future versions.

Logistic regression
The logistic regression is one of the most commonly used discriminative supervised learning models, mainly because it is intuitive and simple. It relies on the logistic function (refer to An Introduction to Logistic and Probit Regression Models).
In the case of the classification problem, the probability that an observation x belongs to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w are the weights or model coefficients.
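The formula above translates directly into code; a minimal sketch with hypothetical weights (w0 is the intercept):

```scala
// p(C|x) = 1/(1 + exp(-w0 - w.x))
def logistic(w0: Double, w: Array[Double], x: Array[Double]): Double = {
  val dot = w.zip(x).map { case (wi, xi) => wi*xi }.sum
  1.0/(1.0 + math.exp(-w0 - dot))
}

println(logistic(0.0, Array(1.0, -0.5), Array(2.0, 2.0)))   // 1/(1 + exp(-1)) ~ 0.731
```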
Apache Spark MLlib has two implementations of the logistic regression as a binary classifier

  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer
Background information on the stochastic gradient descent can be found at Comparing Stochastic Gradient Descent And Batch Gradient Descent
For those interested in inner workings of the limited memory Broyden-Fletcher-Goldfarb-Shanno algorithm, Limited Memory BFGS

Data generation
The scaladocs for Apache Spark API are available at Apache Spark API
Let's create a synthetic training set to evaluate and compare the two implementations of the logistic regression. The training set for the binomial classification consists of:

  • Two groups of observations with 3 features, each group following a distribution with the same standard deviation and a different mean
  • Two labels (or expected values) {0, 1}, one for each group
The following diagram illustrates the training set for a single feature.



The margin of separation between the two groups of observations of 3 dimensions is computed as mean(first group) - mean(second group). As the margin increases, the accuracy of the binomial classification is expected to increase.
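The margin computation itself is a per-feature difference of means; a sketch on tiny, made-up data:

```scala
// Per-feature mean of a group of observations
def mean(xs: Seq[Array[Double]]): Array[Double] =
  xs.reduce((a, b) => a.zip(b).map { case (u, v) => u + v }).map(_/xs.size)

val group1 = Seq(Array(1.0, 1.2), Array(0.8, 1.0))
val group2 = Seq(Array(3.0, 3.2), Array(2.8, 3.0))

// Margin of separation: mean(second group) - mean(first group)
val margin = mean(group2).zip(mean(group1)).map { case (a, b) => a - b }
println(margin.mkString(", "))   // ~2.0 for each feature
```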

import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint

final val SIGMA = 2.0

class DataGenerator(numTasks: Int)(implicit sc: SparkContext) {
    // Adds uniform noise of amplitude SIGMA around the mean
  private def f(mean: Double): Double =
      mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(half: Int, mu: Double): Array[LabeledPoint] = {
      // Two groups of observations with 3 features, centered at 1.0 and mu
    val trainObs =
      ArrayBuffer.fill(half)(Array[Double](f(1.0), f(1.0), f(1.0))) ++
      ArrayBuffer.fill(half)(Array[Double](f(mu), f(mu), f(mu)))

      // Label 0 for the first group, 1 for the second
    val labels = ArrayBuffer.fill(half)(0.0) ++
                  ArrayBuffer.fill(half)(1.0)
    labels.zip(trainObs).map { case (y, ar) =>
          LabeledPoint(y, new DenseVector(ar)) }.toArray
  }
}

The method apply generates two groups of half observations each, centered at 1.0 and mu respectively, with noise of amplitude SIGMA.
Next we create the two sets of labels 0 and 1 that are used to generate the Apache Spark labeled points.
The Apache Spark LogisticRegression classes process LabeledPoint instances, which are generated in this particular case from DenseVector wrappers of the observations.


Test
The first step consists of initializing the Apache Spark environment, using the SparkConf and SparkContext classes (line 2 & 4).
val numTasks: Int = 64
val conf = new SparkConf().setAppName("LogisticRegr")
   .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

  // Execution of Spark driver code here...
sc.stop

The next step is to generate the training and validation sets. The validation set is used at a later stage for comparing the accuracy of the respective models (line 5).
val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sc)
    
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)

It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.
This implementation uses the Logistic regression from MLlib that uses a pre-canned stochastic gradient descent (line 1). A customized gradient descent can be defined by using the standalone SGD class from MLlib.
For this example, the optimization parameters (line 2 & 3) are purely arbitrary. MLlib uses RDD as input for training and validation set (line 4) while the logistic regression in ML uses instances of DataFrame.
val logRegrSGD = new LogisticRegressionWithSGD
logRegrSGD.optimizer.setNumIterations(1000)
logRegrSGD.optimizer.setConvergenceTol(0.02)
val inputRDD = sc.makeRDD(trainSet, numTasks)
logRegrSGD.setIntercept(true)
val model = logRegrSGD.run(inputRDD)

Validation
Now it is time to use the validation set to compute the mean sum of square error and the accuracy of each predictor for different values of margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows
  • metrics produced by the Spark logistic regression
  • mSSE Mean sum of square errors
  • accuracy accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.
case class Quality(
    metrics: Array[(Double, Double)],
    msse: Double,
    accuracy: Double) {

  override def toString: String =
    s"""Metrics: ${metrics.mkString(",")}
       |rmse = ${Math.sqrt(msse)} accuracy = $accuracy""".stripMargin
}

Let's implement our validation class, BinomialValidation, for the binomial classification. The validator is created using the Spark context sc, the logistic regression model generated through training and the number of partitions or tasks used in the data nodes.

final class BinomialValidation(
    sc: SparkContext,
    model: LogisticRegressionModel,
    numTasks: Int) {

  def metrics(validationSet: Array[LabeledPoint]): Quality = {
      // Segregates the expected values from the observations
    val featuresLabels = validationSet.map(lbPt =>
        (lbPt.label, lbPt.features)).unzip
    val predicted_rdd = model.predict(
        sc.makeRDD(featuresLabels._2, numTasks))

      // Zips the expected values with the predicted values
    val scoreAndLabels = sc.makeRDD(featuresLabels._1,
        numTasks).zip(predicted_rdd)

    val successes = scoreAndLabels.map {
        case (e, p) => Math.abs(e - p) }.filter(_ < 0.1)
      // Mean sum of square errors
    val msse = scoreAndLabels.map {
        case (e, p) => (e - p)*(e - p) }.sum/validationSet.length

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    Quality(metrics.fMeasureByThreshold().collect,
        msse,
        successes.count.toDouble/validationSet.length)
  }
}

The method metrics converts the validation set, validationSet, into an RDD after segregating the expected values from the observations (unzip). The result of the prediction, predicted_rdd, is then zipped with the labeled values into the evaluation set, scoreAndLabels, from which the different quality metrics such as successes and msse are extracted.
The computation of the metrics is actually performed by the BinaryClassificationMetrics MLlib class. Finally, the validation is applied to the logistic model with a classification threshold of 0.1.

model.setThreshold(0.1)
val validator = new BinomialValidation(sc, model, numTasks)
val quality = validator.metrics(validationSet)

Results
The fact that the L-BFGS optimizer provides a significantly more accurate result (i.e. a lower mean sum of square errors) than the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merits further investigation.
Note: This post is a brief comparison of the two optimizers in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent has better performance overall than L-BFGS, or any quasi-Newton method for that matter, because it does not require the estimation of the Hessian matrix (second order derivatives).




References
An Introduction to Logistic and Probit Regression Models
Machine Learning: A Probabilistic Perspective, Chapter 8 "Logistic Regression", K. Murphy, MIT Press, 2012

Thursday, September 10, 2015

Histogram-based Approximation in Scala

Introduction to a simple function approximation algorithm using a dynamically resizable histogram in Scala.

Overview
A typical function approximation consists of finding a model that fits a given data set. Let's consider the following data set {x, y} for which a simple model f: y = f(x) has to be approximated.


The black dots represent the data set and the red line the model y = f(x)
There are multiple approaches to approximate a model or a function to fit a given data set. The list includes

  • Splines
  • Least square regression
  • Levenberg-Marquardt
  • K-nearest neighbors
  • Histogram
  • Polynomial interpolation
  • Logistic regression
  • Neural Networks
  • Tensor products
  • ... and many more
Interpolation using dynamically resizable histograms is a very simple and effective method to approximate a data set.

Histogram
A histogram consists of an array or sequence of bins. A bin is defined with three parameters:
  • _x Center of the bin
  • _y Average value for the bin
  • count Number of data points in the bin
The distribution of histogram bins is illustrated in the following chart

Let's look at an implementation of the Bin class. The constructor takes two values:
  • _x mid point of the bin (or bucket)
  • _y current frequency for this bin


class Bin(
    var _x: Double,
    var _y: Double = 0.0) extends Serializable {
  var count: Int = 0

    // Adds a new value y to this bin and updates the running average
  def += (y: Double): Unit = {
    val newCount = count + 1
    _y = if (count == 0) y else (_y*count + y)/newCount
    count = newCount
  }

    // Merges the content of another bin into this bin
  def + (next: Bin): this.type = {
    val newCount = count + next.count
    if (newCount > 0) {
      _y = (_y*count + next._y*next.count)/newCount
      count = newCount
    }
    this
  }

    // Merges an array of bins into this bin
  def + (next: Array[Bin]): this.type = {
    val newCount = next.aggregate(count)(
      (s, nxt) => s + nxt.count, _ + _
    )
    if (newCount > 0) {
      _y = next./:(_y*count)(
        (s, nxt) => s + nxt._y*nxt.count)/newCount
      count = newCount
    }
    this
  }
}

The method += (y: Double): Unit adds a new value y to this bin and recomputes the average frequency _y. The method + (next: Bin): this.type merges the content of another bin, next, into this bin. Finally, the method + (next: Array[Bin]): this.type merges an array of bins into this bin.
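A quick check of the running-average update (SimpleBin is a stripped-down copy of the += logic, not the full class above):

```scala
final class SimpleBin(val x: Double) {
  var y = 0.0
  var count = 0

  // Incremental update of the average value, as in Bin.+=
  def += (v: Double): Unit = {
    val n = count + 1
    y = if (count == 0) v else (y*count + v)/n
    count = n
  }
}

val bin = new SimpleBin(0.5)
Seq(1.0, 2.0, 3.0).foreach(bin += _)
println(bin.y)       // 2.0, the average of the three values
println(bin.count)   // 3
```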
Next let's create a class, Histogram, to manage the array of bins. The constructor for the histogram has four parameters:
  • maxNumBins maximum number of bins
  • min expected minimum value in the data set
  • max expected maximum value in the data set
  • weights optional smoothing weights

final class Histogram(
    maxNumBins: Long,
    var min: Double = -1.0,
    var max: Double = 1.0,
    weights: Array[Float] = Array.empty[Float]) {

  val initNumBins: Int = (maxNumBins >> RADIX).toInt
  private var step = (max - min)/initNumBins

    // Initializes the bins, centered on a regular grid over [min, max]
  private[this] var values = Range(0, initNumBins-1)
        .scanLeft(min + 0.5*step)((s, _) => s + step)
        ./:(new ArrayBuffer[Bin])(
          (vals, x) => vals += new Bin(x)
        )

    // Updates the model with a new data point: expands the histogram
    // if x falls outside the current boundaries, then consolidates
    // the bins once the maximum number of bins is reached
  def train(x: Double, y: Double): Unit = {
    >>(x)
    values(index(x)) += y
    if (values.size >= maxNumBins) --
  }

    // Predicts the value y for a new observation x
  final def predict(x: Double): Double = {
    if (x < min || x > max) Double.NaN
    else if (weights.length > 0) smoothPredict(x)
    else values(index(x))._y
  }
    // ... ancillary methods
}

The two main methods are
  • train, which updates the model (histogram) with each new data point from the training set. The histogram expands when a new data point falls outside the current boundaries (min, max) of the histogram.
  • predict, which predicts the value y for a new observation x. The prediction relies on an interpolation (weighted moving average) in the case the user specifies an array of weights in the histogram constructor.
The method Histogram.index extracts the index of the bin containing a data point (x, y).

private def index(x: Double): Int = {
  val idx = ((x - min)/step).floor.toInt
  if( idx < 1) 0 
  else if (idx >= values.size) values.size -1 
  else idx
}
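The index arithmetic can be exercised in isolation; min, step and size below are hypothetical values:

```scala
val min = -1.0
val step = 0.5
val size = 8   // current number of bins

// Same clamping logic as Histogram.index
def binIndex(x: Double): Int = {
  val idx = ((x - min)/step).floor.toInt
  if (idx < 1) 0 else if (idx >= size) size - 1 else idx
}

println(binIndex(-2.0))   // 0, clamped to the first bin
println(binIndex(0.0))    // 2
println(binIndex(10.0))   // 7, clamped to the last bin
```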

In this implementation of the dynamically resizable histogram, the array of bins expands by adding new bins whenever a new data point from the training set has an x value that is either greater than the current maximum value or lower than the current minimum value. The width of the bins, step, does not change; only the current number of bins does. The number of bins grows until the maximum number of bins, maxNumBins, is reached. The method Histogram.>> implements the expansion of the histogram for a constant bin width.

private def >>(x: Double): Unit = 
  if(x > max) {
    values.appendAll(generate(max, x))
    max = values.last._x + step*0.5
  }
  else if(x < min) {
    values.prependAll(generate(min, x))
    min = values.head._x - step*0.5
  }
 
final def generate(limit: Double, x: Double): Array[Bin] = 
  if( x > limit) {
     val nBins = ((x-limit)/step).ceil.toInt
     var z = limit - step*0.5
     Array.tabulate(nBins)(_ => {z += step; new Bin(z) })
  }
  else {
     val nBins = ((limit-x)/step).ceil.toInt
     var z = limit + step*0.5
     Array.tabulate(nBins)(_ => {z -= step; new Bin(z) })
  }

Once the maximum number of bins maxNumBins is reached, the histogram is consolidated by merging adjacent pairs of bins, which halves the number of bins and doubles the bin width step. The consolidation of the histogram bins is implemented by the method Histogram.--

  // Consolidates the histogram by merging adjacent pairs of bins
private def -- : Unit =
  values = (0 until values.size-1 by 2)
    ./:(new ArrayBuffer[Bin])((ab, n) =>
      ab += (values(n) + values(n+1)))

Testing
Finally, the predict method is used to compute the accuracy of the model through the validate method. The histogram class is tested by loading a data set from a file.

def validate(
    hist: Histogram, 
    fileName: String, 
    eps: Double): Try[Double] = Try {
  val src = Source.fromFile(fileName)
  val fields = src.getLines.map( _.split(DELIM))
  
  val counts = fields./:((0L, 0L))((s, xy) => 
    if( Math.abs(hist.predict(xy(0).trim.toFloat)-xy(1).toFloat) < eps) 
       (s._1 + 1L, s._2 + 1L) 
    else 
       (s._1, s._2 + 1L)
  )
  
  val accuracy = counts._1.toDouble/counts._2
  src.close
  accuracy
}



Reference
Introduction to Machine Learning Chap 8 Nonparametric methods / Nonparametric density estimation E. Alpaydin- MIT press 2004
Scala Cookbook A. Alexander O'Reilly 2013
Histogram-based approximation using Apache Spark