Friday, November 18, 2016

Recursive Mean & Standard Deviation in Scala

Implementation of the recursive computation of the mean and standard deviation of a very large data set in Scala using tail recursion

Overview
The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala's tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.

Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression consists of the sum of the squared differences between each data point and the mean.
$\sigma =\sqrt{\frac{\sum_{i=0}^{n-1}(x_{i}-\mu )^{2}}{n}}$
The second formula allows the mean and standard deviation to be updated with each new data point (online computation).
$\sigma =\sqrt{\frac{1}{n}\sum_{i=0}^{n-1}x_{i}^{2}-{\mu ^{2}}}$
This second approach relies on the computation of the sum of squared values, which can overflow.

1  val x = Array[Double]( /* ... */ )
2  val mean = x.sum/x.length
3  val stdDev = Math.sqrt(
4    x.map(_ - mean).map(t => t*t).sum/x.length)

A reduceLeft can be used as an alternative to map{ ... }.sum in the computation of the standard deviation (line 3).

Recursive computation
There is actually no need to compute the sum and the sum of the squared values in order to derive the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
$\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}$ The recursive formula for the standard deviation is
$\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}$
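The recursive formula for the mean follows directly from splitting the running sum of the first n observations:
$\mu _{n}=\frac{1}{n}\sum_{i=1}^{n}x_{i}=\frac{\left ( n-1 \right )\mu _{n-1}+x_{n}}{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}$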
Let's implement these two recursive formulas in Scala using tail recursion (line 4).

 1  def meanStd(x: Array[Double]): (Double, Double) = {
 2
 3    @scala.annotation.tailrec
 4    def meanStd(
 5        x: Array[Double],
 6        mu: Double,
 7        Q: Double,
 8        count: Int): (Double, Double) =
 9      if (count >= x.length) (mu, Math.sqrt(Q/x.length))
10      else {
11        val newCount = count + 1
12        val newMu = x(count)/newCount + mu*(1.0 - 1.0/newCount)
13        val newQ = Q + (x(count) - mu)*(x(count) - newMu)
14        meanStd(x, newMu, newQ, newCount)
15      }
16
17    meanStd(x, 0.0, 0.0, 0)
18  }

This implementation updates the mean and the standard deviation simultaneously for each new data point. The recursion exits when all the elements have been accounted for (line 9).
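As a sanity check, here is a small, self-contained script (with hypothetical values) that compares the tail-recursive implementation with the direct two-pass computation:

```scala
import scala.annotation.tailrec

// Tail-recursive computation of the mean and standard deviation
def meanStd(x: Array[Double]): (Double, Double) = {
  @tailrec
  def meanStd(x: Array[Double], mu: Double, Q: Double, count: Int): (Double, Double) =
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count + 1
      val newMu = x(count)/newCount + mu*(1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
    }
  meanStd(x, 0.0, 0.0, 0)
}

val x = Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
val (mu, sigma) = meanStd(x)

// Direct two-pass computation for comparison
val mean = x.sum/x.length
val stdDev = Math.sqrt(x.map(_ - mean).map(t => t*t).sum/x.length)

println(s"recursive: ($mu, $sigma) direct: ($mean, $stdDev)")
```

Both approaches agree within floating point precision; for this dataset the mean is 5.0 and the standard deviation 2.0.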

Monday, August 8, 2016

Spark ML pipelines I - Features encoding

Apache Spark introduced machine learning (ML) pipelines in version 1.4.0. A pipeline is a workflow, or sequence of tasks, that cleanses, filters, trains, classifies, predicts and validates data sets. Those tasks are defined as stages of the pipeline.
Spark 2.0 extends ML pipelines to support persistence, while the MLlib package is slowly being deprecated in favor of the data frame and data set based ML library.
ML pipelines allow data scientists to weave transformers and estimators into a single, monolithic classification or predictive model.

This first post related to ML pipelines implements a feature encoding pipeline.

ML Pipelines 101
This section is a very brief overview of ML pipelines. Further information is available in the Apache Spark ML documentation.
The key ingredients of an ML pipeline are:

• Transformers are algorithms that transform one DataFrame into another DataFrame. Transformers are stateless
• Estimators are algorithms that can be fit on a DataFrame to produce a Transformer (i.e. Estimator.fit)
• Pipelines are estimators that weave or chain multiple Transformers and Estimators together to specify an ML workflow
• Pipeline stages are the basic elements of the ML workflow. Transformers, estimators and pipelines are pipeline stages
• Parameters encapsulate the tuning and configuration parameters required by each Transformer and Estimator.
Note: The following implementation has been tested using Apache Spark 2.0

Features encoding pipeline
Categorical features have multiple values or instances, which are represented as strings. Numerical values are categorized or bucketed through a conversion to strings.
Once converted to strings, the categorical values are converted into category indices using the StringIndexer transformer. The resulting sequence of indices is ranked by decreasing order of frequency of the value in the training or validation set.
Next, the indices associated with a particular feature are encoded into a vector of binary values (0, 1) through the OneHotEncoder transformer. A feature instance is encoded as 1 if it is defined in the data point, 0 otherwise.
Finally, the vectors of binary values of all the features are aggregated or assembled through the VectorAssembler transformer.
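To make the indexing and encoding stages concrete, here is a minimal, Spark-free sketch in plain Scala (with made-up category values) of string indexing followed by one-hot encoding for a single feature:

```scala
// Hypothetical values of a single categorical feature (e.g. sales region)
val regions = Seq("17", "04", "17", "32", "04", "17")

// String indexing: category values ranked by decreasing frequency,
// mimicking the StringIndexer transformer
val indexOf: Map[String, Int] = regions
  .groupBy(identity)
  .toSeq
  .sortBy { case (_, occurrences) => -occurrences.size }
  .map { case (value, _) => value }
  .zipWithIndex
  .toMap

// One-hot encoding: an index is mapped to a vector of binary values,
// mimicking the OneHotEncoder transformer (ignoring the dropped-last-category
// convention and the sparse representation Spark actually uses)
def oneHot(value: String): Seq[Int] =
  Seq.tabulate(indexOf.size)(i => if (i == indexOf(value)) 1 else 0)

println(indexOf)
println(oneHot("04"))
```

The most frequent value "17" receives index 0, so oneHot("04") yields the vector (0, 1, 0). The VectorAssembler stage simply concatenates such vectors across features.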
Let's consider the following simple use case: A sales report list the date an item is sold, its id, the sales region and name of the sales person or agent.
   date         id         region    agent
---------------------------------------
07/10/2014   23c9a89d   17        aa4

The encoding pipeline is illustrated in the following diagram: The data source is a CSV sales report file. Each column or feature is converted to a string index, then encoded as a vector of binary values. Finally, the 4 vectors are assembled into a single features vector.

Implementation
The first step is to create a Spark 2.x session. Let's wrap the spark session configuration into a single, reusable trait, SparkSessionManager

trait SparkSessionManager {
  protected[this] val sparkSession = SparkSession.builder()
    .appName("LR-Pipeline")
    .config(new SparkConf()
      .set("spark.default.parallelism", "4")
      .set("spark.rdd.compress", "true")
      .set("spark.executor.memory", "8g")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.io.compression.codec", "lzf")
      .setMaster("local"))
    .getOrCreate()

  protected def csv2DF(dataFile: String): DataFrame =
    sparkSession.read
      .option("inferSchema", true)
      .csv(dataFile)

  def stop: Unit = sparkSession.stop
}


The SparkSession is instantiated with the same configuration class, SparkConf, as the SparkContext used in the Spark 0.x and 1.x versions.
The method csv2DF loads the content of a CSV file and generates a data frame or data set.

The next step consists of creating the encoding workflow or pipeline, wrapped into the DataEncoding trait.

 1  trait DataEncoding {
 2    protected[this] val colNames: Array[String]
 3    lazy val vecColNames = colNames.map(vector(_))
 4
 5    lazy val pipelineStages: Array[PipelineStage] =
 6      colNames.map(colName => new StringIndexer()
 7        .setInputCol(colName)
 8        .setOutputCol(index(colName))) ++
 9      colNames.map(colName => new OneHotEncoder()
10        .setInputCol(index(colName))
11        .setOutputCol(vector(colName))) ++
12      Array[PipelineStage](new VectorAssembler()
13        .setInputCols(vecColNames).setOutputCol("features"))
14
15    def index(colName: String): String = s"${colName}Index"
16    def vector(colName: String): String = s"${colName}Vector"
17  }

The sequence of names of the columns (features) colNames is an abstract value that needs to be defined for the specific training set (line 2). The first two stages of the data encoding pipeline, string indexing (lines 6 - 8) and encoding (lines 9 - 11), are applied to each of the 4 features. The last stage, assembling the features vector (lines 12 - 13), is added to the array of the previous stages to complete the pipeline. Each stage is defined by its input column setInputCol and output column/feature setOutputCol.

Let's leverage the Spark session and the data encoding to generate a classification model, ModelFactory. The construction of classification and predictive models with ML pipelines will be described in a future post. For now, let's create a pipeline model using the data encoding stages.

val dataEncoder = new DataEncoding {
  override protected[this] val colNames: Array[String] =
    Array[String]("date", "id", "region", "agent")

  def pipelineModel(df: DataFrame): PipelineModel =
    new Pipeline().setStages(pipelineStages).fit(df)
}



The pipeline model is generated in two steps:
• Instantiate the pipeline from the data encoding stages
• Generate the model from an input or training data frame, df, by invoking the fit method.
The next post extends the data encoding pipeline with two estimators: a classifier and a cross validator.

References
Introduction to ML pipelines and MLlib
ML StringIndexer transformer
ML OneHotEncoder transformer
ML VectorAssembler transformer

Monday, July 4, 2016

Monte Carlo Integration in Scala

This post introduces an often overlooked numerical integration method that leverages the ubiquitous Monte Carlo simulation.
Not every function has a closed form for computing a definite integral, a process known as symbolic integration. There are many numerical integration methods for continuous functions, such as Simpson's rule, the Newton-Cotes quadrature rule and Gaussian quadrature. These methods are deterministic by nature.
Monte Carlo numerical integration is a stochastic method that relies on randomly generated values to estimate the area under a curve, a surface or any multidimensional space. This illustration of the Monte Carlo integration method uses a pragmatic uniform random distribution of data points for a single-variable function.

Basic concept
Let's consider the single variable function illustrated in the following line plot.
$f(x)=\frac{1}{x}-0.5$
The objective is to compute the integral of the function over the interval [1, 3]. The Monte Carlo numerical integration is defined by three steps:
• Define the outer area of the function, bounded by the minimum and maximum values of x and y, in this case x in [1, 3] and y in [-0.17, 0.5]
• Generate random data points over the outer area
• Compute the ratio of the number of random data points within the function area over the total number of data points
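These three steps can be sketched in a few self-contained lines for $f(x)=\frac{1}{x}-0.5$ over [1, 3] before generalizing them into a class (fixed seed for reproducibility):

```scala
import scala.util.Random

val f = (x: Double) => 1.0/x - 0.5
val (from, to) = (1.0, 3.0)

// Step 1: outer area; f is decreasing on [1, 3]
val (yMin, yMax) = (f(to), f(from))
val width = to - from
val height = yMax - yMin
val outerArea = width * height

// Steps 2 & 3: draw random points over the outer area and count, with
// a sign, those that fall between the x-axis and the curve
val rand = new Random(42L)
val numPoints = 1000000
val count = (0 until numPoints).foldLeft(0)((s, _) => {
  val x = from + rand.nextDouble * width
  val y = yMin + rand.nextDouble * height
  s + (if (y > 0.0 && y < f(x)) 1
       else if (y < 0.0 && y > f(x)) -1
       else 0)
})
val estimate = count.toDouble * outerArea / numPoints

println(s"estimate: $estimate, exact: ${Math.log(3.0) - 1.0}")
```

The exact value of the definite integral is $log(3)-1\approx 0.0986$; with one million points the estimate typically lands within a few thousandths of it.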
Let's define a generic class, MonteCarloIntegrator, to encapsulate these 3 steps.
The class has two arguments (line 1):
• The function f to sum
• The number of random data points used in the method

 1  class MonteCarloIntegrator(f: Double => Double, numPoints: Int) {
 2
 3    def integrate(from: Double, to: Double): Double = {
 4      val (min, max) = getBounds(from, to)
 5      val width = to - from
 6      val height = if (min >= 0.0) max else max - min
 7      val outerArea = width * height
 8      val randomx = new Random(System.currentTimeMillis)
 9      val randomy = new Random(System.currentTimeMillis + 42L)
10
11      def randomSquare(randomx: Random, randomy: Random): Double = {
12        val numInsideArea = Range(0, numPoints)./:(0)(
13          (s, n) => {
14            val ptx = randomx.nextDouble * width + from
15            val pty = randomy.nextDouble * height + (if (min < 0.0) min else 0.0)
16            randomx.setSeed(randomy.nextLong)
17            randomy.setSeed(randomx.nextLong)
18
19            s + (if (pty > 0.0 && pty < f(ptx)) 1
20                 else if (pty < 0.0 && pty > f(ptx)) -1
21                 else 0)
22          }
23        )
24        numInsideArea.toDouble * outerArea / numPoints
25      }
26      randomSquare(randomx, randomy)
27    }
28  }

The method integrate implements the summation of the function over the interval [from, to] (line 3). The first step is to extract the bounds getBounds of the outer area (line 4), whose size is computed on line 7. Each coordinate is assigned a random generator, randomx and randomy (lines 8 & 9).
The nested method randomSquare records the number of data points, numInsideArea that falls into the area delimited by the function (line 13 - 21). The sum is computed as the relative number of data points inside the area delimited by the function (line 24).

The method getBounds is described in the following code snippet. It is a simple, although not particularly efficient, approach to extracting the bounds of the integration. It breaks down the interval into a number of steps (lines 2 & 3) and collects the minimum and maximum values of the function (lines 7 - 12).

 1  def getBounds(from: Double, to: Double): (Double, Double) = {
 2    val numSteps = Math.sqrt(numPoints).floor.toInt
 3    val stepSize = (to - from) / numSteps
 4
 5    (0 to numSteps)./:((Double.MaxValue, -Double.MaxValue))(
 6      (minMax, n) => {
 7        val y = f(n * stepSize + from)
 8        updateBounds(y, minMax) match {
 9          case 0x01 => (y, minMax._2)
10          case 0x02 => (minMax._1, y)
11          case 0x03 => (y, y)
12          case _ => minMax
13        }
14      }
15    )
16  }
17
18  def updateBounds(y: Double, minMax: (Double, Double)): Int = {
19    var flag = 0x00
20    if (y < minMax._1) flag += 0x01
21    if (y > minMax._2) flag += 0x02
22    flag
23  }

Precision, precision
You may wonder about the accuracy of the Monte Carlo method and how many randomly generated data points are needed for a decent accuracy. Let's consider the same function $f(x)=\frac{1}{x}-0.5$ and its indefinite integral, which is used to generate the expected sum for the function f: $\int f(x)dx=log(x)-\frac{x}{2}+C$ The simple test consists of computing the error between the value produced by the definite integral and the sum from the Monte Carlo method, as implemented in the following code snippet.

val fct = (x: Double) => 1.0/x - 0.5
val integral = (x: Double, c: Double) => Math.log(x) - 0.5*x + c

final val numPoints = 10000
val integrator = new MonteCarloIntegrator(fct, numPoints)

val predicted = integrator.integrate(1.0, 2.0)
val expected = integral(2.0, 0.0) - integral(1.0, 0.0)

We run the test 100 times, with the number of random data points varying between 100 and 50,000.

The Monte Carlo method is reasonably accurate even with a small number of data points. In this case, the small gain in accuracy does not justify randomizing significantly more than 1,000 data points.

A smarter approach would be to rely on an exit condition to complete the summation in the shortest time possible, without having to estimate the optimum number of random data points. A new batch of numPoints random data points is added at each iteration. One simple convergence criterion compares the difference of the sum between two consecutive iterations:
  sum(existing data points + new batch data points) - sum(existing data points) < eps

 1  def randomSquare(randomx: Random, randomy: Random): Double = {
 2    var oldValue = 0.0
 3    Range(0, numPoints).takeWhile(_ => {
 4      // Generates a new batch of numPoints random data points and
 5      // counts those inside the area, as in the original implementation
 6      val numInsideArea = ....
 7      val newValue = numInsideArea.toDouble * outerArea / numPoints
 8      val diff = Math.abs(newValue - oldValue)
 9      oldValue = newValue
10      diff > eps
11    })
12    oldValue
13  }

At each iteration, a new batch of numPoints data points is randomly generated to enhance the accuracy of the summation. The exit strategy is implemented through the higher order Scala collection method takeWhile (lines 3 & 11).

Note: This implementation of Monte Carlo integration is simple enough to illustrate the concept of stochastic methods applied to calculus. Recursive stratified sampling has been shown to be more accurate for computing the definite integral of functions with significant inflection points (extreme second order derivatives).

References
Monte Carlo Integration - Dartmouth College - C. Robert, G. Casella
The Clever Machine: Monte Carlo Approximations - Dustin Stansbury
Programming in Scala - 3rd edition - M. Odersky, L. Spoon, B. Venners

Thursday, May 5, 2016

Bootstrapping by resampling with replacement

Bootstrapping is a statistical resampling method that consists of randomly sampling a dataset with replacement. This technique enables data scientists to estimate the sampling distribution of a wide variety of probability distributions.

Background
One key objective of bootstrapping is to compute the accuracy of a statistic such as the mean, standard deviation, median, mode or error. These statistics, s, are known as estimators of an approximate distribution. The most common approximate distribution is the empirical distribution function. In the case where the observations are independent and identically distributed (iid), the empirical or approximate distribution can be estimated through resampling.
The following diagram captures the essence of bootstrapping by resampling: the generation of bootstrap replicates by resampling.

Each of the B bootstrap samples has the same number of observations or data points as the original dataset from which the samples are created. Once the samples are created, a statistical function s such as mean, mode, median or standard deviation is computed for each sample.
The standard deviation for the B statistics should be similar to the standard deviation of the original dataset.
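The process in the diagram can be sketched in a few self-contained lines (hypothetical normally distributed data, fixed seed) before it is generalized into a class below:

```scala
import scala.util.Random

val rand = new Random(1L)

// Hypothetical original dataset: ~N(mean = 5.0, stdDev = 2.0)
val data = Vector.fill(1000)(5.0 + 2.0 * rand.nextGaussian)

val mean = (v: Vector[Double]) => v.sum/v.length

// B bootstrap samples, each drawn with replacement and of the same size
// as the original dataset; the statistic s here is the mean
val numSamples = 500
val replicates = Vector.fill(numSamples) {
  mean(Vector.fill(data.size)(data(rand.nextInt(data.size))))
}

// The spread of the B replicates estimates the standard error of s
val replicatesMean = mean(replicates)
val stdError = Math.sqrt(
  replicates.map(r => (r - replicatesMean)*(r - replicatesMean)).sum /
    (numSamples - 1))

println(s"mean of replicates: $replicatesMean, standard error: $stdError")
```

For the mean of n observations, the bootstrap standard error should come close to the familiar $\sigma /\sqrt{n}\approx 2/\sqrt{1000}\approx 0.063$.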

Implementation in Scala
The purpose of this post is to illustrate some basic properties of bootstrap resampling:
• Profile of the distribution of statistics s for a given probability distribution
• Comparison of the standard deviation of the statistics s with the standard deviation of the original dataset

Let's implement a bootstrap by resampling in Scala, starting with a class Bootstrap.

 1  class Bootstrap(
 2      numSamples: Int = 1000,
 3      s: Vector[Double] => Double,
 4      inputDistribution: Vector[Double],
 5      randomizer: Random) {
 6
 7    lazy val bootstrappedReplicates: Array[Double] =
 8      (0 until numSamples)./:(
 9        new mutable.ArrayBuffer[Double])((buf, _) =>
10          buf += createBootstrapSample
11      ).toArray
12
13    def createBootstrapSample: Double = ???  // defined below
14
15    lazy val mean = bootstrappedReplicates.reduce(_ + _)/numSamples
16
17    def error: Double = ???  // defined below
18  }

The class Bootstrap is instantiated with a predefined number of samples numSamples (line 2), a statistic function s (line 3), a dataset generated by a given distribution inputDistribution (line 4) and a random number generator randomizer (line 5).
The computation of the bootstrap replicates, bootstrappedReplicates, is central to resampling (lines 8 - 11). As described in the introduction, a replicate s is computed from a sample of the original dataset with the method createBootstrapSample (line 10).

Let's implement the method createBootstrapSample.

 1  def createBootstrapSample: Double = s(
 2    (0 until inputDistribution.size)./:(
 3      new mutable.ArrayBuffer[Double])(
 4      (buf, _) => {
 5        randomizer.setSeed(randomizer.nextLong)
 6        val randomValueIndex = randomizer.nextInt(inputDistribution.size)
 7        buf += inputDistribution(randomValueIndex)
 8      }
 9    ).toVector
10  )

The method createBootstrapSample
- Samples the original dataset using a uniform random function (line 6)
- Applies the statistic function s to this sampled dataset (lines 1 & 10)

The last step consists of computing the error (deviation) of the bootstrap replicates.

def error: Double = {
  val sumOfSquaredDiff = bootstrappedReplicates.map(
    replicate => (replicate - mean)*(replicate - mean)
  ).sum
  Math.sqrt(sumOfSquaredDiff / (numSamples - 1))
}

Evaluation
The first evaluation consists of comparing the distribution of the replicates with the original distribution. To this purpose, we generate an input dataset using:
• A Normal distribution
• A LogNormal distribution
Let's create a method, bootstrapEvaluation, to compare the distribution of the bootstrap replicates with the dataset from which the bootstrap samples are generated.

 1  def bootstrapEvaluation(
 2      dist: RealDistribution,
 3      random: Random,
 4      gen: (Double, Double)
 5  ): (Double, Double) = {
 6
 7    val inputDistribution = (0 until 5000)./:(
 8      new ArrayBuffer[(Double, Double)])((buf, _) => {
 9        val x = gen._1 * random.nextDouble - gen._2
10        buf += ((x, dist.density(x)))
11      }
12    ).toVector
13
14    val mean = (x: Vector[Double]) => x.sum/x.length
15
16    val bootstrap = new Bootstrap(
17      numReplicates,
18      mean,
19      inputDistribution.map(_._2),
20      new Random(System.currentTimeMillis)
21    )
22
23    val meanS = bootstrap.bootstrappedReplicates.sum /
24      bootstrap.bootstrappedReplicates.size
25    val sProb = bootstrap.bootstrappedReplicates.map(_ - meanS)
26
27    // .. plotting histogram of distribution sProb
28    (bootstrap.mean, bootstrap.error)
29  }

We are using the normal and log normal probability density function defined in the Apache Commons Math Java library. These probability density functions are defined in the org.apache.commons.math3.distribution package.
The comparative method bootstrapEvaluation has the following arguments:

• dist: A probability density function used to generate the dataset upon which sampling is performed (line 2).
• random: A random number generator (line 3).
• gen: A pair of parameters for the linear transform used to generate random values (a.r - b) (line 4).
The input distribution inputDistribution {(x, pdf(x))} is generated for 5,000 data points (lines 7 - 13).
Next, the bootstrap is created with the appropriate number of replicates numReplicates, the mean of the input dataset as the statistical function s, the input distribution and the generic random number generator of the Scala library as arguments (lines 16 - 20).
Let's plot the distribution of the input dataset generated from a normal density function.

val (meanNormal, errorNormal) = bootstrapEvaluation(
  new NormalDistribution,
  new scala.util.Random,
  (5.0, 2.5)
)


Normally distributed dataset
The first graph plots the distribution of the input dataset using the Normal distribution.

The second graph illustrates the distribution (histogram) of the replicates s - mean.

The bootstrap replicates s(x) are also normally distributed. The mean value for the bootstrap replicates is 0.1978 and the error is 0.001691

Dataset with a log normal distribution
We repeat the same process for the log-normal distribution. This time around, the dataset to sample from follows a log-normal distribution.

val (meanLogNormal, errorLogNormal) = bootstrapEvaluation(
  new LogNormalDistribution,
  new scala.util.Random,
  (2.0, 0.0)
)


Although the original dataset used to generate the bootstrap samples follows a log-normal distribution, the bootstrap replicates s(x) are normally distributed. The mean for the bootstrap replicates is 0.3801 and the error is 0.002937.

The error for the bootstrap resampling from a log-normal distribution is twice as large as the error related to the normal distribution.
The result should not be a surprise: the bootstrap replicates of the mean follow a normal distribution whether or not the original dataset does, so the replicate distribution closely matches the original dataset only when that dataset is itself generated from a normal probability density function.

References
Programming in Scala - 3rd edition M Odersky, L. Spoon, B. Venners - Artima - 2016
Elements of Statistics Learning: Data mining, Inference and Prediction - 7.11 Bootstrap method Springer - 2001

Friday, April 15, 2016

Managing Spark context in ScalaTest

This post describes a methodology to manage the Spark context while testing your application using ScalaTest.

Debugging an Apache Spark application with ScalaTest seems quite simple when dealing with a single test:
• Specify your Spark context configuration SparkConf
• Create the Spark context
• Clean up resources (Spark context, Akka context, file handles...)
However, there are cases when you may need to create and close the Spark context used across multiple tests or a subset of the tests. The challenge is to make sure that the Spark context is actually closed when it is no longer needed.
This post introduces the two basic ScalaTest methods beforeAll and afterAll of the trait BeforeAndAfterAll to manage the context lifecycle of your test application.

Wrapping the Spark context
The objective is to create a small framework that creates or retrieves an existing Spark context before executing a test, and closes it after the test is completed, independently of the status of the test.
The initialization of the Spark context consists of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through a simple parameterization. The context is created only if it is not already defined within the scope of the test, using the getOrCreate method.
trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName("Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close: Unit = {
    if (sc != null) sc.stop
  }
}

The solution is to let ScalaTest method manage the lifecycle of the Spark context for testing.
 1  trait SparkContextForTesting
 2      extends SparkContextWrapper with BeforeAndAfterAll {
 3    self: Suite =>
 4
 5    override def beforeAll: Unit = {
 6      getContext
 7      super.beforeAll
 8    }
 9
10    override def afterAll: Unit = {
11      close
12      super.afterAll
13    }
14  }

Note: The BeforeAndAfterAll trait can only be mixed into a test suite that inherits the Suite trait. The method beforeAll (line 5) is automatically called before the first test of your test suite, and the method afterAll (line 10) is called after your last test has completed, whether those tests succeeded or not.

class MyTest extends FlatSpec
    with Matchers
    with SparkContextForTesting {

  val x: Int = -4

  it should "My first test" in {
    val sqlContext = new SQLContext(sc)
    // ....
  }

  // ... other tests

  it should "My last test" in {
    // clean up after it completes
  }
}

As they say at the end of the movie, "That's all folks!"

Reference
ScalaTest User Guide