Sunday, October 20, 2019

Contextual Thompson sampling 1: Theory

This post introduces the concept of multi-armed bandit (a.k.a. K-armed bandit) and the Thompson sampling for contextual bandit. The next post deals with the actual implementation of the contextual Thompson sampling in Apache Spark.

Multi-arm Bandit problem

The multi-armed bandit problem is a well-known reinforcement learning technique,  widely used for its simplicity. Let's consider a  slot machine with n arms (bandits) with each arm having its own biased probability distribution of success. In its simplest form pulling any one of the arms gives you a  reward of either 1 for success, or 0 for failure. Our objective is to pull the arms one-by-one in sequence such that we maximize our total reward collected in the long run.
Dual-arm bandit

Data scientists have developed several solutions to tackle the multi-armed bandit problem for which the 3 most common algorithms are
  • Epsilon-Greedy
  • Upper confidence bounds
  • Thompson sampling


The Epsilon-greedy is the simplest algorithm to address the  exploration-exploitation trade-off.  During exploitation, the lever with highest known payout is always pulled. However, from time to time (fraction ε with  ε < 0.1) the algorithm selects a random arm to 'explore' other arms which payout is relatively unknown. The arms with known or proven highest payout are pulled (1-ε of the time)

Upper Confidence Bound (UCB)

This solution is sometimes referred as Optimism in the Face of Uncertainty principle: It assumes that the unknown mean payoffs of each arm will be as high as possible, based on historical data.

Thompson Sampling

The Thompson sampling algorithm is fundamentally a Bayesian optimization technique which core principle known as probability matching strategy can be summed as ‘play an arm according to its probability of being the best arm’.(i.e. the number of pulls for a given lever should match its actual probability of being the optimal lever)

The following diagram illustrates the difference between the upper confidence bounds and the Thompson sampling.

Confidence Bounds for Multi-Armed bandit problem

Context anyone?

The techniques described above do not make any assumption on the environment or the agent who selects the arms. There are two scenario:

  • Context-free bandit: The selection of the arm with the highest rewards depends solely on the past history (prior) reward (success and failure). Such history can be modeled using a Bernoulli distribution. For instance, the probability to get a '6' playing dice does not depend on the player.
  • Contextual bandit: The state of the environment (a.k.a. context) is used as an input (or model) to the selection of the arm with the highest predicted reward. The algorithm observes a new context (state), choose one action (arm), and observes an outcome (reward). In ad targeting, the selection of a banner or insert to be displayed on a web site depends on the prior rewards history associated to the demographic data of the user.
Any of the techniques applied to the multi-armed bandit can be used with and without context. The following sections focus on the contextual multi-arm bandit problem.

Contextual Thompson sampling (CTS)

Let's dive into the key characteristics of the Thompson sampling
  • We assume the prior distribution on the parameters of the distribution (unknown) of the reward for each arm.
  • At each step, t, the arm is selected according to the posterior probability to be the optimal arm. 
The components of the contextual Thompson sampling are
1. Model of parameters w
2. A prior distribution p(w) of the model
3. History H consisting of a context x and reward r
4. Likelihood or probability p(r|x, w) of a reward given a context x and parameters w
5. Posterior distribution computed using naïve Bayes\[p(\widetilde{w}|H)=p(H|\widetilde{w}).p(\widetilde{w})/p(H))\]

But how can we model a context?

Actually, a process to select the most rewarding arm is actually a predictor or a learner. Each predictor takes the context, defines as a vector of features and predicts which arm will generate the highest reward.

The predictor is a model that can be defined as
- Linear model
- Generalized linear model (i.e. logistic)
- Neural network

The algorithm is implemented as a linear model (weights w) for estimating the reward from a context x  as \[w^{T}.x\]
The ultimate objective for any reinforcement learning model is to extract a policy which quantifies the behavior of the agent. Given a set X of context xi and a set A of arms, the policy is defined by the selection of an arm given a context x
\[\pi : X\rightarrow A\]

