## Monday, July 4, 2016

### Monte Carlo Integration in Scala

This post introduces an often overlooked numerical integration method that leverages the ubiquitous Monte Carlo simulation.
Not every function has a closed-form antiderivative, so a definite integral cannot always be computed symbolically. There are many deterministic numerical integration methods for continuous functions, such as Simpson's rule, the Newton-Cotes quadrature rules and Gaussian quadrature.
Monte Carlo numerical integration is a stochastic method that relies on randomly generated values to estimate the area under a curve, a surface or any multidimensional space. The illustration of the method below uses a pragmatic uniform random distribution of data points for a single-variable function.

Basic concept
Let's consider the single variable function illustrated in the following line plot.
$f(x)=\frac{1}{x}-0.5$

The objective is to compute the integral of the function over the interval [1, 3]. The Monte Carlo numerical integration consists of three steps:
• Define the outer area enclosing the function, delimited by the minimum and maximum values of x and y; in this case x in [1, 3] and y in [-0.17, 0.5]
• Generate random data points over the outer area
• Compute the ratio of the number of random data points within the function area to the total number of data points
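For reference, this particular integral has a simple closed form, which we can use later to evaluate the accuracy of the Monte Carlo estimate:
$\int_{1}^{3}\left ( \frac{1}{x}-0.5 \right )dx=\left [ \log(x)-\frac{x}{2} \right ]_{1}^{3}=\log(3)-1\approx 0.0986$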
Let's define a generic class, MonteCarloIntegrator, to encapsulate these three steps.
The class has two arguments (line 1):
• The function f to integrate
• The number of random data points used in the simulation

```scala
 1  class MonteCarloIntegrator(f: Double => Double, numPoints: Int) {
 2
 3    def integrate(from: Double, to: Double): Double = {
 4      val (min, max) = getBounds(from, to)
 5      val width = to - from
 6      val height = if (min >= 0.0) max else max - min
 7      val outerArea = width * height
 8      val randomx = new Random(System.currentTimeMillis)
 9      val randomy = new Random(System.currentTimeMillis + 42L)
10
11      def randomSquare(randomx: Random, randomy: Random): Double = {
12
13        val numInsideArea = Range(0, numPoints)./:(0)((s, n) => {
14          val ptx = randomx.nextDouble * width + from
15          val pty = randomy.nextDouble * height
16          randomx.setSeed(randomy.nextLong)
17          randomy.setSeed(randomx.nextLong)
18          s + (if (pty > 0.0 && pty < f(ptx)) 1
19               else if (pty < 0.0 && pty > f(ptx)) -1
20               else 0)
21        })
22
23        // fraction of points inside the area, scaled by the outer area
24        numInsideArea.toDouble * outerArea / numPoints
25      }
26      randomSquare(randomx, randomy)
27    }
28  }
```

The method integrate implements the summation of the function over the interval [from, to] (line 3). The first step is to extract the bounds of the outer area with getBounds (line 4); its size is computed on line 7. Each coordinate is assigned its own random generator, randomx and randomy (lines 8 & 9).
The nested method randomSquare counts the number of data points, numInsideArea, that fall into the area delimited by the function (lines 13 - 21). The sum is computed as the relative number of data points inside the area delimited by the function (line 24).

The method getBounds is described in the following code snippet. It is a simple, although not particularly efficient, approach to extracting the boundary of the integration. It breaks the interval down into numSteps steps (lines 2 & 3) and collects the minimum and maximum values of the function (lines 7 - 12).

```scala
 1  def getBounds(from: Double, to: Double): (Double, Double) = {
 2    val numSteps = Math.sqrt(numPoints).floor.toInt
 3    val stepSize = (to - from) / numSteps
 4
 5    (0 to numSteps)./:((Double.MaxValue, -Double.MaxValue))(
 6      (minMax, n) => {
 7        val y = f(n * stepSize + from)
 8        updateBounds(y, minMax) match {
 9          case 0x01 => (y, minMax._2)
10          case 0x02 => (minMax._1, y)
11          case 0x03 => (y, y)
12          case _ => minMax
13        }
14      }
15    )
16  }
17
18  def updateBounds(y: Double, minMax: (Double, Double)): Int = {
19    var flag = 0x00
20    if (y < minMax._1) flag += 0x01
21    if (y > minMax._2) flag += 0x02
22    flag
23  }
```

Precision, precision
You may wonder about the accuracy of the Monte Carlo method: how many randomly generated data points are needed for a decent accuracy? Let's consider the same function $f(x)=\frac{1}{x}-0.5$ and its indefinite integral, which provides the expected value of the sum:
$\int f(x)dx=\log(x)-\frac{x}{2}+C$
The simple test consists of computing the error between the value produced by the definite integral and the estimate from the Monte Carlo method, as implemented in the following code snippet.

```scala
val fct = (x: Double) => 1.0/x - 0.5
val integral = (x: Double, c: Double) => Math.log(x) - 0.5*x + c

final val numPoints = 10000
val integrator = new MonteCarloIntegrator(fct, numPoints)

val predicted = integrator.integrate(1.0, 2.0)
val expected = integral(2.0, 0.0) - integral(1.0, 0.0)
```

We run the test 100 times, with the number of random data points varying between 100 and 50,000.

The Monte Carlo method is reasonably accurate even with a small number of data points. In this case, the marginal gain in accuracy does not justify randomizing significantly more than 1,000 data points.
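The experiment can be reproduced with a compact, self-contained sketch. Note that this sketch uses the simpler sample-mean variant of Monte Carlo integration (the integral is estimated as the interval width times the average of f at uniformly sampled points), not the hit-or-miss version implemented above; the names and the fixed seed are illustrative.

```scala
import scala.util.Random

val f = (x: Double) => 1.0 / x - 0.5
val exact = math.log(3.0) - 1.0  // closed-form value of the integral over [1, 3]

// Sample-mean Monte Carlo: integral ~ (b - a) * average of f at random points
def integrate(from: Double, to: Double, numPoints: Int, rand: Random): Double = {
  val sum = (0 until numPoints).map(_ => f(rand.nextDouble * (to - from) + from)).sum
  (to - from) * sum / numPoints
}

val rand = new Random(42L)
Seq(100, 1000, 10000, 50000).foreach { n =>
  val estimate = integrate(1.0, 3.0, n, rand)
  println(f"n = $n%6d  estimate = $estimate%.5f  error = ${math.abs(estimate - exact)}%.5f")
}
```

The error shrinks roughly as the inverse square root of the number of points, which is consistent with the observation above that most of the accuracy is already captured with a modest sample.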

A smarter approach would be to rely on an exit condition to complete the summation in the shortest time possible, without having to estimate the optimum number of random data points up front. A new batch of numPoints random data points is added at each iteration. One simple convergence criterion compares the sums of two consecutive iterations:
  |sum(existing data points + new batch data points) - sum(existing data points)| < eps

```scala
 1  def randomSquare(randomx: Random, randomy: Random): Double = {
 2    var oldValue = 0.0
 3    Range(0, numPoints).takeWhile(_ => {
 4      val numInsideArea = ....  // count of the data points within the
 5                                // function area, as in the first version
 6
 7      val newValue = numInsideArea.toDouble * outerArea / numPoints
 8      val diff = Math.abs(newValue - oldValue)
 9      oldValue = newValue
10      diff >= eps               // continue while the sum has not converged
11    })
12    oldValue
13  }
```

At each iteration, a new batch of numPoints data points is randomly generated to improve the accuracy of the summation. The exit strategy is implemented through the higher-order Scala collection method takeWhile (lines 3 & 11).

Note: This implementation of Monte Carlo integration is simple enough to illustrate the concept of stochastic methods applied to calculus. Recursive stratified sampling has been shown to be more accurate for computing the definite integral of functions with significant inflection points (extreme second-order derivatives).
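For comparison, here is a minimal sketch of (non-recursive) stratified sampling: the interval is split into equal strata and the same number of uniform samples is drawn in each one, which reduces the variance of the estimate. Names and parameters are illustrative; the recursive variant would further subdivide the strata where the function varies most.

```scala
import scala.util.Random

// Stratified sample-mean integration: split [from, to] into numStrata
// sub-intervals and average f over uniform samples drawn in each one.
def stratifiedIntegrate(
    f: Double => Double,
    from: Double,
    to: Double,
    numStrata: Int,
    pointsPerStratum: Int,
    rand: Random): Double = {
  val h = (to - from) / numStrata
  (0 until numStrata).map { i =>
    val a = from + i * h   // left bound of stratum i
    val mean = (0 until pointsPerStratum)
      .map(_ => f(a + rand.nextDouble * h))
      .sum / pointsPerStratum
    mean * h               // contribution of stratum i to the integral
  }.sum
}

val estimate = stratifiedIntegrate(x => 1.0 / x - 0.5, 1.0, 3.0, 100, 10, new Random(42L))
```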

References
"Monte Carlo Integration" - Dartmouth College - C. Robert, G. Casella
"The Clever Machine: Monte Carlo Approximations" - Dustin Stansbury
Programming in Scala - 3rd edition - M. Odersky, L. Spoon, B. Venners

## Sunday, June 5, 2016

### AdaGrad in Apache Spark

The stochastic gradient descent (SGD) is a critical element in the training of machine learning models such as support vector machines, logistic regression or back-propagation neural networks. In its simplest incarnation, the gradient is computed using a single learning rate.
However, it is not uncommon for the features of a model to have a wide range of variance between observations. In this case an adaptive gradient algorithm, which assigns a learning rate to each feature, may be the solution. There are many approaches to implementing a per-feature learning rate: this post describes the AdaGrad algorithm and its implementation in Apache Spark 2.0.

The stochastic gradient descent is a randomized approximation of the (batch) gradient descent algorithm, used to minimize a continuous differentiable objective function. In supervised machine learning, the objective function is a loss function (logistic, sum of squares...).
$L(w)=\frac{1}{n}\sum_{i=0}^{n}(y_{i}-f(x_{i}|w))^{2}$
The objective function L is expressed as the summation of differentiable functions. In the case of the loss function in supervised learning, each differentiable function is the loss associated with a specific observation.
$L(w)=\sum_{i=1}^{n}L_{i}(w)$
In supervised learning, the vector w represents the vector of weights (or model parameters). At each iteration of the stochastic gradient descent, the weights are updated using the formula
$w_{t+1}=w_{t}-\eta \sum_{i=0}^{n}\frac{\partial L}{\partial w_{i,t}}$
The stochastic gradient descent (SGD) minimizes the loss between the expected values and the predicted values generated by the model. At each iteration, SGD selects a subset of the observations (known as a mini-batch) to use in the training of the model. The iterative process is expected to converge toward the true global minimum of the loss function.
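As a toy illustration of the update formula, the following sketch (names and data are made up) runs one epoch of mini-batch SGD on a one-dimensional linear model y = w.x with a squared loss; each mini-batch updates the weight by the learning rate times the averaged gradient.

```scala
import scala.util.Random

val rand = new Random(1L)
val trueW = 3.0
// Synthetic observations (x, y) with y = trueW * x
val data = Vector.fill(1000) { val x = rand.nextDouble; (x, trueW * x) }

var w = 0.0            // model weight
val eta = 0.5          // single, fixed learning rate
val batchSize = 10

// One epoch: one gradient step per mini-batch
data.grouped(batchSize).foreach { batch =>
  // dL/dw for the squared loss (y - w*x)^2, averaged over the mini-batch
  val grad = batch.map { case (x, y) => -2.0 * (y - w * x) * x }.sum / batch.size
  w -= eta * grad
}
println(s"learned w = $w")  // converges toward trueW = 3.0
```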

The main idea behind AdaGrad is to increase the learning rate for sparse features (or model parameters) and decrease it for denser features. Therefore, AdaGrad improves the convergence of the minimization of the loss for models with sparse features, provided those sparse features retain information.
$w_{t+1,i}=w_{t,i}-\frac{\eta }{\sqrt{\sum_{\tau =1}^{t}\left ( \frac{\partial L}{\partial w_{\tau ,i}} \right )^{2}+\varepsilon }}\frac{\partial L}{\partial w_{t,i}}$
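Before diving into the Spark implementation, the update formula can be illustrated with a plain Scala sketch, independent of Spark (the class and parameter names are illustrative):

```scala
// Per-feature AdaGrad update: history(i) accumulates the squared
// partial derivatives of the loss with respect to weight i.
class AdaGrad(eta: Double, eps: Double, dim: Int) {
  private val history = Array.fill(dim)(0.0)

  def update(weights: Array[Double], gradient: Array[Double]): Array[Double] = {
    gradient.zipWithIndex.foreach { case (g, i) => history(i) += g * g }
    weights.zipWithIndex.map { case (w, i) =>
      // The learning rate eta is scaled down per feature by the accumulated history
      w - eta * gradient(i) / math.sqrt(history(i) + eps)
    }
  }
}

val adaGrad = new AdaGrad(0.1, 1e-8, 2)
val updated = adaGrad.update(Array(1.0, 1.0), Array(2.0, 0.01))
// On the first step each weight moves by ~eta in absolute value,
// regardless of the magnitude of its gradient component.
```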

SGD in Apache Spark
The Apache Spark MLlib library has two implementations of SGD:
• The generic GradientDescent and related classes in the mllib.optimization package
• SGD bundled with classification or regression algorithms such as LogisticRegressionWithSGD, LassoWithSGD, SVMWithSGD or RidgeRegressionWithSGD
We will be using the optimization package in order to customize the stochastic gradient descent. The objective is to leverage the mllib.optimization.GradientDescent template class and implement the adaptive gradient with a per-feature learning rate by creating a custom Updater.
The updater "updates the weights of the model" (logistic regression or SVM) with the product of the current learning rate and the partial derivative of the loss with respect to this weight (as described in the previous section). Let's call AdaGradUpdater the updater that implements the update of the model weights using the adaptive gradient. The SGD is then instantiated as follows:
```scala
val adaSGD = new GradientDescent( .... )
  .setStepSize(0.01)
  // .....
```

```scala
Updater.compute(
  weightsOld: Vector,
  gradient: Vector,
  stepSize: Double,
  iter: Int,
  regParam: Double): (Vector, Double)
```

The method returns the tuple (vector of new weights, loss)

As mentioned earlier, the implementation of AdaGrad consists of overriding the method Updater.compute.
The computation of the learning rate requires us to record the past squared values of the gradient (previous steps) for each particular weight, in the array gradientsHistory (line 3). First we define the method += to update the gradient history (lines 27-36). The first call to the method creates the gradient history (line 31).
The next step consists of converting the existing (old) weights into a Breeze dense vector brzWeights (line 14). The array of new learning rates is computed as the inverseVelocity coefficient (line 39).
The learning rates are zipped with the old weights (line 15) to update the weights newWeights as a new dense vector (lines 17-20). The linear algebra (matrix computation) on the Spark data node is actually performed by native BLAS/LAPACK routines under the cover, through calls to brzAxpy (line 21) and brzNorm (line 22).

```scala
 1  @DeveloperApi
 2  final class AdaGradL2Updater(dimension: Int) extends Updater {
 3    private[this] var gradientsHistory: Array[Double] = _
 4
 5    override def compute(
 6      weightsOld: Vector,
 7      gradient: Vector,
 8      stepSize: Double,
 9      iter: Int,
10      regParam: Double
11    ): (Vector, Double) = {
12
13      +=(gradient)
14      val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
15      val sumSquareDerivative = inverseVelocity.zip(brzWeights.toArray)
16
17      val newWeights: BV[Double] =
18        new DenseVector[Double](sumSquareDerivative.view.map {
19          case (coef, weight) => weight * (1.0 - regParam * coef)
20        }.toArray)
21      brzAxpy(-1.0, gradient.toBreeze, newWeights)
22      val norm = brzNorm(newWeights, 2.0)
23
24      (Vectors.fromBreeze(newWeights), 0.5 * regParam * norm * norm)
25    }
26
27    private def +=(gradient: Vector): Unit = {
28      val grad = gradient.toArray
29      grad.view.zipWithIndex.foreach {
30        case (g, index) => {
31          if (gradientsHistory == null)
32            gradientsHistory = Array.fill(grad.length)(0.0)
33          val existingGradient = gradientsHistory(index)
34          gradientsHistory.update(index, existingGradient + g*g)
35        }
36      }
37    }
38
39    def inverseVelocity = gradientsHistory.map(1.0 / Math.sqrt(_))
40  }
```


## Thursday, May 5, 2016

### Bootstrapping by resampling with replacement

Bootstrapping is a statistical resampling method that consists of randomly sampling a dataset with replacement. This technique enables data scientists to estimate the sampling distribution of a wide variety of probability distributions.

Background
One key objective of bootstrapping is to estimate the accuracy of a statistic such as the mean, standard deviation, median, mode or error. These statistics s are known as estimators over an approximate distribution. The most common approximate distribution is the empirical distribution function. In the special case where the observations are independent and identically distributed, the empirical (or approximate) distribution can be estimated through resampling.
The following diagram captures the essence of bootstrapping by resampling.

Each of the B bootstrap samples has the same number of observations (data points) as the original dataset from which the samples are created. Once the samples are created, a statistical function s such as the mean, mode, median or standard deviation is computed for each sample.
The standard deviation of the B statistics estimates the standard error of the statistic s computed on the original dataset.
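This property can be checked numerically with a short, self-contained sketch (the distribution, sizes and seed are arbitrary): for the mean of n independent observations, the standard deviation of the bootstrap replicates should approach the classical standard error sigma/sqrt(n).

```scala
import scala.util.Random

val rand = new Random(7L)
val n = 500
val data = Vector.fill(n)(rand.nextGaussian)   // original dataset, sigma = 1

// One bootstrap replicate: the mean of a resample drawn with replacement
def resampleMean(): Double =
  Vector.fill(n)(data(rand.nextInt(n))).sum / n

val b = 1000
val replicates = Vector.fill(b)(resampleMean())
val meanR = replicates.sum / b
val bootstrapSE = math.sqrt(
  replicates.map(r => (r - meanR) * (r - meanR)).sum / (b - 1))

println(f"bootstrap SE = $bootstrapSE%.4f, sigma/sqrt(n) = ${1.0 / math.sqrt(n)}%.4f")
```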

Implementation in Scala
The purpose of this post is to illustrate some basic properties of bootstrap sampling:
• Profile of the distribution of statistics s for a given probability distribution
• Comparison of the standard deviation of the statistics s with the standard deviation of the original dataset

Let's implement a bootstrap by resampling in Scala, starting with a class Bootstrap.

```scala
 1  class Bootstrap(
 2    numSamples: Int = 1000,
 3    s: Vector[Double] => Double,
 4    inputDistribution: Vector[Double],
 5    randomizer: Random
 6  ) {
 7
 8    lazy val bootstrappedReplicates: Array[Double] =
 9      (0 until numSamples)./:(new mutable.ArrayBuffer[Double])(
10        (buf, _) => buf += createBootstrapSample
11      ).toArray
12
13    def createBootstrapSample: Double = ???   // implemented below
14
15    lazy val mean = bootstrappedReplicates.reduce(_ + _) / numSamples
16
17    def error: Double = ???                   // implemented below
18  }
```

The class Bootstrap is instantiated with a predefined number of samples numSamples (line 2), a statistic function s (line 3), a dataset generated from a given distribution inputDistribution (line 4) and a randomizer (line 5).
The computation of the bootstrap replicates, bootstrappedReplicates, is central to resampling (lines 8 - 11). As described in the introduction, a replicate s is computed from a sample of the original dataset with the method createBootstrapSample (line 10).

Let's implement the method createBootstrapSample.

```scala
 1  def createBootstrapSample: Double = s(
 2    (0 until inputDistribution.size)./:(
 3      new mutable.ArrayBuffer[Double]
 4    )(
 5      (buf, _) => {
 6        randomizer.setSeed(randomizer.nextLong)
 7        val randomValueIndex = randomizer.nextInt(inputDistribution.size)
 8        buf += inputDistribution(randomValueIndex)
 9      }
10    ).toVector
11  )
```

The method createBootstrapSample
- Samples the original dataset using a uniform random function (lines 6 & 7)
- Applies the statistic function s to this sample dataset (lines 1 & 11)

The last step consists of computing the error (standard deviation) of the bootstrap replicates:

```scala
def error: Double = {
  val sumOfSquaredDiff = bootstrappedReplicates
    .map(s => (s - mean) * (s - mean))
    .sum

  Math.sqrt(sumOfSquaredDiff / (numSamples - 1))
}
```

Evaluation
The first evaluation consists of comparing the distribution of replicates with the original distribution. To this purpose, we generate an input dataset using
• Normal distribution
• LogNormal distribution
Let's create a method, bootstrapEvaluation to compare the distribution of the bootstrap replicates with the dataset from which the bootstrap samples are generated.

```scala
 1  def bootstrapEvaluation(
 2    dist: RealDistribution,
 3    random: Random,
 4    gen: (Double, Double)
 5  ): (Double, Double) = {
 6
 7    val inputDistribution = (0 until 5000)./:(
 8      new ArrayBuffer[(Double, Double)]
 9    )(
10      (buf, _) => {
11        val x = gen._1 * random.nextDouble - gen._2
12        buf += ((x, dist.density(x)))
13      }
14    ).toVector
15
16    val mean = (x: Vector[Double]) => x.sum / x.length
17    val bootstrap = new Bootstrap(
18      numReplicates,
19      mean,
20      inputDistribution.map(_._2),
21      new Random(System.currentTimeMillis)
22    )
23
24    val meanS = bootstrap.bootstrappedReplicates.sum /
25                bootstrap.bootstrappedReplicates.size
26    val sProb = bootstrap.bootstrappedReplicates.map(_ - meanS)
27    // .. plotting histogram of the distribution sProb
28    (bootstrap.mean, bootstrap.error)
29  }
```

We are using the normal and log normal probability density function defined in the Apache Commons Math Java library. These probability density functions are defined in the org.apache.commons.math3.distribution package.
The comparative method bootstrapEvaluation has the following arguments:
• dist: A probability density function used to generate the dataset upon which sampling is performed (line 2)
• random: A random number generator (line 3)
• gen: A pair of parameters for the linear transform used to generate random values (a.r + b) (line 4)
The input distribution inputDistribution {(x, pdf(x))} is generated for 5,000 data points (lines 7 - 13).
Next, the bootstrap is created with the appropriate number of replicates numReplicates, the mean of the input dataset as the statistical function s, the input distribution and the generic random number generator of the Scala library as arguments (lines 16 - 22).
Let's plot the distribution of the input dataset generated from a normal density function.

```scala
val (meanNormal, errorNormal) = bootstrapEvaluation(
  new NormalDistribution,
  new scala.util.Random,
  (5.0, 2.5)
)
```


Normally distributed dataset
The first graph plots the distribution of the input dataset using the Normal distribution.

The second graph illustrates the distribution (histogram) of the replicates s - mean.

The bootstrap replicates s(x) are also normally distributed. The mean value of the bootstrap replicates is 0.1978 and the error is 0.001691.

Dataset with a log normal distribution
We repeat the same process with the log-normal distribution. This time around, the dataset to sample from follows a log-normal distribution.

```scala
val (meanLogNormal, errorLogNormal) = bootstrapEvaluation(
  new LogNormalDistribution,
  new scala.util.Random,
  (2.0, 0.0)
)
```


Although the original dataset used to generate the bootstrap samples follows a log-normal distribution, the bootstrap replicates s(x) are normally distributed. The mean for the bootstrap replicates is 0.3801 and the error is 0.002937.

The error for the bootstrap resampling from the log-normal distribution is almost twice the error related to the normal distribution.
The result should not be a surprise: the bootstrap replicates follow a normal distribution, which closely matches the original dataset created using the same probability density function.

References
Programming in Scala - 3rd edition - M. Odersky, L. Spoon, B. Venners - Artima - 2016
The Elements of Statistical Learning: Data Mining, Inference and Prediction - 7.11 Bootstrap methods - Springer - 2001

## Thursday, April 28, 2016

### Normalized Discounted Cumulative Gain in Scala

This post illustrates the Normalized Discounted Cumulative Gain (NDCG) and its implementation in Scala.
Numerous real-life applications of machine learning require predicting the most relevant ranking of items to optimize an outcome. For instance:

• Evaluate and prioritize counter-measures to cyber-attacks
• Rank symptoms in a clinical trial
• Extract the documents most relevant to a given topic from a corpus
The Discounted Cumulative Gain (DCG) and its normalized counterpart, the Normalized Discounted Cumulative Gain (NDCG), are metrics originally applied in textual information retrieval and later extended to other domains.

Discounted Cumulative Gain
Let's dive into the mathematical formalism for the Discounted Cumulative Gain.

For indexed target values tj, as illustrated in the diagram above, the discounted cumulative gain is computed as
$DCG=\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log_{2}(j+1)}$
The objective is to compare any given list of ranked/sorted items with a benchmark which represents the optimum ranking (the ranking with the highest DCG value):
$NDCG=\frac{DCG}{IDCG}=\frac{log(2)}{IDCG}\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log(j+1)}$

Scala implementation
The implementation of the computation of NDCG in Scala is quite simple, indeed, given a ranked list of items. The three steps are:
• Compute the IDCG (or normalization factor) from the list
• Compute the DCG for the list
• Compute the NDCG = DCG/IDCG
First let's consider a list of items of type T to rank. The method ranking, used to sort a sample of items, is provided as an implicit function. The constructor for NDCG has a single argument: the initial sample of rankings.
```scala
 1  class NDCG[T](initialSample: Seq[T])(implicit ranking: T => Int) {
 2    // the companion object NDCG defines compute(j, t) = (2^t - 1) / log2(j + 1)
 3    import NDCG._
 4
 5    // normalization factor: the ideal DCG of the sample
 6    val iDCG: Double = normalize
 7
 8    def score: Double = score(initialSample)
 9
10    private def dcg(samples: Seq[T]): Double =
11      samples.zipWithIndex.aggregate(0.0)(
12        (s, samplej) => s + compute(samplej._2 + 1, ranking(samplej._1)),
13        _ + _
14      )
15
16    private def normalize: Double = {
17      val sorted = initialSample.zipWithIndex
18        .sortBy { case (sample, n) => -ranking(sample) }.map(_._1)
19      dcg(sorted)
20    }
21  }
```

The Ideal Discounted Cumulative Gain iDCG is computed through the normalize method (line 6). iDCG (the normalization factor) is computed by first sorting the items of type T by their rank in decreasing order (lines 17 & 18), then scoring this re-sorted list with the dcg method (line 19).
The computation of the Discounted Cumulative Gain by the method dcg (line 10) is a direct application of the formula described in the previous section.
Note: The logarithm function uses base 2. It is computed as natural log(x)/natural log(2).
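The formula and the base-2 logarithm can be checked with a tiny standalone computation, using the ranks (4, 3, 5, 2, 1) of the sample defined below:

```scala
def log2(x: Double): Double = math.log(x) / math.log(2)

// DCG of a list of relevance values; the item at 1-based position j
// is discounted by log2(j + 1), hence log2(j + 2) for a 0-based index
def dcg(relevances: Seq[Int]): Double =
  relevances.zipWithIndex.map { case (t, j) =>
    (math.pow(2, t) - 1) / log2(j + 2)
  }.sum

val sample = Seq(4, 3, 5, 2, 1)
val ideal = sample.sorted(Ordering[Int].reverse)  // 5, 4, 3, 2, 1

println(dcg(ideal))               // ~45.64, the iDCG reported below
println(dcg(sample) / dcg(ideal)) // ~0.801, the NDCG score
```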
Let's now consider a list of items of type Item defined as follows:
case class Item(id: String, x: Double, rank: Int)


The list of items, itemsList is implicitly ranked through the last attribute, rank.

val itemsList = Seq[Item](
Item("1", 3.4, 4), Item("2", 1.4, 3),
Item("3", -0.7, 5), Item("4", 5.2, 2),
Item("5", 1.4, 1))

implicit val ranking = (item: Item) => item.rank


It is time to compute the NDCG coefficient for the list of items, by invoking the score method.

val nDCG = new NDCG[Item](itemsList)

println(s"IDCG = ${nDCG.iDCG}")    // 45.64
println(s"Score = ${nDCG.score}")  // 0.801


The ideal discounted cumulative gain iDCG is 45.64: it is the DCG of the optimum ranking for this list of items. The first sample scores an NDCG of 0.801.
Note
The DCG of subsequent samples can be computed using the same iDCG value from the same instance of NDCG.

def score(samples: Seq[T]): Double =
if( samples.size != initialSample.size) 0.0
else dcg(samples)/iDCG



## Friday, April 15, 2016

### Managing Spark context in ScalaTest

This post describes a methodology to manage the Spark context while testing your application with ScalaTest.

Debugging an Apache Spark application with ScalaTest seems quite simple when dealing with a single test:
• Specify your Spark context configuration SparkConf
• Create the Spark context
• Clean up resources (Spark context, Akka context, file handles...)
However, there are cases when you may need to create and close the Spark context across multiple tests or on a subset of the tests. The challenge is to make sure that the Spark context is actually closed when it is no longer needed.
This post introduces two basic ScalaTest methods, beforeAll and afterAll of the trait BeforeAndAfterAll, to manage the context lifecycle of your test application.

Wrapping the Spark context
The objective is to create a small framework that creates or retrieves an existing Spark context before executing a test, and closes it after the test is completed, independently of the status of the test.
The initialization of the Spark context consists of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through simple parameterization. The context is created only if it is not already defined within the scope of the test, using the getOrCreate method.
```scala
trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName("Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close: Unit = {
    if (sc != null) sc.stop
  }
}
```

The solution is to let the ScalaTest methods manage the lifecycle of the Spark context for testing:

```scala
 1  trait SparkContextForTesting
 2      extends SparkContextWrapper with BeforeAndAfterAll {
 3    self: Suite =>
 4
 5    override def beforeAll: Unit = {
 6      getContext
 7      super.beforeAll
 8    }
 9
10    override def afterAll: Unit = {
11      close
12      super.afterAll
13    }
14  }
```

Note: The BeforeAndAfterAll trait can only be mixed into a test suite that extends the Suite trait. The method beforeAll (line 5) is automatically called before the first test of your test suite, and the method afterAll (line 10) is called after your last test has completed, whether those tests succeeded or not.

```scala
class MyTest
    extends FlatSpec
    with Matchers
    with SparkContextForTesting {

  val x: Int = -4
  it should "My first test" in {
    val sqlContext = new SQLContext(sc)
    // ....
  }

  // ... other tests

  it should "My last test" in {
    // clean up after it completes
  }
}
```

As they say at the end of the movie, "That's all folks!"

Reference
ScalaTest User Guide