Thursday, April 28, 2016

Normalized Discounted Cumulative Gain in Scala

Overview
Numerous real-life applications of machine learning require predicting the most relevant ranking of items to optimize an outcome. For instance:
  • Evaluating and prioritizing counter-measures to cyber-attacks
  • Ranking symptoms in a clinical trial
  • Extracting the documents most relevant to a given topic from a corpus
The Discounted Cumulative Gain (DCG) and its normalized counterpart, the Normalized Discounted Cumulative Gain (NDCG), are metrics originally applied in textual information retrieval and later extended to other domains.

Discounted Cumulative Gain
Let's dive into the mathematical formalism for the Discounted Cumulative Gain. 

For indexed target (relevance) values tj, the discounted cumulative gain is computed as
\[DCG=\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log_{2}(j+1)}\] The objective is to compare any given list of ranked/sorted items with a benchmark that represents the optimum ranking (the ranking with the highest DCG value), known as the Ideal Discounted Cumulative Gain, or IDCG:
\[NDCG=\frac{DCG}{IDCG}=\frac{log(2)}{IDCG}\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log(j+1)}\]
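As a quick sanity check, consider three items with relevance values (3, 2, 1) already in their ideal order: \[DCG=\frac{2^{3}-1}{log_{2}(2)}+\frac{2^{2}-1}{log_{2}(3)}+\frac{2^{1}-1}{log_{2}(4)}=7+1.893+0.5\approx 9.39\]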

Scala implementation
The implementation of the NDCG computation in Scala is quite simple indeed. Given a ranked list of items, the three steps are:
  • Compute the IDCG (normalization factor) from the list
  • Compute the DCG for the list
  • Compute the NDCG = DCG/IDCG
First, let's consider a list of items of type T to rank. The ranking function used to sort a sample of items is provided as an implicit. The constructor for NDCG takes a single argument: the initial sample to be ranked:
object NDCG {
    // Gain (2^t - 1) discounted by log base 2 of the rank position j
  private def compute(j: Int, t: Int): Double =
    (Math.pow(2.0, t) - 1.0)*Math.log(2.0)/Math.log(j + 1.0)
}

class NDCG[T](
   initialSample: Seq[T])
   (implicit ranking: T => Int) {
  import NDCG._

  val iDCG: Double = normalize

  def score: Double = score(initialSample)

  def score(samples: Seq[T]): Double =
    if( samples.size != initialSample.size) 0.0
    else dcg(samples)/iDCG

    // Direct application of the DCG formula
  private def dcg(samples: Seq[T]): Double =
    samples.zipWithIndex.aggregate(0.0)(
      (s, samplej) => s + compute(samplej._2 + 1, ranking(samplej._1))
      , _ + _)

    // IDCG: DCG of the sample re-sorted by decreasing rank value
  private def normalize: Double = {
    val sorted = initialSample.zipWithIndex.sortBy{
      case (sample, n) => -ranking(sample)
    }.map( _._1)
    dcg(sorted)
  }
}

The Ideal Discounted Cumulative Gain, iDCG, is computed through the normalize method. The iDCG (normalization factor) is obtained by first sorting the items of type T by their rank value in decreasing order, then scoring this re-sorted list with the dcg method.
The computation of the Discounted Cumulative Gain by the method dcg is a direct application of the formula described in the previous section.
Note: The logarithm uses base 2. It is computed as natural log(x)/natural log(2).
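Expressed as a one-line Scala helper:

  // base 2 logarithm computed from natural logarithms
def log2(x: Double): Double = Math.log(x)/Math.log(2.0)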
Let's now consider a list of items of type Item defined as follows:
case class Item(id: String, x: Double, rank: Int)

The list of items, itemsList, is implicitly ranked through its last attribute, rank.

val itemsList = Seq[Item](
  Item("1", 3.4, 4), Item("2", 1.4, 3),
  Item("3", -0.7, 5), Item("4", 5.2, 2), 
  Item("5", 1.4, 1))

implicit val ranking = (item: Item) => item.rank

It is time to compute the NDCG coefficient for the list of items, by invoking the score method.

val nDCG = new NDCG[Item](itemsList)

println(s"IDCG = ${nDCG.iDCG}")    //45.64
println(s"Score = ${nDCG.score}")  // 0.801

The ideal discounted cumulative gain, iDCG, is 45.64: it corresponds to the optimum ranking for this list of items. The first sample scores 0.801.
Note
The DCG of subsequent samples can be computed using the same iDCG value from the same instance of NDCG.

def score(samples: Seq[T]): Double =
  if( samples.size != initialSample.size) 0.0 
  else dcg(samples)/iDCG
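For instance, a shuffled permutation of the original sample (illustrative) can be scored against the same normalization factor:

val shuffled = scala.util.Random.shuffle(itemsList)
println(s"Score = ${nDCG.score(shuffled)}")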


Friday, April 15, 2016

Managing Spark context in ScalaTest

Overview
Debugging an Apache Spark application with ScalaTest seems quite simple when dealing with a single test:
  • Specify your Spark context configuration, SparkConf
  • Create the Spark context
  • Add test code related to your application
  • Clean up resources (Spark context, Akka context, file handles...)
However, there are cases when you may need to create and close the Spark context only once, across multiple tests or a subset of the tests. The challenge is to make sure that the Spark context is actually closed when it is no longer needed.
This post introduces the two basic ScalaTest methods beforeAll and afterAll of the trait BeforeAndAfterAll, used to manage the context lifecycle of your test application.

Wrapping the Spark context
The objective is to create a small framework that creates or retrieves an existing Spark context before executing a test and closes it after the test completes, independently of the status of the test.
The initialization of the Spark context consists of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through simple parameterization. The context is created only if it is not already defined within the scope of the test, using the getOrCreate method.
trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName(s"Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer", 
              "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close: Unit = {
    if (sc != null) sc.stop
  }
}

The solution is to let ScalaTest manage the lifecycle of the Spark context for testing.
trait SparkContextForTesting 
     extends SparkContextWrapper with BeforeAndAfterAll { 
  self: Suite =>

  override def beforeAll: Unit = {
    getContext
    super.beforeAll
  }

  override def afterAll: Unit = {
    close
    super.afterAll
  }
}

Note: The BeforeAndAfterAll trait can only be mixed into a test suite that inherits the Suite trait. The method beforeAll is automatically called before the first test of your suite and the method afterAll is called after your last test completes, whether those tests succeeded or not.

class MyTest 
  extends FlatSpec 
    with Matchers 
      with SparkContextForTesting {

  behavior of "My Spark application"

  it should "execute my first test" in {
    val sqlContext = new SQLContext(sc) 
    // ....
  }
    // ... other tests

  it should "execute my last test" in {
    // the Spark context is cleaned up once this test completes
  }
}
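Any other suite that needs a live context simply mixes in the same trait. A minimal sketch (the suite name and assertion are illustrative):

class MyOtherTest 
  extends FlatSpec 
    with Matchers 
      with SparkContextForTesting {

  behavior of "Another Spark test suite"

  it should "also have access to the managed SparkContext" in {
    sc should not be null
  }
}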

As they say at the end of the movie, "That's all folks!"

Reference
ScalaTest User Guide

Monday, March 7, 2016

Weighting logistic loss for imbalanced dataset in Spark

Overview
Some applications, such as spam detection or online targeting, have an imbalanced dataset: the number of observations associated with one label (the minority class) is very small compared to the number of observations associated with the other labels.

Let's consider the case of an intrusion detection system that identifies a cyber attack through a Virtual Private Network (VPN) in an enterprise. The objective is to classify every session, based on one or several TCP connections, as either a potentially harmful intrusion or a normal transaction, knowing that only a very small fraction of attempts to connect and authenticate are actual deliberate attacks (fewer than 1 per million connections).
There are a few points to consider:
  • The number of reported attacks is very likely very small (< 10). Therefore, the very small set of labeled security breaches is plagued with high variance.
  • A very large and expensive labeling operation is required, if even available, to generate a fairly stable minority class (security breaches).
  • The predictor can appear extremely accurate (as measured by F1-score, area under the ROC curve or PR curve) by always classifying any new attempt to connect to the VPN as harmless. In the case of 1 reported attack per 1 million VPN sessions, the prediction would be accurate 99.9999% of the time.

There are several approaches to address the problem of imbalanced training and validation sets. The most commonly known techniques include:
  • Sub-sampling the majority class (i.e. harmless VPN sessions): this reduces the number of labels for the normal sessions while preserving the labeled reported attacks.
  • Over-sampling the minority class: this technique generates synthetic samples using bootstrapped samples based on the k Nearest Neighbors algorithm.
  • Applying a weight differential to the logistic loss used in training the model.
This post focuses on the third option: re-balancing (weighting) of the logistic loss. The implementation of this technique is specific to Apache Spark, and more specifically to the ML implementation of the logistic regression (not MLlib) that leverages data frames.
It is assumed the reader is somewhat familiar with Apache Spark, data frames and its machine learning module MLlib.

Weighting the logistic loss
Let's consider the logistic loss commonly used in training a binary model f with features x and labels y: \[logLoss = - \sum_{i=0}^{n-1}[y_i .log(f(x_i)) + (1 - y_i) .log(1 - f(x_i))]\] The first component of the loss function is related to the minority observations (security breach through a VPN: label = 1) while the second component represents the loss related to the majority observations (harmless VPN sessions: label = 0).
Let's consider the ratio r of the number of observations with the majority label over the total number of observations: \[r = \frac{card\{i: y_{i} = 0\}}{n}\] The logistic loss can then be rebalanced as \[logloss = -\sum_{i=0}^{n-1} [r.y_i.log(f(x_i)) + (1-r).(1-y_i).log(1 - f(x_i))]\] The next step is to implement this weighting of the binomial logistic regression classifier using Apache Spark.

Implementation in Apache Spark
Apache Spark is an open-source framework for in-memory processing of large datasets (think Hadoop on steroids). The framework contains a machine learning module known as MLlib. The objective is to modify/override the train method of LogisticRegression.
One simple option is to sub-class the LogisticRegression class in the mllib/ml package. However, the logistic loss is actually computed in the private class LogisticAggregator, which cannot be overridden.
However, if you browse through the ml.classification.LogisticRegression.train Scala code, you will notice that the class Instance, which encapsulates labeled data for the computation of the loss and the gradient of the loss, has three parameters:
  • label: Double
  • feature: linalg.Vector
  • weight: Double

The plan is to use this third parameter, weight, as the balancing weight ratio r. This is accomplished by adding an extra column weightCol to the input data frame dataset and defining its value as (a sketch follows the list):
  • balancingRatio r for label = 1
  • 1 - balancingRatio (1 - r) for label = 0
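As a minimal sketch, the weight column could also be derived directly with Spark SQL functions (the column names label and weightCol, and an in-scope balancingRatio, are assumptions):

import org.apache.spark.sql.functions._

  // weight r for the minority class (label = 1), 1 - r for the majority class
val weightedDF = dataset.withColumn(
  "weightCol",
  when(col("label") === 1.0, lit(balancingRatio))
    .otherwise(lit(1.0 - balancingRatio))
)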
A simple implementation of the weighted logistic regression using the Apache Spark MLlib implementation of the logistic regression:

final val BalancingWeightColumnName: String = "weightCol"
final val BalanceRatioBounds = (0.001, 0.999)
final val ImportanceWeightNormalization = 2.0

class WeightedLogisticRegression(balanceRatio: Double = 0.5) 
   extends LogisticRegression(UUID.randomUUID.toString) 
      with Logging {

  this.setWeightCol(BalancingWeightColumnName)

    // Constrain the balancing ratio within its acceptable bounds
  private[this] val balancingRatio = 
    if (balanceRatio < BalanceRatioBounds._1) 
       BalanceRatioBounds._1
    else if (balanceRatio > BalanceRatioBounds._2) 
       BalanceRatioBounds._2
    else
       balanceRatio

  override protected def train(dataset: DataFrame): 
      LogisticRegressionModel = {
      // Append the weight column to generate the new schema
    val newDataset = dataset.withColumn(BalancingWeightColumnName, lit(0.0))

      // Weight: r for the minority class (label = 1), 1 - r otherwise
    val weightedDataset: RDD[Row] = dataset.map(row => {
      val w = if (row.getDouble(0) == 1.0) balancingRatio 
         else 1.0 - balancingRatio
      Row(row.getDouble(0), row.getAs[Vector](1), 
           ImportanceWeightNormalization * w)
    })

    val weightedDataFrame = dataset.sqlContext
        .createDataFrame(weightedDataset, newDataset.schema)
    super.train(weightedDataFrame)
  }
}

Notes
  • The balancing ratio has to be normalized by the factor ImportanceWeightNormalization = 2: this factor is required to produce a weight of 1 for both the majority and minority classes from a fully balanced ratio of 0.5 (see the sanity check below).
  • The actual balancing ratio needs to be constrained within an acceptable range, BalanceRatioBounds, to prevent the minority class from having an outsized influence on the weights of the logistic regression model. In the extreme case, there may not even be a single observation in the minority class (security breach through the VPN). These minimum and maximum values are highly dependent on the type of application.
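A quick sanity check of the normalization factor (values for illustration only):

  // a fully balanced ratio r = 0.5 yields a weight of 1.0 for both classes
val r = 0.5
assert(ImportanceWeightNormalization * r == 1.0)          // minority class weight
assert(ImportanceWeightNormalization * (1.0 - r) == 1.0)  // majority class weight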

Here is an example of the application of WeightedLogisticRegression on a training data frame, trainingDF. The number of data points (or observations) associated with label = 1 is extracted through a simple filter.

val numPositives = trainingDF.filter("label == 1.0").count
val datasetSize = trainingDF.count
val balanceRatio = (datasetSize - numPositives).toDouble/datasetSize

val lr = new WeightedLogisticRegression(balanceRatio)
          .setMaxIter(20)
          .setElasticNetParam(0)
          .setFitIntercept(true)
          .setTol(1E-6)
          .setRegParam(1E-2)

val model = lr.fit(trainingDF)

One simple validation of this implementation of the weighted logistic regression on Apache Spark is to verify that the weights of the logistic regression model generated by WeightedLogisticRegression with a balancing ratio of 0.5 are very similar to the weights generated by the standard logistic regression model of Apache Spark MLlib (ml.classification.LogisticRegressionModel), as sketched below.
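A minimal sketch of such a check, assuming the Spark 1.6 ml API in which the fitted model exposes coefficients (the tolerance is arbitrary):

  // train the unweighted model and the r = 0.5 weighted model on the same data
val unweightedModel = new LogisticRegression().setMaxIter(20).fit(trainingDF)
val weightedModel =
  new WeightedLogisticRegression(0.5).setMaxIter(20).fit(trainingDF)

  // the two sets of coefficients should be very close
val maxDiff = unweightedModel.coefficients.toArray
  .zip(weightedModel.coefficients.toArray)
  .map { case (a, b) => Math.abs(a - b) }.max
assert(maxDiff < 0.01, s"coefficients differ by $maxDiff")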

Note: This implementation relies on Apache Spark 1.6.0. The implementation of the logistic regression in the beta release of Apache Spark 2.0 does not allow LogisticRegression to be overridden outside the scope of Apache Spark MLlib.


Thursday, February 4, 2016

Newton-Raphson revisited

Overview
The Newton-Raphson method is a well-known technique to compute the root of a function, f(x) = 0, or its minimum/maximum through the root of its derivative, f'(x) = 0.
The simplicity of the Newton-Raphson method, in terms of both concept and implementation, makes it a very popular solution. We will look at two different implementations in Scala and conclude the post by evaluating the relative benefits and limitations of the Newton-Raphson method.

First implementation
Let's dive into the computation of the root x of a function f. A function is defined by its Taylor series expansion as follows:
\[f(s)=\sum_{n=0}^{\infty }\frac{f^{(n)}(x)}{n!}(s-x)^{n}\] At a root s, f(s) = 0, and the equation becomes
\[0=f(x)+f'(x)(s-x)+O((s-x)^{2}f''(x))\] resulting in the iteration
\[x_{n+1}=x_{n}- \frac{f(x_{n})}{f'(x_{n})}\]

final class NewtonRaphson(
  f:   Double => Double,
  df:  Double => Double,
  dff: Option[Double => Double] = None
) {
  final val EPS = 0.01

    // x(n+1) = x(n) - f(x(n))/f'(x(n)), until the step falls below EPS
  @scala.annotation.tailrec
  def root(x: Double): Double = {
    val nextX = x - f(x) / df(x)
    if (Math.abs(nextX - x) < EPS) nextX 
    else root(nextX)
  }
}

The constructor takes 3 arguments:
  • The function f whose root is to be computed
  • Its first order derivative, df
  • An optional second order derivative, dff, which is not used in this first implementation
The formula is implemented by the tail-recursive method root. Let's apply the root method to a concrete example, with a function f and its derivative ff:

val f = (x: Double) => Math.pow(x, 5.0) - 2.0*x
val ff = (x: Double) => 5.0*Math.pow(x,4.0) -2.0
 
val nr1 = new NewtonRaphson(f, ff)
val z1 = nr1.root(7.9)

So far so good. However, it is assumed that the function has a single root or, in the case of a maximum/minimum, that its derivative has a single root, at least in the vicinity of the initial data point. What about computing the root of a function that may have no root, or several?
Let's look at the following function g and its derivative gg:

val g = (x: Double) => Math.exp(0.5 * x)
val gg = (x: Double) => 0.5 * g(x)

val nr3 = new NewtonRaphson(g, gg)

The call to the Newton-Raphson root method on nr3 will diverge. In this particular case, the computation generates a Double.NaN at each iteration.
There are several options to guarantee that the method properly exits when no root exists. Let's look at one solution that leverages the second derivative.

Second derivative to the rescue
We need to go back to basics: in its simplest form, the Newton-Raphson method does not take into account derivatives of order higher than 1. The second order derivative, f'', can be used as a rough approximation of the error on the estimate of the root. The error can be evaluated at each iteration against a convergence criterion EPS.
\[\Delta \varepsilon = \varepsilon _{n+1}- \varepsilon _{n} \sim \frac{f'(x_{n})}{f''(x_{n})} < EPS\] \[x_{n+1}=x_{n}-\frac{f(x_{n})}{f'(x_{n})}(1 + \Delta \varepsilon)\] The approximation of the error is also used to detect whether the algorithm converges toward a solution (root) or starts to diverge. Let's implement this variant of the Newton-Raphson method using the second order derivative as the convergence criterion.

final val DIVERGE = -1.0

def root(z: Double): Double = dff.map(_dff => {

  @scala.annotation.tailrec
  def nr2ndOrder(xe: (Double, Double)): (Double, Double) = {
    val (x, eps) = xe

      // error estimate from the ratio first/second order derivative
    val nextEps = Math.abs(df(x) / _dff(x))
    val nextX = x - f(x)*(1.0 + nextEps)/df(x)

      // rules to exit the recursion
    if (eps > 0.0 && nextEps >= eps)
      (DIVERGE, DIVERGE)             // error increases => diverging
    else if (Math.abs(nextEps - eps) < EPS) 
      (nextX, nextEps)               // error within the criterion => converged
    else 
      nr2ndOrder((nextX, nextEps))   // otherwise => recurse
  }
  
  nr2ndOrder((z, 0.0))._1
}).getOrElse(DIVERGE)

Once again, the computation of the root is implemented as an efficient tail recursion, here in the inner method nr2ndOrder. The DIVERGE value is also returned when no second order derivative is supplied.
The convergence criterion is implemented as follows (a usage sketch follows the list):
  • If the error eps increases, exit the recursion and return DIVERGE (the method diverges)
  • If eps decreases to a value within the convergence criterion, return the current estimate (converged)
  • If eps decreases but remains above the convergence criterion, recurse
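A short usage sketch on the function f from the first paragraph, with its second order derivative fff supplied to enable this variant:

  // f(x) = x^5 - 2.x  =>  f''(x) = 20.x^3
val fff = (x: Double) => 20.0*Math.pow(x, 3.0)

val nr2 = new NewtonRaphson(f, ff, Some(fff))
val z2 = nr2.root(7.9)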


Summary
Let's compare the convergence of the simplest Newton-Raphson method with that of its variant using the 2nd order derivative, on the function f defined in the first paragraph.


Clearly, the adaptive step size using the second order derivative speeds up the convergence toward the root of the function f. The selection of the appropriate method comes down to the trade-off between the overhead of the computation of the second derivative and the larger number of iterations or recursions required to find the root within a predefined convergence criteria.

References
Programming in Scala, 2nd Edition - M. Odersky, L. Spoon, B. Venners - Artima - 2012
The Newton-Raphson method

Tuesday, January 5, 2016

Simple class versioning in Scala

Overview
Versioning is a common challenge in commercial software development. The most common technique to support multiple versions of a library, executable or framework in Java or Scala relies on the class loader.
A library can easily be versioned by creating multiple jar files, one for each version.

Simple implementation
Let's consider a simple case in which a library is self-contained in a jar file, with two versions:
  • ArrayIndex1.jar
  • ArrayIndex2.jar

import java.net.{URL, URLClassLoader}
import java.io.File
import java.lang.reflect.Method

  // One class loader per version of the library
val cl1 = new URLClassLoader( 
  Array[URL](new File("ArrayIndex1.jar").toURI.toURL),   
  Thread.currentThread().getContextClassLoader()
)
val cl2 = new URLClassLoader( 
  Array[URL](new File("ArrayIndex2.jar").toURI.toURL),  
  Thread.currentThread().getContextClassLoader()
)
  
  // 'version' is the target library version, defined elsewhere
val compatibleClass: Class[_] = 
  if( version > 0.9) 
    cl1.loadClass("ArrayIndex") 
  else 
    cl2.loadClass("ArrayIndex")

val obj = compatibleClass.newInstance
val methods = compatibleClass.getMethods
println(s"${methods.map( _.getName).mkString(",")}")

The first step consists of loading the two versions of the library by converting each jar file name to a URL. This is accomplished by instantiating the two class loaders: in this particular case, each class loader is defined by its URL, with the type URLClassLoader. The jar files are loaded within the current thread; a more efficient approach would consist of creating a future to load the classes asynchronously.
The class to be used depends on the variable version. Once the appropriate class is loaded, its instances and methods are readily available to be invoked by the client.
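A minimal sketch of such a reflective invocation (the method name getSize and its no-argument signature are hypothetical):

  // look up a method by name on the dynamically loaded class and invoke it
val method: Method = compatibleClass.getMethod("getSize")
val result = method.invoke(obj)
println(s"getSize returned $result")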

Wednesday, December 2, 2015

Kullback-Leibler Divergence on Apache Spark

Overview
The Kullback-Leibler divergence is a concept borrowed from information theory and commonly associated with information gain. It is also known as the relative entropy between two distributions. It measures the dissimilarity of the distributions of random values (i.e. probability density functions). Let's consider two distributions P and Q with probability density functions p and q:
\[D_{KL}(P||Q)= - \int_{-\infty }^{+\infty }p(x).log\frac{q(x)}{p(x)}dx\]
The formula can be interpreted as the information lost when the distribution Q is used to approximate the distribution P.
Note that the computation is not symmetric: \[D_{KL}(P||Q)\neq D_{KL}(Q||P)\]
The Kullback-Leibler divergence is used to compute Mutual Information, which is one of the feature extraction methods. It is part of the family of F-divergences.

The objective of this post is to implement the Kullback-Leibler divergence in Scala and to distribute the computation through the Apache Spark framework:
  • Implementation of the Kullback-Leibler divergence using Apache Spark
  • Illustration of the Kullback-Leibler divergence in comparing multiple continuous probability density functions

Implementation of the Kullback-Leibler divergence
First, let's implement the Kullback-Leibler formula to measure the dissimilarity between an existing dataset {xi, yi} (presumably generated by a specific random variable / distribution) and a given continuous distribution with probability density function f.
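With the sampled values yi standing in for p and the density f standing in for q, the formula above takes the discrete form implemented below: \[D_{KL}\approx -\sum_{i=0}^{n-1} y_{i}.log\frac{f(x_{i})}{y_{i}}\]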

import scala.math.{abs, log}

object KullbackLiebler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

  def execute( 
      xy: DATASET, 
      f: Double => Double): Double = {
      // filter out the values y too small to be significant
    val z = xy.filter{ case(x, y) => abs(y) > EPS}
      // -sum y.log(f(x)/y)
    val sum = z.foldLeft(0.0){ case(s, (x, y)) => s + y*log(f(x)/y)}
    -sum
  }

  def execute( 
      xy: DATASET, 
      fs: Iterable[Double => Double]): Iterable[Double] = {
      // materialize the iterator so it can be traversed once per density
    val data = xy.toSeq
    fs.map(execute(data.iterator, _))
  }
}

The divergence methods execute are defined as static methods of the singleton object.
The first execute method computes the dissimilarity between the distribution that generated the dataset xy and the distribution with probability density f.
The second execute method computes the KL divergence between the distribution that generated the dataset xy and a sequence of distributions with probability densities fs.
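A minimal usage sketch with made-up (x, y) pairs, assuming the gauss density defined in the next section:

val xy: Iterator[(Double, Double)] =
  Seq((0.5, 0.35), (1.0, 0.24), (1.5, 0.13)).iterator
val kl = KullbackLiebler.execute(xy, gauss)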

Next, let's define some of the continuous random distributions used to evaluate the distribution represented by the dataset xy: Normal, beta, log-normal, gamma, log-gamma and chi-square.

import scala.math._

  // assumed normalization constant 1/sqrt(2.PI) used by the densities below
final val INV_PI = 1.0/sqrt(2.0*Pi)

   // One Gaussian distribution
val gauss = (x: Double) => INV_PI*exp(-x*x/2.0)

   // Uniform distribution
val uniform = (x: Double) => x

  // Log normal distribution
val logNormal = (x: Double) => { 
  val lx = log(x)
  INV_PI/x*exp(-lx*lx) 
}
  // Gamma distribution
val gamma = (x: Double, n: Int) => 
   exp(-x)*pow(x, n)/fact(n)

  // Log Gamma distribution
val logGamma = (x: Double, alpha: Int, beta: Int) =>
  exp(beta*x)*exp(-exp(x)/alpha)/
    (pow(alpha, beta)*fact(beta-1))

  // Simple computation of m! (used by the normalization factors)
def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

  // Normalization factor for the Beta distribution
val cBeta = (n: Int, m: Int) => {
  val f = if(n < 2) 1 else fact(n-1)
  val g = if(m < 2) 1 else fact(m-1)
  f*g/fact(n+m -1).toDouble
}

  // Beta distribution
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1.0-x, beta-1)/cBeta(alpha, beta)

  // Chi-Square distribution
val chiSquare = (x: Double, k: Int) => {
  val k_2 = k >> 1
  pow(x, k_2-1)*exp(-0.5*x)/((1 << k_2)*fact(k_2-1))
}

Some of these probability density functions are parameterized (e.g. the log gamma has alpha and beta parameters). The probability density functions are implemented as Scala partially applied functions with predefined parameter values. For example...

val gamma2 = gamma( _ : Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)

It is important to mention that the implementations of the probability density functions in the code snippet are not optimized for performance. They are described merely to illustrate the application of the Kullback-Leibler divergence. Please refer to your favorite statistics handbook to learn more about these probability distributions.

Spark to the rescue
Executing the Kullback-Leibler formula on a large dataset against a significant number of random distributions may be a daunting task. A solution is to use the Apache Spark framework to parallelize the computation of the divergence. Here are the steps to follow:
  • Partition the original data set
  • Broadcast the first sequence of probability density functions
  • Execute the Kullback-Leibler formula on each partition using mapPartitions
  • Collect then aggregate the divergence factor from each partition

final val NUM_DATA_POINTS = 10000000
val numTasks: Int = 128
  
val conf = new SparkConf().setAppName("Kullback-Liebler").setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

Once the number of input data points (NUM_DATA_POINTS) and the number of concurrent tasks (numTasks) are defined, the Apache Spark context sc is configured with an instance of SparkConf.
Next let's implement the broadcasting mechanism for the batch or sequence of probability density functions.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 
  2 -> logNormal, 
  3 -> gamma1, 
  4 -> gamma2, 
  5 -> gamma4, 
  6 -> logGamma12, 
  7 -> logGamma22, 
  8 -> beta22, 
  9 -> beta25, 
  10 -> chiSquare2, 
  11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))

The broadcast variable pdfs_broadcast alerts the Apache Spark framework that the keys of the probability density functions pdfs are to be distributed to all the partitions of the worker (or data) nodes.
Value broadcasting is heavily used in the machine learning library MLlib to dispatch model coefficients (weights) to the data nodes for the computation of the gradient and the loss function.

Finally, let's implement the mapPartitions transformation. It transforms each partition of (x, y) pairs into an array of size pdfs.size containing the KL divergence value related to each distribution to be evaluated.

  // master_rdd: the RDD of (x, y) pairs built from the input dataset
val kl_rdd = master_rdd.mapPartitions((it: DATASET) => {
    // resolve the density functions from their broadcast keys
  val pdfsList = pdfs_broadcast.value.map(n => pdfs.get(n).get)
  execute(it, pdfsList).iterator
})

Finally, the divergence values from each partition are collected then aggregated using the Scala fold operator /:.

val kl_master = kl_rdd.collect
val numPartitions = kl_master.length/pdfs.size

  // sum the divergence of each distribution across partitions, then average
val divergences = (0 until kl_master.size by pdfs.size)
  ./:(Array.fill(pdfs.size)(0.0))( (s, n) => {
     (0 until pdfs.size).foreach(j =>
         s.update(j, s(j) + kl_master(n+j)))
     s
}).map( _ / numPartitions)

Test results
The purpose of the test was to understand the potential of the Kullback-Leibler divergence to evaluate the dissimilarity between probability distributions.
We selected the Gaussian distribution as the benchmark against which the other distributions are compared. The comparative results are displayed below:

As expected, the Gaussian and Normal distributions are very similar. However, the divergence between the Gaussian and the highest orders of the gamma and beta distributions is quite significant. This can easily be confirmed by comparing the graphs of their associated probability density functions.


References
Pattern Recognition and Machine Learning - 1.6.1 Relative entropy and mutual information - C. Bishop - Springer Science - 2006
Apache Spark documentation