Tuesday, January 19, 2021

Lazy instantiation of dataset from Amazon S3

How many times have you thought it would be great to able to instantiate an object or a data set stored on Amazon S3 on demand? It is easier than you think using Scala and Spark functional tool box.

The lazy instantiation of dataset from S3 has been originally developed using Scala 2.12 and Apache Spark 2.4. The code has no specific dependency on any given version of the language or framework and will compile/run on Apache Spark 3.0

Just be lazy

A common requirement in machine learning is to load the configuration parameters associated with model at run-time. The model may have been trained with data segregated by customers, or categories. When deployed for prediction, it is critical to select/load the right set of parameters according to the characteristic of the request

For instance, a topic extraction model may have been trained with scientific corpus, medical articles or computer science papers. 

A simple approach is to pre-load all variants of a model when the underlying application is deployed in production. However, consuming uncessary memory and CPU cycle for a model that may be needed, at least right away, is a waste of resource. In this post, we assume that the model parameters are stored on Amazon S3.

 Lazy instantiation of objects allows us to reduce unnecessary memory consumption by invoking a constructor once, only when needed. This capability becomes critical for data with large footprint such as Apache Spark data sets.

A simple, efficient repository

Let's consider all the credentials to access multiple devices consisting of an id, password and hint that have been previously uploaded on S3. 

case class Credentials(device: String, id: String, password: String, hint: String)

A hash table is the simplest incarnation of a dynamic repository of models. Therefore we implement a lazy hash table by sub-classing the mutable HashMap.
The first time a model is requested, it is loaded into memory from S3 that returned to the client code. To this purpose we need to define the following argument for the constructor of the lazy hash table
  • Dynamic loading mechanism from S3 - loader is responsible for loading the data from S3
  • Key generator - toKey converts a string key to a the type of key of the Hash map