Contextual Thompson Sampling Algorithm

The sampling of the normal distribution (line 3) is described in details in the post Multivariate Normal Sampling with Scala. The algorithm computes the maximum reward estimation through sampling the normal distribution (line 4) and play the arm a* associated to it (line 5).
The parameters V and b are updated with the estimated value (line 7 and 8) and the actual reward is computed (line10) after the weights of the context are updated (line 9)

The next post describes the implementation of the contextual Thompson sampling using Scala, Nd4j and Apache Spark.


Solving the multi armed bandit problem
Introduction context free Thompson sampling
Thompson Sampling for Contextual Bandits with Linear Payoffs

Sunday, June 9, 2019

Multivariate Normal sampling with Scala and ND4j

This post describes the implementation of the multivariate normal sampling using ND4j. 

The multi-variate normal sampling function is used in various machine learning techniques such as Markov chains Monte Carlo (MCMC) or contextual multi-armed bandit.
The implementation of the multivariate normal relies on data vectorization, technique well-known to Python developers and data scientists alike.

Note: The post requires some knowledge of data vectorization (numpy, datavec, ND4j..) as well as Scala programming language.

Python Numpy is a well-known and reliable vectorized linear algebra library which is a foundation of scientific (SciPy) and machine learning (Sciktlearn) libraries. No serious data scientific projects can reasonably succeeds with the power and efficiency of numpy. 
The vectorization of datasets is the main reason behind the performance of machine learning models (training and prediction) build in Python.

Is there a similar linear algebra library, supporting vectorization, available to Scala and Spark developers? Yes, ND4j

ND4j library replicates the functionality of numpy for Java developers. ND4j can be downloaded as an independent library or as component of the deep learning library, deeplearning4j. It leverages the BLAS and LAPACK libraries.
From a Java developer perspective, the data represented by an NDArray is stored outside of the Java Virtual Machine. This design has the following benefits:
  • Avoid taxing the Garbage Collector
  • Interoperability with high-performance BLAS libraries such as OpenBlas
  • Allow number of array elements exceeds Int.MaxValue
The BLAS (Basic Linear Algebra Subprograms) are functions performing basic vector and matrix operations. The library is divided in 3 levels:
  • Level 1 BLAS perform scalar, vector and vector-vector operations,
  • Level 2 BLAS perform matrix-vector operations
  • Level 3 BLAS perform matrix-matrix operations. 
OpenBLAS is an optimized Basic Linear Algebra Subprograms (BLAS) library based on GotoBLAS2,  a C-library of linear algebra supporting a large variety of micro-processor. Its usage is governed by the BSD license.
LAPACK are Fortran routines for solving systems of simultaneous linear equations, least-squares solutions of linear systems of equations, eigenvalue problems, and singular value problems and matrix factorizations.

Implicit ND4j array conversion
The first step is to create a implicit conversation between ND4j and Scala data types.  
The following code convert an array of double into a INDArray using org.nd4j.linalg.factory.Nd4j java class and its constructor create(double[] values, int[] shape)
  • In case of a vector, the shape is defined in scala as  Array[Int](size_vector)
  • In case of a matrix, the shape is defined as Array[Int](numRows, numCols). 
The following snippet implement a very simple conversion from a Scala array to a INDArray

