Wednesday, November 21, 2018

Integration of 3rd party service with Spark

Target audience: Beginner
Estimated reading time: 10'


Introduction
Apache Spark is a commonly used framework to distribute large-scale computational tasks. Some of these tasks may involve accessing external or 3rd party remote services such as natural language processing, image classification, reporting or data cleansing. 
These remote services or micro-services are commonly accessed through a REST API and are deployable as clusters of nodes (or instances) to improve scalability and high availability. 
Load balancing solutions have been used to address such scalability challenges since the dawn of the internet.

Is there an alternative to load balancers for scaling remote web services?

This post describes the integration of 3rd party micro-services deployed on AWS with the Apache Spark framework. We compare two approaches to connect Spark workers to the 3rd party service nodes: 
  • Deploy a load balancer between the Spark executors and the remote service
  • Apply hash partitioning, in which the IP of each service instance is assigned to a given Spark partition
Note: This post assumes the reader is somewhat familiar with load balancers and has a rudimentary knowledge of the Apache Spark framework.


Load balancers
Load balancers are commonly used to route requests to web or micro-services according to a predefined policy such as CPU load, average processing time or downtime. They originally gained acceptance in the late 90s with the explosion of the internet and web sites.
A load balancer is a very simple and easy solution to deploy micro-services at scale: it is self-contained and does not involve any significant architecture or coding changes to the underlying application (business logic or data layers).

In a typical Apache Spark deployment, the Spark context splits data sets into partitions. Each partition pre-processes the data to generate the request to the service, then initiates the connection to the service cluster through the load balancer.

Deployment using Apache Spark with load balanced services


The data processing steps are:
  1. The master splits the input data over a given set of partitions
  2. Worker nodes pre-process and cleanse the data if necessary
  3. Requests are dynamically generated
  4. Each partition establishes and manages the connection to the load balancer
  5. Finally, worker nodes process the response and its payload
Load balancers provide an array of features such as throttling, persistent sessions, or stateful packet inspection that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with a core concept of big data: data partitioning. 
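As a minimal sketch of this deployment, each partition simply funnels its requests through the single load balancer endpoint. The Input and Output types and the HttpConnector helper are the same ones used later in this post; the post method and the function name are hypothetical.

def processViaLoadBalancer(
  inputs: Dataset[Input],
  loadBalancerUrl: String,   // DNS name of the load balancer (placeholder)
  timeout: Int
)(implicit sparkSession: SparkSession): Dataset[Output] = {
  import sparkSession.implicits._

  inputs.mapPartitions { iter =>
     // A single connection per partition, always pointing at the load balancer,
     // which dispatches the requests to the service nodes
    val connector = HttpConnector(loadBalancerUrl, timeout)
    iter.map(input => connector.post(input))
  }
}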

Let's consider an alternative approach: assigning (mapping) one or two nodes of the service cluster to each partition.


Partitioning service nodes
The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to a partition. Spark supports several mechanisms to distribute functions across partitions (hash partitioning is illustrated in the short snippet after this list):
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
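For reference, here is a minimal illustration of Spark's built-in hash partitioning on a keyed RDD; the keys, values and number of partitions are arbitrary, and a sparkSession is assumed to be in scope. The same idea is applied in this study to the set of service node IP addresses.

import org.apache.spark.HashPartitioner

  // Distribute keyed records over 4 partitions using the hash of the key
val keyedRecords = sparkSession.sparkContext.parallelize(
  Seq(("doc-1", "first document"), ("doc-2", "second document"), ("doc-3", "third document"))
)
val hashPartitioned = keyedRecords.partitionBy(new HashPartitioner(4))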
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses of the service nodes, as illustrated in the following block diagram.

Deployment using Apache Spark and hashed IP partitioning


The data pipeline is somewhat similar to the load balancer case:
  1. The master splits the input data over a given set of partitions
  2. The IP addresses of all service nodes are hashed and assigned to each partition
  3. Worker nodes pre-process and cleanse the data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establishes and manages the connection to a subset of service nodes
  6. Finally, worker nodes process the response and its payload
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element and used as the seed for the random selection of the service node. The last step consists of building the request, establishing the connection to the target service node and processing the response.

def hashedEndPoints(input: Input, timeout: Int, ips: Seq[String]): Output = {
     // Extract a non-negative hash code from the input element
   val hashedCode = input.hashCode + System.currentTimeMillis
   val seed = if (hashedCode < 0) -hashedCode else hashedCode

     // Use the hash code as the seed for the random selection of the service node
   val random = new scala.util.Random(seed)
   val serverHash = random.nextInt(ips.length)
   val targetIp = ips(serverHash)

     // Build the request and establish the connection to the target service node
   val url = s"http://${targetIp}:80/endpoint"
   val httpConnector = HttpConnector(url, timeout)
    // Execute the request and return a response of type Output
}

The function hashedEndPoints, executed within each partition, is invoked from the master:

def process(
  inputs: Dataset[Input],
  timeout: Int,
  serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
  import sparkSession.implicits._

  inputs.map(
    input =>
      if (serverAddresses.nonEmpty)
        hashedEndPoints(input, timeout, serverAddresses)
      else
        throw new IllegalStateException("List of service addresses is undefined")
  )
}
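A minimal driver-side usage sketch is given below. The Input and Output case classes, the S3 path, the timeout value and the service IP addresses are placeholders; only the signature of process defined above is assumed.

import org.apache.spark.sql.{Dataset, SparkSession}

  // Hypothetical driver code invoking 'process'
implicit val sparkSession: SparkSession = SparkSession.builder
  .appName("hashed-service-partitioning")
  .getOrCreate()
import sparkSession.implicits._

  // IP addresses of the deployed service nodes (placeholder values)
val serviceIps = Seq("10.0.0.11", "10.0.0.12", "10.0.0.13")

  // Load the input data set and invoke the service through the partitions
val inputs: Dataset[Input] = sparkSession.read.json("s3://mybucket/inputs").as[Input]
val outputs: Dataset[Output] = process(inputs, timeout = 5000, serviceIps)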

Besides ensuring consistency and avoiding an additional external component (the load balancer) in the architecture, the direct assignment of service nodes to Spark partitions has some performance benefits.


Performance evaluation
Let's evaluate the latency for processing a corpus of text through an NLP algorithm deployed over a variable number of worker nodes. The following chart plots the average latency for a given NLP engine to process documents, with a variable number of nodes.

Hash partitioning does not improve performance significantly for a small number of nodes. However, it outperforms the traditional load-balanced deployment by as much as 20% for a larger number of nodes (or instances).

Environment
Scala 2.11.8
JDK 1.8
Apache Spark 2.3.2
AWS EMR 5.19


Wednesday, October 31, 2018

K-means Clustering in Java II: Classification

Target audience: Intermediate
Estimated reading time: 20'


Overview
The basic components of the implementation of the K-means clustering algorithm were introduced in the previous post, K-means Clustering in Java I: Elements.

This second part on the implementation of K-means in Java describes the two main tasks of any unsupervised or supervised learning algorithm:
  • training: executed off-line during the analysis of historical data
  • classification: executed at run-time to classify new observations
Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.


Training
The learning method, train, implements the clustering algorithm. It iterates to minimize the sum of distances between all the cluster data points and their centroid.
For each iteration (or epoch) the train method:
  1. assigns observations to each cluster
  2. computes the centroid for each cluster (computeCentroid)
  3. computes the total distance of all the observations to their respective centroids (computeTotalDistance)
  4. estimates the closest cluster for each observation
  5. re-assigns the observations (updateCentroids)
public int train() {
  int numIters = _maxIterations, k = 0;
  boolean inProgress = true;
   
  initialize();  
    // Initial round-robin assignment of the observations to the clusters
  while(inProgress) {
     for(KmeansCluster cluster : _clusters ) {
        cluster.attach(_obsList[k]);
        if( ++k >= _obsList.length) {
           inProgress = false;
           break;
        }
     }
  }
               
  computeTotalDistance();
  for(KmeansCluster cluster : _clusters ) {
     cluster.computeCentroid();
  }           
  computeTotalDistance();
  
  List<Observation> obsList = null; 
  KmeansCluster closestCluster = null;

   // Main iterative loop: traverses all the clusters,
   // computes the distance of each observation relative to the centroids
   // and re-assigns the observation to the closest cluster.
  for(int i = 0; i < _maxIterations; i++) {
    for(KmeansCluster cluster : _clusters ) { 
      obsList = new ArrayList<Observation>();
      for( Observation point : cluster.getDataPointsList()) {
        obsList.add(point);
      }
   
      for( Observation point : obsList) {
        double minDistance = Double.MAX_VALUE, distance = 0.0;
        closestCluster = null;
     
        for(KmeansCluster cursor : _clusters ) {
          distance = point.computeDistance(cursor.getCentroid());
          if( minDistance > distance) {
            minDistance = distance;
            closestCluster = cursor;
          }
        }
        updateCentroids(point, cluster, closestCluster);
      }
    }
      // Simple convergence criteria              
    if( _convergeCounter >= _minNumConvergeIters ) {
      numIters = i;
      break;
    }
  }
  return numIters;
}



Classification
The classification of a new observation is straightforward: it consists of evaluating the distance between the new data point and each centroid and selecting the cluster with the shortest distance. The classify method extracts the index, or label, of the cluster that is the most suitable (closest in distance) to the new observation.

public int classify(double[] obs) {
  double bestScore = Double.MAX_VALUE, distance = 0.0;
  int clusterId = -1;
       
  for(int k = 0; k < _centroids.length; k++) {
     distance = _centroids[k].computeDistance(obs);
     if(distance < bestScore) {
        bestScore = distance;
        clusterId = k;
     }
  }
  return clusterId;
}

The code snippet below implements some of the supporting methods to:
  • compute the loss function value (total distance)
  • initialize the centroid for each cluster
  • update the values of the centroids


private void computeTotalDistance() {
  float totalDistance = 0.0F;
     
  for(KmeansCluster cluster : _clusters ) {
     totalDistance += cluster.getSumDistances();
  }
  
  double error = Math.abs(_totalDistance - totalDistance);
  _convergeCounter = (error < _convergeCriteria) ? _convergeCounter + 1 : 0;
  _totalDistance = totalDistance;
}

private void initialize() {
   double[] params = getParameters();
   int numVariables = params.length >> 1;
      
   double[] range = new double[numVariables];
   for( int k = 0, j = numVariables; k <numVariables; k++, j++ ) {
      range[k] = params[k] - params[j];
   }
        
   double[] x = new double[numVariables];
   int sz_1 = _clusters.length+1,  m = 1;
      
   for(KmeansCluster cluster : _clusters) {
      for( int k = 0, j = numVariables; k <numVariables; k++, j++ ) {
         x[k] = ((range[k]/sz_1)*m) + params[j];
      }
      cluster.setCentroid(x);
      m++;
   }
}
 
private void updateCentroids(Observation point,
                             KmeansCluster cluster, 
                             KmeansCluster bestCluster) {
  boolean update = bestCluster != null && bestCluster != cluster; 
  if( update ) {
     bestCluster.attach(point);
     cluster.detach(point);
     for(KmeansCluster cursor : _clusters ) {
        cursor.computeCentroid();
     }
     computeTotalDistance();
  }
}



References
  • The Elements of Statistical Learning - T. Hastie, R. Tibshirani, J. Friedman - Springer, 2001
  • Machine Learning: A Probabilistic Perspective, §11.4.2.5 K-means algorithm - K. Murphy - MIT Press, 2012
  • Pattern Recognition and Machine Learning, Chap. 9 "Mixture Models and EM: K-means Clustering" - C. Bishop - Springer Science, 2006 

Wednesday, October 10, 2018

K-means Clustering in Java I: Elements

Target audience: Advanced
Estimated reading time: 30'

Overview

Among the clustering methods developed over the years are spectral clustering, non-negative matrix factorization, canopy, hierarchical and K-means clustering. The K-means algorithm is by far the easiest to implement. This simplicity comes with a high price in terms of scalability and even reliability. However, as an unsupervised learning technique, K-means is still valuable for reducing the number of model features or detecting anomalies.
The objective is to classify observations or data points by groups that share common attributes or features. The diagram below illustrates the clustering of observations (x,y) for a simple 2-feature model.



Each cluster has a mean or centroid, \(m = (m_{0},\dots,m_{n})\). First we need to define a distance between an observation \(x = (x_{0},\dots,x_{n})\) and the centroid m. The Manhattan and Euclidean distances are respectively defined as: \[d_{M} = \sum_{i=0}^{n}\left | x_{i} - m_{i}\right |\,\,\,,\,\,d_{E}= \sqrt{\sum_{i=0}^{n} (x_{i} - m_{i})^{2}}\] The loss function for N clusters \(C_{j}\) is defined by \[W(C)=\frac{1}{2}\sum_{k=0}^{N-1}\sum_{c_{i}=k}\sum_{C_{j}} d(x_{i},x_{j})\]  The goal is to find the centroids \(m_{k}\) and clusters \(C\) that minimize the loss function: \[C^{*}\left (i \right ) = arg\min _{k\in [0,N-1]}d (x_{i}, m_{k})\]

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.


Distances and Observations
First we need to define the distance between each observation and the centroid of a cluster. The class hierarchy related to the distance can be implemented as nested classes, as there is no reason to expose it to client code. The interface, Distance, defines the signature of the computation method. For the sake of simplicity, the sample code implements only the Manhattan and Euclidean distances. Exceptions, validation of method arguments, setter and getter methods are omitted.

protected interface Distance {
  public double compute(double[] x, Centroid centroid);
}
    // Definition of d(x,y) = |x - y|
protected class ManhattanDistance implements Distance {
  public double compute(double[] x, Centroid centroid) {
     double sum = 0.0, xx = 0.0;
     for( int k = 0; k< x.length; k++) {
       xx = x[k] - centroid.get(k);
       if( xx < 0.0) {
         xx = -xx;
       }
       sum += xx;
     }
     return sum;
  }
}

  // Definition of d(x,y) = sqrt(sum((x - y)^2)) 
protected class EuclideanDistance implements Distance {
  public double compute(double[] x, Centroid centroid) {
    double sum = 0.0, xx = 0.0;
    for( int k = 0; k < x.length; k++) {
       xx = x[k] - centroid.get(k);
       sum += xx*xx;
    } 
    return Math.sqrt(sum);
  } 
}

Next, we define an observation (or data point) as a vector or array of floating point values, in our example. An observation can support heterogeneous types (boolean, integer, floating point, ...) as long as they are normalized to [0, 1]. In our example we simply normalize over the maximum values of all the observations.

public final class Observation {
    // Use the Euclidean distance, shared between all the instances
  private static Distance metric = new EuclideanDistance();
  public static void setMetric(final Distance newMetric) {
    metric = newMetric;
  }
 
  private double[] _x  = null;
  private int  _index  = -1;

  public Observation(double[] x, int index) { 
    _x = x; 
    _index = index; 
  }
    // Access the value of the feature of rank 'index'
  public double get(int index) {
    return _x[index];
  }
    // Compute the distance between this point and the centroid
  public double computeDistance(final Centroid centroid) {
     return metric.compute(_x, centroid);
  }
    // Normalize the value of the data points
  public void normalize(double[] maxValues) {
     for( int k = 0; k < _x.length; k++) {
        _x[k] /= maxValues[k];
     }
  }
}



Clustering
The centroid for each cluster is computed iteratively to reduce the loss function. The centroid values are computed as the mean of each feature across all the observations in the cluster. The method Centroid.compute takes the list of observations belonging to a cluster and computes the centroid values _x, normalizing by the number of points. 

protected class Centroid {
  private double[] _x = null;       
       
  protected Centroid() {}
  protected Centroid(double[] x) {
    _x = Arrays.copyOf(x, x.length);
  }
    // Access the centroid value for the feature of rank 'index'
  protected double get(int index) {
    return _x[index];
  }
    // Compute the centroid values _x by normalizing with the number of points.
  protected void compute(final List<Observation> observations)  {
    double[] x = new double[_x.length];
    Arrays.fill(x, 0.0);
           
    for( Observation point : observations ) {
      for(int k = 0; k < x.length; k++) {
        x[k] += point.get(k);
      }
    }
    int numPoints = observations.size();
    for(int k = 0; k < x.length; k++) {
      _x[k] = x[k]/numPoints;
    }
  }
}

A cluster is defined by its label (an index in this example), a centroid, the list of observations it contains and the current loss associated with the cluster (the sum of the distances between all observations and the centroid).
The cluster behavior is defined by the following methods:
  • computeCentroid: compute the centroid from the observations in the cluster, then update the sum of the distances between those observations and the centroid
  • attach: attach or add a new observation to this cluster
  • detach: remove an existing observation from this cluster
public final class KmeansCluster {
  private int       _index   = -1;
  private Centroid  _centroid  = null; 
  private double    _sumDistances  = 0.0;
  private List<Observation> _observations = new ArrayList<Observation>();

  public void computeCentroid() {
    _centroid.compute( _observations );
    for( Observation point : _observations  ) {
        point.computeDistance(_centroid);
    }
    computeSumDistances();
  }

     // Attach a new observation to this cluster.
  public void attach(final Observation point) { 
    point.computeDistance(_centroid);
    _observations.add(point);
    computeSumDistances();
  }

  public void detach(final Observation point) {
    _observations.remove(point);
    computeSumDistances();
  }
           
  private void computeSumDistances() { 
    _sumDistances = 0.0;     
    for( Observation point : _observations) {
      _sumDistances += point.computeDistance(_centroid);
    }
  }
      //....
}

Finally, the clustering class implements the training and run-time classification. The train method iterates across all the clusters and all the observations to re-assign the observations to each cluster. The iterative computation ends when either the loss value converges or the maximum number of iterations is reached. 

If the algorithm uses K clusters with M observations of N variables, the execution time for one assignment of the observations to the clusters is proportional to K*M*N. If the algorithm converges after T iterations then the overall execution is proportional to T*K*M*N. For instance, the K-means clustering of 20K observations with 25 dimensions, using 10 clusters and converging after 50 iterations, requires 250,000,000 evaluations, as the quick check below shows.
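Using the figures above (T = 50 iterations, K = 10 clusters, M = 20,000 observations, N = 25 features): \[T \cdot K \cdot M \cdot N = 50 \times 10 \times 20000 \times 25 = 2.5\times 10^{8}\]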
The constructor creates the clustering algorithm with a predefined number of clusters, K, and a set of observations. The method getCentroids retrieves the current list of centroids (the values of the centroid vectors).

public final class KmeansClustering { 
  private KmeansCluster[] _clusters = null;
  private Observation[] _obsList = null; 
  private double _totalDistance  = 0.0;
  private Centroid[] _centroids = null;
   
  public KmeansClustering(int numClusters, final Observation[] obsList) {   
     _clusters = new KmeansCluster[numClusters];
     for (int i = 0; i < numClusters; i++) {
        _clusters[i] = new KmeansCluster(i);
     }
     _obsList = obsList;
  }
 
  public final List<double[]> getCentroids() {
      List<double[]> centroidDataList = null;

      if( _clusters != null && _clusters.length > 0) {
         centroidDataList = new LinkedList<double[]>();
         for( KmeansCluster cluster : _clusters) {
            centroidDataList.add(cluster.getCentroid().getX());
         }
      }
      return centroidDataList;
  }
}

The second section (next post) describes the implementation of the training and classification tasks.


References
  • The Elements of Statistical Learning - T. Hastie, R. Tibshirani, J. Friedman - Springer, 2001
  • Machine Learning: A Probabilistic Perspective, §11.4.2.5 K-means algorithm - K. Murphy - MIT Press, 2012
  • Pattern Recognition and Machine Learning, Chap. 9 "Mixture Models and EM: K-means Clustering" - C. Bishop - Springer Science, 2006 

Saturday, September 8, 2018

Scala Implicit Classes to Extend Libraries

Target audience: Beginner
Estimated reading time: 10'

This post explores the concept and implementation of Scala implicit classes and their application to the extension of existing Scala standard libraries.


Overview
Implicit methods are quite useful for defining global type conversions (as long as the semantics are clearly understood). But what about implicit classes?
Implicit classes can be used to extend existing Scala standard library classes. Most Scala standard library classes are declared final or implement a sealed trait. Composition is a viable alternative to inheritance in terms of re-usability: the class to extend is wrapped into a helper or utility class. However, a helper/container class adds an indirection and "pollutes" the namespace.
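Before looking at the use case, here is a toy sketch of the mechanics: an implicit class adds a method to the final class String without any visible wrapper at the call site. The object, class and method names below are arbitrary.

object StringExtensions {
    // Implicit classes must be nested in an object, class or trait
  implicit class RichString(s: String) {
      // Count the occurrences of a given character
    def occurrencesOf(c: Char): Int = s.count(_ == c)
  }
}

import StringExtensions._
val n = "implicit classes".occurrencesOf('i')   // returns 3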

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.
Let's look at an example of extending the standard library.


Example
The use case consists of extending the Try class, scala.util.Try, with an Either semantic: execute a function if the code throws an exception, and another function if the computation succeeds. Such a simple design allows computation flows to be routed depending on unexpected states.

The main construct is the declaration of an implicit class that wraps Try. A recovery function rec is called if an exception is thrown; a computation f is performed otherwise.

import scala.util._
 
implicit class Try2Either[T](_try: Try[T]) {
    
  def toEither[U](rec: ()=>U)(f: T=>T): Either[U,T] = 
   _try match {
     case Success(res) => Right(f(res))
     case Failure(e) => 
       println(e.toString)
       Left(rec())  
   }
}

You may notice that the method toEither is curried to segregate the two parameterized types T and U. It also complies with the semantics of Either, with the Left element reserved for errors (and recovery) and the Right element dedicated to the successful outcome.

Let's take the example of the normalization of a large array of floating point values by their sum. A small sum will generate a rounding error during the normalization and an exception is thrown. However, we do not want to burden the client code with handling the exception (the client method may not know the context of the exception, after all). In this example the recovery function rec instantiates the class Recover, which is responsible for a retry, potentially from a different state.


Implicit classes have an important limitation in terms of re-usability: you cannot override a default method without sub-classing the original Scala standard library class, which is not an option in our example because Try is a sealed abstract class.
As with implicit methods, the name of the implicit class is never actually used in the code but needs to reflect the intention of the developer. Let's apply this implicit class:

type DVector = Array[Double]

  // Arbitrary recovery class
class Recover {
  def reTry(x: DVector): DVector  = Array[Double](0.0)
}
  
  // Normalization of a vector. Proceeds to the
  // next computation (log) if the normalization succeeds
  // or invoke the recovery procedure otherwise
def normalize(x: DVector ): Either[Recover, DVector] = Try {
  val sum = x.sum 

  if(sum < 0.01) 
    throw new IllegalStateException(s"sum $sum")
  x.map( _ / sum) 
}
.toEither(() => new Recover)(
  (v: DVector) => v.map( Math.log(_) )
)

The implementation is very elegant. There is no need for a new semantic or naming convention, and the return type Either is well understood in the Scala development community.

