Friday, November 18, 2016

Recursive Mean & Standard Deviation in Scala

Target audience: Intermediate
Estimated reading time: 10'

Implementation of the recursive computation of the mean and standard deviation of a very large data set in Scala using tail recursion


Overview
The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala's tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.


Direct computation
There are many ways to compute the standard deviation through summation. The first mathematical expression consists of the sum of the squared differences between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{0}^{n-1}(x-\mu )^{2}}{n}}\]
The second formula allows the mean and standard deviation to be updated with any new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{0}^{n-1}x^{2}-{\mu ^{2}}}\]
This second approach relies on the computation of the sum of squared values, which can overflow.

val x = Array[Double]( /* ... */ )
val mean = x.sum / x.length
val stdDev = Math.sqrt(
  x.map(_ - mean).map(t => t * t).sum / x.length)

A reduceLeft can be used as an alternative to map { ... }.sum for the computation of the standard deviation (line 3).
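A minimal sketch of that alternative (stdDev2 is an illustrative name; it produces the same value as stdDev above):

// Equivalent computation of the variance term using reduceLeft
// (assumes x is non-empty)
val stdDev2 = Math.sqrt(
  x.map(xi => (xi - mean) * (xi - mean)).reduceLeft(_ + _) / x.length)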


Recursive computation
There is actually no need to compute the sum and the sum of squared values to compute the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\]
Let's implement these two recursive formulas in Scala using tail recursion (line 4).

def meanStd(x: Array[Double]): (Double, Double) ={
    
  @scala.annotation.tailrec
  def meanStd(
      x: Array[Double], 
      mu: Double, 
      Q: Double, 
      count: Int): (Double, Double) = 
    if (count >= x.length) (mu, Math.sqrt(Q/x.length))
    else {
      val newCount = count + 1
      val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
      val newQ = Q + (x(count) - mu)*(x(count) - newMu)
      meanStd(x, newMu, newQ, newCount)
    }
  
  meanStd(x, 0.0, 0.0, 0)
}

This implementation updates the mean and the standard deviation simultaneously for each new data point. The recursion exits when all elements have been accounted for (line 9).
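As a quick sanity check, here is a hypothetical usage with a small sample whose statistics are easy to verify by hand:

val sample = Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
val (mu, sigma) = meanStd(sample)
// mu = 5.0, sigma = 2.0 (population standard deviation)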


References
  • Programming in Scala, 3rd Edition, M. Odersky, L. Spoon, B. Venners

Sunday, September 18, 2016

Dependencies Injection in Scala

Target audience: Advanced
Estimated reading time: 20'


This post describes the concept of dependency injection along with an implementation of the Cake pattern.


Overview
Dependency injection is a design pattern that has been widely used in Java by leveraging frameworks such as Spring. The objective of the pattern is to replace hard-coded dependencies with run-time association or injection of new types.

Java defines modules or components through the semantics and conventions of packages. The functionality of a module is defined through one or more interfaces and implemented through the composition and inheritance of concrete classes. Polymorphism is used to "dynamically wire" those classes, assembled through composition and inheritance into patterns which address a specific design problem (publish-subscribe, strategy, factory, ...).
However, those capabilities have proven limited for creating very dynamic and complex applications. The Scala programming language provides developers with a dependency injection mechanism based on self-type annotations that does not rely on a third-party framework.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.


Reuse through Inheritance
The simplest and most commonly used form of reuse in any object-oriented programming language is inheritance. Let's consider an interface House which is implemented by an abstract or concrete class "House with Furniture & Appliances", which in turn is sub-classed by a well defined House.

It is well documented that inheritance is a poor mechanism for code reuse because data is not properly encapsulated, as a sub-class may access the internals of the base class. Moreover, any future change in the base class or interface (framework) will propagate through the sub-classes (dependencies).


Reuse through Composition
It is a well documented and researched fact that composition provides a more robust encapsulation than inheritance, as the main class delegates or routes method invocations to the appropriate internal components. Contrary to inheritance, for which changes in the base class may have unintended consequences on the subclasses, changes in components or inner classes can be made independently of the main class or outer component.
Clearly, in the example above, composition is more appropriate. After all, a House with Furniture and Appliances can be defined as a House that contains Furniture and Appliances:
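A minimal sketch of this composition (the class names below are illustrative placeholders, not the example developed later in this post):

class Furniture
class Appliance

// The house owns its components and delegates to them,
// instead of inheriting from them
class FurnishedHouse(
  furnitures: List[Furniture],
  appliances: List[Appliance]
)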



Inversion of Control
Frameworks such as Spring have introduced the concept of Inversion of Control (IoC) containers and dependency injection, which is a form of IoC. In the case of inversion of control, a framework defines interfaces which are extended or implemented by the application or client code. Instead of having the application use the framework API, the framework relies on the application to implement a specific function.

Let's take the example of a generic service, Service, that accesses a database, using Java.

public interface Service {
    JSonObject query(String mySQLQuery);
}
 
public interface DbAccess { }
 
public class ServiceImpl implements Service {
  private DbAccess dbAccess = null;
  public void setDbAccess(DbAccess dbAccess) { 
    this.dbAccess = dbAccess; 
  }
 
  @Override
  public JSonObject query(String mySQLQuery) { return null; /* delegates to dbAccess; body omitted */ }
}

In the example above, a concrete implementation of the DbAccess interface, such as a MySQL-backed accessor, can be injected (passed to the implementation of the service through its setter, line 9). The database access instance is then used to execute the query and return a JSON object (line 14).
Scala provides the developer with a similar and powerful mechanism to inject dependencies into a concrete class, known as the Cake pattern.


Dependency Injection
At its core, dependency injection relies on three components:
- the consumer, or list of dependent components
- the provider, which injects the dependencies
- the dependencies themselves
A minimal, generic sketch of these three roles follows.
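The following sketch shows how the three roles map to the Cake pattern; the UserRepository/UserService names are illustrative and independent of the house example developed below:

// Dependency: the component that provides data access
trait UserRepositoryComponent {
  val userRepository: UserRepository
  class UserRepository {
    def find(id: Long): String = s"user-$id"
  }
}

// Consumer: declares, through a self-type, that it requires the repository component
trait UserServiceComponent { self: UserRepositoryComponent =>
  class UserService {
    def name(id: Long): String = userRepository.find(id)
  }
}

// Provider: injects the dependency by mixing the components together
object ComponentRegistry extends UserServiceComponent with UserRepositoryComponent {
  val userRepository = new UserRepository
}

val userName = (new ComponentRegistry.UserService).name(1L)  // "user-1"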

Let's consider the recipe example above. A House requires not only Furniture and Appliances but also a Layout plan with step-by-step instructions. 


Each piece of furniture is defined by its name, category and price. More specific categories of furniture such as PatioFurniture and BathroomFurniture can also be created with similar arguments.

// Base class (a case class cannot be extended by other case classes)
class Furniture(name: String, category: String, price: Double) 
 
case class PatioFurniture(
  name: String, 
  price: Double, 
  season: String
) extends Furniture(name, "Patio", price)
 
case class BathroomFurniture(
  name: String, 
  price: Double, 
  floor: Int = 1
) extends Furniture(name, "Bathroom", price)

A house also contains appliances and requires a layout plan to be set up. An appliance has a name, a price and a warranty, if needed and available. The class Layout is instantiated with a name and an option.

case class Appliance(
  name: String, 
  warranty: Boolean, 
  price: Double)

case class Layout(name: String, option: Int)

The goal is to furnish a house with a combination of appliances and pieces of furniture, following a layout plan. The implementation computes the total cost once a combination of furniture, appliances and layout is selected.

For this purpose, we create several modules, implemented as traits such as FurnitureModule (line 1), to encapsulate each category of furniture. In our case, FurnitureModule defines the patio furniture, PatioFurniture (lines 6-10). The bathroom furniture, an instance of BathroomFurniture (lines 15-19), is wrapped into its dedicated module, BathroomFurnitureModule (line 14).

trait FurnitureModule {
    // abstract attribute
  val furnitures: List[Furniture]
   
  class Furniture(id: String, category: String, price: Double) { def cost: Double = price } // cost is used by RoomFurnishing
  class PatioFurniture(
    id: String, 
    price: Double, 
    season: String
  ) extends Furniture(id, "Patio", price)
}
 
 
trait BathroomFurnitureModule extends FurnitureModule {
  class BathroomFurniture(
    id: String, 
    price: Double, 
    floor: Int=1
  )  extends Furniture(id, "Bathroom", price)
}

Let's dig into the code ...
The first trait, FurnitureModule, defines a generic Furniture (line 5) and a specialized furniture type, PatioFurniture. Dynamic binding is managed by the module that encapsulates the hierarchy of furniture types. Alternatively, the client code can manage dynamic binding or dependency injection by creating a sub-module, BathroomFurnitureModule (line 14), to manage another type of furniture, BathroomFurniture (lines 15-19). It should be clear by now that these two different approaches
  • Customizing modules
  • Customizing classes or types within each module
can be applied either separately or all together.

The same strategy is applied to the Appliance and Layout.

trait ApplianceModule {
  val appliances: List[Appliance]
   
  class Appliance(name: String, warranty: Boolean, price: Double) {
     
    def this(name: String, price: Double) = this(name, true, price)
    def cost: Double = if( warranty ) price * 1.15 else price
  }
}
 
 
trait LayoutModule {
  val layout: Layout
     
  class Layout(name: String, option: Int) {
    val movingInstructions: List[String] = List.empty
  }
}

A single class, Appliance (line 4), with two constructors (lines 4 and 6), is enough to describe all types of appliances within the ApplianceModule scope. The same concept applies to the LayoutModule component (lines 12-18). A short usage sketch of the two constructors follows.
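A brief sketch of how the two constructors behave once a concrete ApplianceModule is instantiated (the appliance names and prices are illustrative):

val kitchen = new ApplianceModule {
  val appliances: List[Appliance] = List(
    new Appliance("Microwave Oven", false, 185.0), // primary constructor, no warranty
    new Appliance("Refrigerator", 1250.0)          // auxiliary constructor, warranty defaults to true
  )
}

// Total cost: 185.0 + 1250.0 * 1.15 (the warranty adds 15% to the price)
val totalApplianceCost = kitchen.appliances.map(_.cost).sum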

The factory class, RoomFurnishing (line 1), relies on a self-type annotation that references one of the components, LayoutModule, with the other components, FurnitureModule and ApplianceModule, as mixins (line 2). The factory class defines the methods required to manage any combination of the components, such as computing the total cost (line 4).

class RoomFurnishing {
self: LayoutModule with FurnitureModule with ApplianceModule => 

  def cost: String = {
    val totalCost = furnitures.map(_.cost).sum + appliances.map(_.cost).sum
    s"RoomFurnishing: ${totalCost }"
  }
    
  def movingDate: String = "October, 2013"
}

Here is an example of how the client code can dynamically assemble all the components and compute the total cost.

val houseFurnishing =
 if (country)  // country: Boolean flag selecting the country home layout vs. the apartment
   new RoomFurnishing 
     with FurnitureModule 
       with ApplianceModule 
        with LayoutModule {
 
    val layout = new Layout("Country Home", 2)

    val furnitures = List[Furniture](
      new Furniture("Modern Chair", "Chair", 136.0), 
      new PatioFurniture("Bench", 350.0, "Summer")
    )
      
    val appliances = List[Appliance](
      new Appliance("Microwave Oven", false, 185.0), 
      new Appliance("Dishwaher", true, 560.0)
    )
  }
    
  else 
    new RoomFurnishing 
      with BathroomFurnitureModule 
        with ApplianceModule 
          with LayoutModule {
 
      val layout = new Layout("Apartment", 4)
      val furnitures = List[BathroomFurniture](
        new BathroomFurniture("Stool", 89.5), 
        new BathroomFurniture("Cabinet", 255.6, 2)
      )
       
      val appliances = List[Appliance](
        new Appliance("Microwave Oven", false, 185.0), 
        new Appliance("Dishwaher", true, 560.0)
      )
  }
Console.println(houseFurnishing.cost)

The dynamic composition of furniture and appliances, given a layout, to furnish a room (an instance of RoomFurnishing) is made clear in the code snippet. If the goal is to furnish a country house (line 2), the interior decorator selects a modern chair (line 11), a bench (line 12), a microwave oven (line 16) and a dishwasher (line 17), following the Country Home layout (line 8). The same principle applies to the selection of appliances and furniture to furnish an apartment (lines 22-36).

This technique combines class composition, inheritance, self-reference and abstract variables to provide a simple and flexible framework. The post also introduces the concept of a layout or assembler to hide some complexity and become the primary mixin. I may elaborate on this concept in a future post. I strongly recommend the article written by Jonas Bonér on the Cake pattern (listed in the references) to get an in-depth understanding of both the motivation for and the variants of this pattern.


References

Monday, August 8, 2016

Spark ML pipelines I - Features encoding

Target audience: Intermediate
Estimated reading time: 10'


Apache Spark introduced machine learning (ML) pipelines in version 1.4.0. A pipeline is a workflow or sequence of tasks that cleanse, filter, train, classify, predict and validate a data set. Those tasks are defined as stages of the pipeline.
Spark 2.0 extends ML pipelines to support persistence, while the RDD-based MLlib package is slowly being deprecated in favor of the DataFrame- and Dataset-based ML library.
ML pipelines allow data scientists to weave transformers and estimators into a single classification or predictive model.

This first post on ML pipelines implements a feature-encoding pipeline.


ML Pipelines 101
This section is a very brief overview of ML pipelines. Further information is available in the Apache Spark documentation.
The key ingredients of an ML pipeline are:

  • Transformers are algorithms which transform one DataFrame into another DataFrame. Transformers are stateless.
  • Estimators are algorithms which can be fit on a DataFrame to produce a Transformer (i.e. Estimator.fit).
  • Pipelines are estimators that weave or chain multiple Transformers and Estimators together to specify an ML workflow.
  • Pipeline stages are the basic elements of the ML workflow. Transformers, Estimators and Pipelines are pipeline stages.
  • Parameters encapsulate the tuning and configuration parameters required for each Transformer and Estimator.
Note: The following implementation has been tested using Apache Spark 2.0


Features encoding pipeline
Categorical features have multiple values or instances, which are represented as strings. Numerical values are categorized or bucketed through a conversion to a string.
Once converted to strings, the categorical values are converted into category indices using the StringIndexer transformer. The resulting indices are ranked by decreasing order of frequency of the value in the training or validation set.
Next, the indices associated with a particular feature are encoded into a vector of binary values (0, 1) through the OneHotEncoder transformer. A feature instance is encoded as 1 if it is defined in the data point, 0 otherwise.
Finally, the vectors of binary values of all the features are aggregated or assembled through the VectorAssembler transformer.
Let's consider the following simple use case: a sales report lists the date an item is sold, its id, the sales region and the name of the sales person or agent.
   date         id         region    agent
   ---------------------------------------
   07/10/2014   23c9a89d   17        aa4
The encoding pipeline is illustrated in the following diagram:


The data source is a CSV sales report file. Each column or feature is converted to a string index, then encoded as a vector of binary values. Finally, the 4 vectors are assembled into a single features vector.
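As an illustration of the first two stages applied to a single column (a sketch only; the pipeline built below handles all four columns generically):

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Convert the "region" strings into category indices ranked by frequency
val regionIndexer = new StringIndexer()
  .setInputCol("region")
  .setOutputCol("regionIndex")

// Encode each region index into a sparse vector of binary values
val regionEncoder = new OneHotEncoder()
  .setInputCol("regionIndex")
  .setOutputCol("regionVector")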


Implementation
The first step is to create a Spark 2.x session. Let's wrap the Spark session configuration into a single, reusable trait, SparkSessionManager.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

trait SparkSessionManager {
  protected[this] val sparkSession = SparkSession.builder()
    .appName("LR-Pipeline")
    .config(new SparkConf()
    .set("spark.default.parallelism", "4")
    .set("spark.rdd.compress", "true")
    .set("spark.executor.memory", "8g")
    .set("spark.shuffle.spill", "true")
    .set("spark.shuffle.spill.compress", "true")
    .set("spark.io.compression.codec", "lzf")
    .setMaster("local[4]")).getOrCreate()

  protected def csv2DF(dataFile: String): DataFrame =
    sparkSession.read.option("header", true)
                    .option("inferSchema", true)
                    .csv(dataFile)

  def stop: Unit = sparkSession.stop
}

The SparkSession is instantiated with the same SparkConf configuration as the SparkContext used in Spark 0.x and 1.x versions.
The method csv2DF loads the content of a CSV file and generates a data frame.

The next step consists of creating the encoding workflow or pipeline, wrapped into the DataEncoding trait.

trait DataEncoding {
  protected[this] val colNames: Array[String]
  lazy val vecColNames = colNames.map(vector(_))
  
  lazy val pipelineStages: Array[PipelineStage] =
    colNames.map(colName => new StringIndexer()
       .setInputCol(colName)
       .setOutputCol(index(colName))) ++
    colNames.map(colName => new OneHotEncoder()
       .setInputCol(index(colName))
       .setOutputCol(vector(colName))) ++
    Array[PipelineStage](new VectorAssembler()
       .setInputCols(vecColNames).setOutputCol("features"))

  def index(colName: String): String = s"${colName}Index"
  def vector(colName: String): String = s"${colName}Vector"
}

The sequence of names of the columns (features), colNames, is an abstract value that needs to be defined for the specific training set (line 2). The first two stages of the data encoding pipeline, string indexing (lines 6-8) and encoding (lines 9-11), are applied to each of the 4 features. The last stage, assembling the features vector (lines 12-13), is appended to the array of previous stages to complete the pipeline. Each stage is defined by its input column, setInputCol, and its output column/feature, setOutputCol.

Let's leverage the Spark session and the data encoding to generate a classification model. Classification and predictive models built with ML pipelines will be described in a future post. For now, let's create a pipeline model using the data encoding stages.

val dataEncoder = new DataEncoding {
  override protected[this] val colNames: Array[String]= 
    Array[String]("date", "id", "region", "agent")
   
  def pipelineModel(df: DataFrame): PipelineModel = 
     new Pipeline().setStages(pipelineStages).fit(df)
  }
  

The pipeline model is generated in two steps:
  • Instantiate the pipeline from the data encoding stages.
  • Generate the model from an input or training data frame, df, by invoking the fit method.
A minimal end-to-end sketch is shown below.
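The following driver is a hedged sketch of how the pieces might be assembled, assuming a CSV file named sales.csv with the four columns described above; the object name and file path are illustrative and not part of the original post:

import org.apache.spark.ml.Pipeline

object FeaturesEncodingApp extends SparkSessionManager with DataEncoding {
  override protected[this] val colNames: Array[String] =
    Array("date", "id", "region", "agent")

  def run(): Unit = {
    val df = csv2DF("sales.csv")                                  // load the sales report
    val model = new Pipeline().setStages(pipelineStages).fit(df)  // fit the encoding stages
    model.transform(df).select("features").show(5)                // inspect the assembled features vector
    stop
  }
}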
The next post extends the data encoding pipeline with two estimators: a classifier and a cross validator.


Monday, July 4, 2016

Monte Carlo Integration in Scala

Target audience: Intermediate
Estimated reading time: 10'


This post introduces an often overlooked numerical integration method that leverages the ubiquitous Monte Carlo simulation.
Not every function has a closed form for computing a definite integral, also known as symbolic integration. There are many numerical integration methods for continuous functions, such as Simpson's rule, the Newton-Cotes quadrature rules and Gaussian quadrature. These methods are deterministic by nature.
Monte Carlo numerical integration is a stochastic method that relies on randomly generated values to estimate the area under a curve, a surface or any multidimensional space. The illustration of the Monte Carlo integration method uses a pragmatic uniform random distribution of data points for a single-variable function.


Basic concept
Let's consider the single variable function illustrated in the following line plot.
\[f(x)=\frac{1}{x}-0.5\]

The objective is to compute the integral of the function over the interval [1, 3]. The Monte Carlo numerical integration is defined by three steps (the resulting estimate is given by the formula after the list):
  • Define the outer area enclosing the function, delimited by the minimum and maximum values of x and y; in this case x in [1, 3] and y in [-0.17, 0.5]
  • Generate random data points uniformly over the outer area
  • Compute the ratio of the number of random data points that fall within the area delimited by the function over the total number of data points
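The estimate of the definite integral follows directly from this ratio; as a sketch of the reasoning behind the implementation below, where points falling between a negative portion of the curve and the x-axis are counted negatively:
\[\int_{a}^{b}f(x)dx\approx \frac{n_{+}-n_{-}}{N}\,A_{outer}\]
where N is the total number of random data points, n+ and n- are the counts of points falling in the positive and negative regions delimited by the curve, and A outer is the area of the enclosing rectangle.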
Let's define a generic class, MonteCarloIntegrator, to encapsulate these 3 steps.
The class has two arguments (line 1):
  • The function f to integrate
  • The number of random data points used in the methodology
class MonteCarloIntegrator(f: Double => Double, numPoints: Int) {

  def integrate(from: Double, to: Double): Double = {
    val (min, max) = getBounds(from, to)
    val width = to - from
    val height = if (min >= 0.0) max else max - min
    val outerArea = width * height
    val randomx = new Random(System.currentTimeMillis)
    val randomy = new Random(System.currentTimeMillis + 42L)

    def randomSquare(randomx: Random, randomy: Random): Double = {
      val numInsideArea = Range(0, numPoints)./:(0)(
        (s, n) => {
        val ptx = randomx.nextDouble * width + from
        val pty = randomy.nextDouble * height + (if (min < 0.0) min else 0.0) // sample the full [min, max] range
        randomx.setSeed(randomy.nextLong)
        randomy.setSeed(randomx.nextLong)

        s + (if (pty > 0.0 && pty < f(ptx)) 1 
            else if (pty < 0.0 && pty > f(ptx)) -1 
            else 0)
      }
     )
      numInsideArea.toDouble * outerArea / numPoints
    }
    randomSquare(randomx, randomy)
  }
}

The method integrate implements the summation of the function over the interval [from, to] (line 3). The first step is to extract the bounds, getBounds, of the outer area (line 4), whose size is computed on line 7. Each coordinate is assigned a random generator, randomx and randomy (lines 8 & 9).
The nested method randomSquare records the number of data points, numInsideArea, that fall into the area delimited by the function (lines 13-21). The integral is estimated as the relative number of data points inside the area delimited by the function, scaled by the outer area (line 24).

The method getBounds is described in the following code snippet. It is a simple, although not particularly efficient, approach to extract the boundaries of the integration. It breaks down the interval into a number of steps (lines 2 & 3) and collects the minimum and maximum values of the function (lines 7-12).

def getBounds(from: Double, to: Double): (Double, Double) = {
  val numSteps = Math.sqrt(numPoints).floor.toInt
  val stepSize = (to - from) / numSteps

  (0 to numSteps)./:((Double.MaxValue, -Double.MaxValue))(
    (minMax, n) => {
      val y = f(n * stepSize + from)
      updateBounds(y, minMax) match {
        case 0x01 => (y, minMax._2)
        case 0x02 => (minMax._1, y)
        case 0x03 => (y, y)
        case _ => minMax
      }
    }
  )
}


def updateBounds(y: Double, minMax: (Double,Double)): Int = {
  var flag = 0x00
  if (y < minMax._1) flag += 0x01
  if (y > minMax._2) flag += 0x02
  flag
}


Precision
You may wonder about the accuracy of the Monte Carlo method and how many randomly generated data points are needed for a decent accuracy. Let's consider the same function \[f(x)=\frac{1}{x}-0.5\] and its indefinite integral, which is used to generate the expected value for the function f \[\int f(x)dx=\log(x)-\frac{x}{2}+C\] The simple test consists of computing the error between the value produced by the definite integral and the estimate produced by the Monte Carlo method, as implemented in the following code snippet.

val fct = (x: Double) => 1.0/x - 0.5
val integral = (x: Double, c: Double) => Math.log(x) - 0.5*x + c

final val numPoints = 10000
val integrator = new MonteCarloIntegrator(fct, numPoints)

val predicted = integrator.integrate(1.0, 2.0)
val expected = integral(2.0, 0.0) - integral(1.0, 0.0)

We run the test 100 times with the number of random data points varying between 100 and 50,000.

Monte Carlo integration is reasonably accurate even with a small number of data points. In this case, the small gain in accuracy does not justify generating significantly more than 1,000 random data points.

A smarter approach would be to rely on an exit condition to complete the summation in the shortest time possible, without having to estimate the optimum number of random data points. A new batch of numPoints random data points is added at each iteration. One simple convergence criterion compares the difference of the estimate between two consecutive iterations:
  |sum(existing data points + new batch of data points) - sum(existing data points)| < eps
def randomSquare(randomx: Random, randomy: Random): Double = {
  var oldValue = 0.0

  Range(0, numPoints).takeWhile(_ => {
    // generate a new batch of random data points and count the hits,
    // using the same fold as in the implementation of randomSquare above
    val numInsideArea = ....
    val newValue = numInsideArea.toDouble * outerArea / numPoints
    val diff = Math.abs(newValue - oldValue)
    oldValue = newValue
    diff >= eps      // continue while two consecutive estimates differ by more than eps
  })
  oldValue
}

At each iteration, a new batch of numPoints data points is randomly generated to refine the accuracy of the summation. The exit strategy is implemented through the higher-order Scala collection method takeWhile, whose predicate returns false once two consecutive estimates differ by less than eps.

Note: This implementation of Monte Carlo integration is simple enough to illustrate the concept of stochastic methods applied to calculus. Recursive stratified sampling has been shown to be more accurate for computing the definite integral of functions with significant inflection points (extreme second-order derivatives).


References