@throws(clazz = classOf[IllegalArgumentException])
implicit def double2NDArray(values: Array[Double]): INDArray = {
  require(values.nonEmpty, "ERROR: ND4, conversion ...")

  Nd4j.create(values, Array[Int](1, values.length))

Multivariate Normal distribution implementation
The sampling of a multivariate normal distribution is defined by the formula 
\[\mathbb{N}(\mu, \Sigma )=\frac{1}{\sqrt{(2\pi)^n \left | \Sigma \right |}} e^{\frac{1}{2}(x-\mu)^T {\Sigma '}^{-1}(x-\mu)}\] A generalized definition adds a very small random perturbation factor r |r| <= 1 on the variance value (diagonal of the covariance matrix) \[\Sigma ' = \Sigma + r.I\] Sigma is the covariance matrix and the mu is the mean value. 
The computation of the multivariate normal sampling can be approximated by the Cholesky decomposition. In a nutshell, the Cholesky algorithm decompose a positive-definite matrix into a product of two matrix
  • lower triangle matrix
  • transposition of its conjugate
It serves the same purpose as the ubiquitous LU decomposition with less computation. \[\mathbb{N}(\mu, \Sigma) \sim \mu + Chol(\Sigma).Z^T\] where \[L=Chol(\Sigma)\] and \[L.L^T=\Sigma\]. The vector Z is a multivariate normal noise \[Z= \{ z_i | z_i=N(0, 1)\}\]
The following implementation relies on the direct invocation of LAPACK library function potrf. The LAPACK functionality are accessed through the BLAS wrapper getBlasWrapper.

 final def choleskyDecomposition(matrix: INDArray): INDArray = {
   val matrixDup = matrix.dup
   Nd4j.getBlasWrapper.lapack.potrf(matrixDup, false)

Note that the matrix is duplicated prior to the LAPACK function call as we do not want to alter the input matrix. 
Let's implement the multivariate Normal sampler with perturbation using the Cholesky decomposition.

@throws(clazz = classOf[IllegalArgumentException])
@throws(clazz = classOf[IllegalStateException])
final def multiVariateNormalSampling(
   mean: INDArray,
   cov: INDArray,
   perturbation: Double = 0.0): INDArray = {
 import scala.util.Random
 require(cov.size(0) == mean.length, s"Sigma shape ${cov.size(0)} should be mean size ${mean.length}")
 require(cov.size(1) == mean.length, s"Sigma shape ${cov.size(1)} should be ${mean.length}")

 try {
  // Add a perturbation epsilon value to the covariance matrix
  // if it is defined
  val perturbMatrix =
   if(perturbation > 0.0)
    cov.add( squareIdentityMatrix(cov.size(0), 

  // Apply the Cholesky decomposition
  val perturbed: INDArray = choleskyDecomposition(perturbMatrix)
   // Instantiate a normal distribution
  val normalPdf = new NormalDistribution(
       new DefaultRandom, 

  val normalDensity = Array.fill(mean.size(0))(normalPdf.sample)
  val normalNDArray: INDArray = normalDensity

   // This is an implementation of the Dot product
  val normalCov = perturbed.mmul(normalNDArray.transpose)
   // Add the normalized perturbed covariance to the mean value
 catch {
  case e: org.nd4j.linalg.api.blas.BlasException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: org.apache.commons.math3.exception.NotStrictlyPositiveException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: Throwable =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")

Let's look at the full implementation of the multi-variate normal sampling.
The first step validates the shape of the mean and covariance input matrices [line 8, 9]. As mentioned earlier, the generalized normal distribution introduces an optional random perturbation of small magnitude (~1e-4) [line 14-17] that is useful for application that requires some stochastic
The 'perturbed' covariance matrix is factorized using the Cholesky decomposition [line 22]. The normal probability density function (mean 0.0 and standard deviation 1.0) is used to generate random values [line 24-30] which is then applied to the covariance matrix [line 33].
The normal randomized variance is added to the vector of mean values [line 35].

For the sake of convenience, the multivariate normal density function uses the Apache Math common 3.5 API [line 24].

Scala 2.13.2
JDK 1.8
ND4j 1.0.0-beta3

Scala 2.13
Deep learning 4j
Cholesky decomposition
Approximation multivariate Normal sampling
Apache common math

Wednesday, November 21, 2018

Integration of 3rd party service with Spark

Apache Spark is a commonly used framework to distribute large scale computational tasks. Some of these tasks may involve accessing external or 3rd party remote services such as natural language processing, images classification, reporting or data cleansing. 
These remote services or micro-services are commonly accessed through a REST API and are deployable as clusters of nodes (or instances) to improve scalability and high availability. 
Load balancing solutions are known to address scalability challenges since the dawn of the internet.

Is there an alternative to load balancers for scaling remote web services?

This post describes the integration of 3rd party micro services deployed on AWS with the Apache Spark framework. We compare two approaches to integrate Spark workers to the 3rd party service nodes: 

  • Deploy a load balancer between Spark executors and remote service
  • Apply hash partitioning for which the IP of each  service instance is assigned to a given Spark partition.
 Note: This post assumes the reader is somewhat familiar with load balancers and a rudimentary knowledge of the Apache Spark framework.

Load balancers
Load balancers are commonly used to route requests to web or micro-services according to a predefined policy such as CPU load, average processing time or downtime.  They originally gain acceptance late 90's with the explosion of internet and web sites.
A load balancer is a very simple and easy solution to deploy micro services at scale: It is self contained and does not involve any significant architecture or coding changes to the underlying application (business/logic or data layers).

In a typical Apache Spark deployment, the Spark context splits data sets into partitions. Each partition pre-processes data to generate the request to the service then initiate the connection to the service cluster through the load balancer

Deployment using Apache Spark with load balanced services

The data processing steps are
  1. Master split the input data over a given set of partitions
  2. Workers nodes pre-process and cleanse data if necessary
  3. Request are dynamically generated
  4. Each partition establish and manage the connection to the load balance
  5. Finally workers node processed the response and payload
Load balancers provides an array of features such as throttling, persistent session, or stateful packet inspection that may not be needed in a Spark environment. Moreover the load balancing scheme is at odds with the core concept of big data: data partitioning. 

Let's consider an alternative approach: assigning (mapping) one or two nodes in the  cluster nodes to each partition.

Partitioning service nodes
The first step is to select a scheme to assign a given set of service node, using IP, to a partition. Spark supports several mechanisms to distribution functions across partitions
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses for the service nodes, as illustrated in the following block diagram.

Deployment using Apache Spark and hashed IP partitioning

The data pipeline is somewhat similar to the load balancer
  1. Master split the input data over a given set of partitions
  2. IP addresses of all service notes are hashed and assign to each partition
  3. Workers nodes pre-process and cleanse data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establish and manage the connection to a subset of service nodes
  6. Finally worker nodes processed the response and payload
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element (line 2, 3), as a seed to the random selection of the service node (line 5, 6). The last step consists of building the request, establish the connection to the target service node and process the response (line 9, 11).

 def hashedEndPoints(input: Input, timeout: Int, ips: Seq[String]): Output = {
    val hashedCode = input.hashCode + currentTimeMillis
    val seed = (if (hashedCode < 0) -hashedCode 
         else hashedCode)
    val random = new scala.util.Random(seed)
    val serverHash = random.nextInt(serverAddresses.length)
    val targetIp = serverAddresses(serverHash)

    val url = s"http://${targetIp}:80/endpoint"
    val httpConnector = HttpConnector(url, timeout)
     // Execute request and return a response of type Output

The function, hashedEndPoint, executed within each partition, in invoked from the master

def process(
 notes: Dataset[Input],
 timeout: Int,
 serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
  input => 
     if (serverAddresses.nonEmpty) 
         hashedEndPoints(input, timeout, serviceAddresses)
         throw new IlegalStateException("error ...")

Beside ensuring consistency and avoiding adding an external component (load balancer) to the architecture, the direct assignment of service nodes to the Spark partitions has some performance benefits.

Performance evaluation
Let's evaluate the latency for processing a corpus of text though an NLP algorithm deployed over a variable number of worker notes. The following chart plots the average latency for a given NLP engine to process documents, with a variable number of nodes.

Hash partitioning does improve performance significantly in the case of a small number of nodes. However, it out-performs the traditional load balancing deployment by as much as 20% for larger number of nodes (or instances).

Scala 2.11.8
JDK 1.8
Apache Spark 2.3.2
AWS EMR 5.19

Introduction to Load Balancing
Spark Partitions and Partitioning