final class LazyHashMap[T, U](
    loader: String => Option[U], 
    toKey: String => T
) extends HashMap[T, U] {
   // Override the HashMap.get method 
   override def get(item: String): Option[U] = synchronized {
      val key = toKey(item)
      if(super.contains(key)) // Is is already in memory?
       loader(item).map(  // otherwise load the item from S3
         l => {
          super.put(key, l)
     // Prevent for updating this immutable map
   @throws(class = classOf[UnsupportedOperationException])
   override def put(key: T, value: U): Option[U] 
      throw new UnsupportedOperationException("lazy map is immutable")

The keyword synchronized implements a critical section to protect the execution from dirty read. 

Here is an example of the two arguments for the constructor of the lazy hash table for a type MyValue. The key identifies the data set and the model which has been trained on.

val load: String => Option[Credentials] = 
     (dataSource: String) => loadData(dataSource)
val key = (s: String) => s

val lazyHashMap = new LazyHashMap[String, Dataset[Credentials]](load, key)

The last business to take care of is the implementation of the function, loadData to load and instantiate the dataset

Data loader

Let's write a loader for a Spark data set of type T stored on AWS S3 in a given bucket, bucketName and folder, s3InputPath

def s3ToDataset[T](
   s3InputPath: String
)(implicit encoder: Encoder[T]): Dataset[T] = {
  import sparkSession.implicits._

   // Needed for access keys and infer schema
  val loadDS = Seq[T]().toDS
  val accessConfig = loadDS.sparkSession.sparkContext.hadoopConfiguration

   // Credentials to read from S3
accessConfig.set("fs.s3a.access.key", myAccessKey) accessConfig.set("fs.s3a.secret.key", mySecretKey) try {
     // Enforce the schema
     val inputSchema = loadDS.schema
	 .load(path = s"s3a://$bucketName/${s3InputPath}")
  catch {
    case e: FileNotFoundException => log.error(e.getMessage)
    case e: SparkException => log.error(e.getMessage)
    case e: IOException =>  log.error(e.getMessage)

It is assumed that the Apache Spark session has already been created and an encoder (i.e. Kryo) has been already been defined. The encoder for the type T is implicitly defined, usually along with the Spark session.
The first step is to instantiate a 'dummy' empty dataset of type T. The instantiation, loadDS is used to
  • Access the hadoop configuration to specify the credentials for S3
  • Enforce the schema when reading the data (in JSON) format from S3. Alternatively, the schema could have been inferred.
Note Data from S3 bucket is accessed through the s3a:// protocol. It add an object layer on top of the default S3 protocol which is block-centric. It is significantly faster.

Finally let's implement the load function, loadData

  // Create a simple Spark session
implicit val sparkSession =  SparkSession.builder

def loadData(s3Path: String): Option[Dataset[Credentials]] = {
  import sparkSession.implicits._ // need for encoding


Sunday, October 20, 2019

Contextual Thompson sampling 1: Theory

This post introduces the concept of multi-armed bandit (a.k.a. K-armed bandit) and the Thompson sampling for contextual bandit. The next post deals with the actual implementation of the contextual Thompson sampling in Apache Spark.

Multi-arm Bandit problem

The multi-armed bandit problem is a well-known reinforcement learning technique,  widely used for its simplicity. Let's consider a  slot machine with n arms (bandits) with each arm having its own biased probability distribution of success. In its simplest form pulling any one of the arms gives you a  reward of either 1 for success, or 0 for failure. Our objective is to pull the arms one-by-one in sequence such that we maximize our total reward collected in the long run.
Dual-arm bandit

Data scientists have developed several solutions to tackle the multi-armed bandit problem for which the 3 most common algorithms are
  • Epsilon-Greedy
  • Upper confidence bounds
  • Thompson sampling


The Epsilon-greedy is the simplest algorithm to address the  exploration-exploitation trade-off.  During exploitation, the lever with highest known payout is always pulled. However, from time to time (fraction ε with  ε < 0.1) the algorithm selects a random arm to 'explore' other arms which payout is relatively unknown. The arms with known or proven highest payout are pulled (1-ε of the time)

Upper Confidence Bound (UCB)

This solution is sometimes referred as Optimism in the Face of Uncertainty principle: It assumes that the unknown mean payoffs of each arm will be as high as possible, based on historical data.

Thompson Sampling

The Thompson sampling algorithm is fundamentally a Bayesian optimization technique which core principle known as probability matching strategy can be summed as ‘play an arm according to its probability of being the best arm’.(i.e. the number of pulls for a given lever should match its actual probability of being the optimal lever)

The following diagram illustrates the difference between the upper confidence bounds and the Thompson sampling.

Confidence Bounds for Multi-Armed bandit problem

Context anyone?

The techniques described above do not make any assumption on the environment or the agent who selects the arms. There are two scenario:

  • Context-free bandit: The selection of the arm with the highest rewards depends solely on the past history (prior) reward (success and failure). Such history can be modeled using a Bernoulli distribution. For instance, the probability to get a '6' playing dice does not depend on the player.
  • Contextual bandit: The state of the environment (a.k.a. context) is used as an input (or model) to the selection of the arm with the highest predicted reward. The algorithm observes a new context (state), choose one action (arm), and observes an outcome (reward). In ad targeting, the selection of a banner or insert to be displayed on a web site depends on the prior rewards history associated to the demographic data of the user.
Any of the techniques applied to the multi-armed bandit can be used with and without context. The following sections focus on the contextual multi-arm bandit problem.

Contextual Thompson sampling (CTS)

Let's dive into the key characteristics of the Thompson sampling
  • We assume the prior distribution on the parameters of the distribution (unknown) of the reward for each arm.
  • At each step, t, the arm is selected according to the posterior probability to be the optimal arm. 
The components of the contextual Thompson sampling are
1. Model of parameters w
2. A prior distribution p(w) of the model
3. History H consisting of a context x and reward r
4. Likelihood or probability p(r|x, w) of a reward given a context x and parameters w
5. Posterior distribution computed using naïve Bayes\[p(\widetilde{w}|H)=p(H|\widetilde{w}).p(\widetilde{w})/p(H))\]

But how can we model a context?

Actually, a process to select the most rewarding arm is actually a predictor or a learner. Each predictor takes the context, defines as a vector of features and predicts which arm will generate the highest reward.

The predictor is a model that can be defined as
- Linear model
- Generalized linear model (i.e. logistic)
- Neural network

The algorithm is implemented as a linear model (weights w) for estimating the reward from a context x  as \[w^{T}.x\]
The ultimate objective for any reinforcement learning model is to extract a policy which quantifies the behavior of the agent. Given a set X of context xi and a set A of arms, the policy is defined by the selection of an arm given a context x
\[\pi : X\rightarrow A\]

Contextual Thompson Sampling Algorithm

The sampling of the normal distribution (line 3) is described in details in the post Multivariate Normal Sampling with Scala. The algorithm computes the maximum reward estimation through sampling the normal distribution (line 4) and play the arm a* associated to it (line 5).
The parameters V and b are updated with the estimated value (line 7 and 8) and the actual reward is computed (line10) after the weights of the context are updated (line 9)

The next post describes the implementation of the contextual Thompson sampling using Scala, Nd4j and Apache Spark.


Solving the multi armed bandit problem
Introduction context free Thompson sampling
Thompson Sampling for Contextual Bandits with Linear Payoffs

Sunday, June 9, 2019

Multivariate Normal sampling with Scala and ND4j

This post describes the implementation of the multivariate normal sampling using ND4j. 

The multi-variate normal sampling function is used in various machine learning techniques such as Markov chains Monte Carlo (MCMC) or contextual multi-armed bandit.
The implementation of the multivariate normal relies on data vectorization, technique well-known to Python developers and data scientists alike.

Note: The post requires some knowledge of data vectorization (numpy, datavec, ND4j..) as well as Scala programming language.

Python Numpy is a well-known and reliable vectorized linear algebra library which is a foundation of scientific (SciPy) and machine learning (Sciktlearn) libraries. No serious data scientific projects can reasonably succeeds with the power and efficiency of numpy. 
The vectorization of datasets is the main reason behind the performance of machine learning models (training and prediction) build in Python.

Is there a similar linear algebra library, supporting vectorization, available to Scala and Spark developers? Yes, ND4j

ND4j library replicates the functionality of numpy for Java developers. ND4j can be downloaded as an independent library or as component of the deep learning library, deeplearning4j. It leverages the BLAS and LAPACK libraries.
From a Java developer perspective, the data represented by an NDArray is stored outside of the Java Virtual Machine. This design has the following benefits:
  • Avoid taxing the Garbage Collector
  • Interoperability with high-performance BLAS libraries such as OpenBlas
  • Allow number of array elements exceeds Int.MaxValue
The BLAS (Basic Linear Algebra Subprograms) are functions performing basic vector and matrix operations. The library is divided in 3 levels:
  • Level 1 BLAS perform scalar, vector and vector-vector operations,
  • Level 2 BLAS perform matrix-vector operations
  • Level 3 BLAS perform matrix-matrix operations. 
OpenBLAS is an optimized Basic Linear Algebra Subprograms (BLAS) library based on GotoBLAS2,  a C-library of linear algebra supporting a large variety of micro-processor. Its usage is governed by the BSD license.
LAPACK are Fortran routines for solving systems of simultaneous linear equations, least-squares solutions of linear systems of equations, eigenvalue problems, and singular value problems and matrix factorizations.

Implicit ND4j array conversion
The first step is to create a implicit conversation between ND4j and Scala data types.  
The following code convert an array of double into a INDArray using org.nd4j.linalg.factory.Nd4j java class and its constructor create(double[] values, int[] shape)
  • In case of a vector, the shape is defined in scala as  Array[Int](size_vector)
  • In case of a matrix, the shape is defined as Array[Int](numRows, numCols). 
The following snippet implement a very simple conversion from a Scala array to a INDArray

@throws(clazz = classOf[IllegalArgumentException])
implicit def double2NDArray(values: Array[Double]): INDArray = {
  require(values.nonEmpty, "ERROR: ND4, conversion ...")

  Nd4j.create(values, Array[Int](1, values.length))

Multivariate Normal distribution implementation
The sampling of a multivariate normal distribution is defined by the formula 
\[\mathbb{N}(\mu, \Sigma )=\frac{1}{\sqrt{(2\pi)^n \left | \Sigma \right |}} e^{\frac{1}{2}(x-\mu)^T {\Sigma '}^{-1}(x-\mu)}\] A generalized definition adds a very small random perturbation factor r |r| <= 1 on the variance value (diagonal of the covariance matrix) \[\Sigma ' = \Sigma + r.I\] Sigma is the covariance matrix and the mu is the mean value. 
The computation of the multivariate normal sampling can be approximated by the Cholesky decomposition. In a nutshell, the Cholesky algorithm decompose a positive-definite matrix into a product of two matrix
  • lower triangle matrix
  • transposition of its conjugate
It serves the same purpose as the ubiquitous LU decomposition with less computation. \[\mathbb{N}(\mu, \Sigma) \sim \mu + Chol(\Sigma).Z^T\] where \[L=Chol(\Sigma)\] and \[L.L^T=\Sigma\]. The vector Z is a multivariate normal noise \[Z= \{ z_i | z_i=N(0, 1)\}\]
The following implementation relies on the direct invocation of LAPACK library function potrf. The LAPACK functionality are accessed through the BLAS wrapper getBlasWrapper.

 final def choleskyDecomposition(matrix: INDArray): INDArray = {
   val matrixDup = matrix.dup
   Nd4j.getBlasWrapper.lapack.potrf(matrixDup, false)

Note that the matrix is duplicated prior to the LAPACK function call as we do not want to alter the input matrix. 
Let's implement the multivariate Normal sampler with perturbation using the Cholesky decomposition.

@throws(clazz = classOf[IllegalArgumentException])
@throws(clazz = classOf[IllegalStateException])
final def multiVariateNormalSampling(
   mean: INDArray,
   cov: INDArray,
   perturbation: Double = 0.0): INDArray = {
 import scala.util.Random
 require(cov.size(0) == mean.length, s"Sigma shape ${cov.size(0)} should be mean size ${mean.length}")
 require(cov.size(1) == mean.length, s"Sigma shape ${cov.size(1)} should be ${mean.length}")

 try {
  // Add a perturbation epsilon value to the covariance matrix
  // if it is defined
  val perturbMatrix =
   if(perturbation > 0.0)
    cov.add( squareIdentityMatrix(cov.size(0), 

  // Apply the Cholesky decomposition
  val perturbed: INDArray = choleskyDecomposition(perturbMatrix)
   // Instantiate a normal distribution
  val normalPdf = new NormalDistribution(
       new DefaultRandom, 

  val normalDensity = Array.fill(mean.size(0))(normalPdf.sample)
  val normalNDArray: INDArray = normalDensity

   // This is an implementation of the Dot product
  val normalCov = perturbed.mmul(normalNDArray.transpose)
   // Add the normalized perturbed covariance to the mean value
 catch {
  case e: org.nd4j.linalg.api.blas.BlasException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: org.apache.commons.math3.exception.NotStrictlyPositiveException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: Throwable =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")

Let's look at the full implementation of the multi-variate normal sampling.
The first step validates the shape of the mean and covariance input matrices [line 8, 9]. As mentioned earlier, the generalized normal distribution introduces an optional random perturbation of small magnitude (~1e-4) [line 14-17] that is useful for application that requires some stochastic
The 'perturbed' covariance matrix is factorized using the Cholesky decomposition [line 22]. The normal probability density function (mean 0.0 and standard deviation 1.0) is used to generate random values [line 24-30] which is then applied to the covariance matrix [line 33].
The normal randomized variance is added to the vector of mean values [line 35].

For the sake of convenience, the multivariate normal density function uses the Apache Math common 3.5 API [line 24].

Scala 2.13.2
JDK 1.8
ND4j 1.0.0-beta3

Scala 2.13
Deep learning 4j
Cholesky decomposition
Approximation multivariate Normal sampling
Apache common math