Wednesday, November 21, 2018

Integration of 3rd party service with Spark

Apache Spark is a commonly used framework to distribute large scale computational tasks. Some of these tasks may involve accessing external (3rd party) remote services such as NLP processing, deep learning model training or image classification. Moreover, these services, accessible through a REST API for instance, may not be easily re-configurable.
A remote service is usually deployed as a cluster of nodes (or instances) to improve scalability and guarantee high availability. The question becomes: what is the most efficient approach to generating and processing requests to a cluster of services?

This post evaluates the performance of executing a remote service deployed on multiple nodes as part of an Apache Spark application. We compare two approaches to integrating Spark workers with the 3rd party service nodes:

  • a load balancer
  • a hashing scheme that assigns service nodes directly to each partition

Load balancer
Load balancers are commonly used to route requests to web services according to a policy such as CPU load, average processing time or downtime. They originally gained acceptance in the late '90s with the explosion of web servers.
A load balancer is a very simple and easily deployable solution that is self-contained and does not involve any architecture or coding changes.


In a typical Apache Spark deployment, the context splits data sets into partitions. Each partition pre-processes data to generate the request to the service, then initiates the connection to the service cluster through the load balancer.



The data processing steps are:
  1. The master splits the input data over a given set of partitions
  2. Worker nodes pre-process and cleanse the data if necessary
  3. Requests are dynamically generated
  4. Each partition establishes and manages the connection to the load balancer
  5. Finally, worker nodes process the response and its payload
Load balancers provide an array of features such as throttling, persistent sessions, or stateful packet inspection that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with the core concept of big data: data partitioning.
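As a point of comparison, here is a minimal sketch of this first approach. It assumes Input and Output are the application's request and response case classes (as in the rest of this post), that the load balancer exposes a single endpoint URL, and that HttpConnector provides a hypothetical post method that submits a request and decodes the response.

def processThroughLoadBalancer(
  inputs: Dataset[Input],
  loadBalancerUrl: String,   // single endpoint exposed by the load balancer (hypothetical)
  timeout: Int
)(implicit sparkSession: SparkSession): Dataset[Output] = {
  import sparkSession.implicits._

  inputs.mapPartitions { iter =>
    // One connection per partition, pointing at the load balancer rather than at a specific service node
    val connector = HttpConnector(loadBalancerUrl, timeout)
    iter.map(input => connector.post(input))   // hypothetical request/response call
  }
}

Note that the partitions have no knowledge of the individual service nodes; the routing decision is delegated entirely to the load balancer.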

Let's consider an alternative approach: assigning the service cluster nodes directly to each partition.


Partitioning service nodes
The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to a partition. Spark supports several mechanisms to distribute functions across partitions:
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses of the service nodes, as illustrated in the block diagram further below.
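For illustration, the custom partitioning option listed above could take the form of a Spark Partitioner that allocates one partition per service node, so that partition i communicates exclusively with the node at ips(i). This is only a sketch; the remainder of the post relies instead on hashing the input element inside each partition (see hashedEndPoints below).

import org.apache.spark.Partitioner

// Illustrative custom partitioner: one Spark partition per service node
class ServiceNodePartitioner(ips: Seq[String]) extends Partitioner {
  override def numPartitions: Int = ips.length
  // Route a record to a partition (and therefore to a service node) by hashing its key
  override def getPartition(key: Any): Int = (key.hashCode & Int.MaxValue) % numPartitions
}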



The data pipeline is somewhat similar to the one using the load balancer:
  1. The master splits the input data over a given set of partitions
  2. The IP addresses of all service nodes are hashed and assigned to each partition
  3. Worker nodes pre-process and cleanse the data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establishes and manages the connection to a subset of service nodes
  6. Finally, worker nodes process the response and its payload
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element (lines 2, 3) and used as the seed for the random selection of the service node (lines 5, 6). The last step consists of building the request, establishing the connection to the target service node and processing the response (lines 9-11).


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 def hashedEndPoints(input: Input, timeout: Int, ips: Seq[String]): Output = {
    val hashedCode = input.hashCode + System.currentTimeMillis
    val seed = (if (hashedCode < 0) -hashedCode 
         else hashedCode)
    val random = new scala.util.Random(seed)
    val serverHash = random.nextInt(ips.length)
    val targetIp = ips(serverHash)

    val url = s"http://${targetIp}:80/endpoint"
    val httpConnector = HttpConnector(url, timeout)
     // Build the request, execute it and return a response of type Output
   }

The function, hashedEndPoints, executed within each partition, is invoked from the master:


def process(
  inputs: Dataset[Input],
  timeout: Int,
  serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
  import sparkSession.implicits._

  inputs.map(input =>
    if (serverAddresses.nonEmpty)
      hashedEndPoints(input, timeout, serverAddresses)
    else
      throw new IllegalStateException("error ...")
  )
}

Besides ensuring consistency and avoiding the addition of an external component (the load balancer) to the architecture, the direct assignment of service nodes to the Spark partitions has some performance benefits.

Performance evaluation
Let's measure the latency for processing a corpus of text through an NLP algorithm deployed over a variable number of nodes or instances. The following chart plots the average duration of the NLP processing of a document for different numbers of NLP processing nodes.



It is clear that although the performance improvement is not striking for a small number of service nodes, the direct assignment of the NLP processing nodes to Spark partitions reduces latency by as much as 20% for a larger number of nodes (or instances).


Environment
Scala 2.12.1
JDK 1.8
Apache Spark 2.3.2
AWS EMR 5.19


References
Introduction to Load Balancing
Spark Partitions and Partitioning

Saturday, July 7, 2018

Covariant and Contravariant Functors in Scala

Scala is a first-class functional programming language which supports, among other FP concepts, higher-kinded types, functors and monads. This post illustrates the capability of Scala to leverage covariant and contravariant functors for tensor analysis, and to implement vectors and co-vectors.

Overview

Most Scala developers have some experience with the core tenets of functional programming: monads, functors and applicatives. Those concepts are not specific to Scala or even to functional programming at large. They are elements of a branch of mathematics known as category theory, which has its roots in algebraic topology.

Differential geometry or differential topology makes heavy use of tensors that leverage covariant and contravariant functors.
This post introduces the concepts of

  • Contravariant functors applied to co-vectors and differential forms
  • Projection of higher-kinded types

Vector fields 101
Let's consider a 3-dimensional Euclidean space with basis vectors {ei} and a vector field V(f1, f2, f3) [Note: we follow the Einstein tensor indices convention]

The vector field at the point P(x,y,z) is the tuple (f1(x,y,z), f2(x,y,z), f3(x,y,z)).
A vector field over a k-dimensional space can be formally and mathematically defined as
\[f: \boldsymbol{x} \in \mathbb{R}^{k} \mapsto \mathbb{R}^{k} \\ f(\mathbf{x})=\sum_{i=1}^{k}{f^{i}}(\mathbf{x}).\mathbf{e}_{i}\] Example: \[f(x,y,z) = (2x+z^{3})\boldsymbol{\overrightarrow{i}} + (xy+e^{-y}-z^{2})\boldsymbol{\overrightarrow{j}} + \frac{x^{3}}{y}\boldsymbol{\overrightarrow{k}}\]
Now, let's consider the same vector field V in a second reference frame (origin O' and basis vectors e'i):

\[f(\mathbf{x})=\sum_{i=1}^{k}{f'^{i}}(\mathbf{x}).\mathbf{e'}_{i}\]
The transformation matrix Sij converts the coordinate functions fi into f'i. The tuple f = (fi) is the co-vector field for the vector field V:
\[S_{ij}: \begin{Vmatrix} f^{1} \\ f^{2} \\ f^{3} \end{Vmatrix} \mapsto \begin{Vmatrix} f'^{1} \\ f'^{2} \\ f'^{3} \end{Vmatrix}\]
The scalar product of the co-vector f' and the vector v(f) is defined as
\[< f',v> = \sum f'_{i}.f^{i}\]
Given the scalar product, we can define the co-vector field f' as a linear map:
\[\alpha (v) = < f',v> (1) \]
Covariant functors

I assume the reader has a basic understanding of functors and monads. Here is a short overview:

A category C is composed of objects x and morphisms f defined as
\[C= \{ {x_{i}, f \in C | f: x_{i} \mapsto x_{j}} \}\] A functor F is a map between two categories C and D that preserves the mapping.
\[x\in C \Rightarrow F(x)\in D \\ x, y\in C, \;\;\; F: x \mapsto y \Rightarrow F(x)\mapsto F(y)\]
Let's look at the definition of a functor in Scala with the "preserving" mapping method, map

trait Functor[M[_]] {
  def map[U, V](m: M[U])(f: U => V): M[V]
}

Let's define the functor for a vector (or tensor) field. A vector field is defined as a sequence or list of fields (i.e. values or function values).

type VField[U] = List[U]

trait VField_Ftor extends Functor[VField] {
  override def map[U, V](vu: VField[U])(f: U => V): VField[V] = vu.map(f) 
}

This particular implementation relies on the fact that List is a category with its own functor. The next step is to define the implicit class conversion VField[U] => Functor[VField[U]] so the map method is automatically invoked for each VField instance.


implicit class vField2Functor[U](vu: VField[U]) 
    extends VField_Ftor {  
    
  final def map[V](f: U => V): VField[V] = 
     super.map(vu)(f) 
}

By default, covariant functors (which preserve the mapping) are known simply as functors. Let's now look at the case of co-vector fields.

Contravariant functors

A Contravariant functor is a map between two categories that reverses the mapping of morphisms.
\[x, y\in C, \;\;\; F: x \mapsto y \Rightarrow F(y)\mapsto F(x)\]


trait CoFunctor[M[_]] {
  def map[U, V](m: M[U])(f: V => U): M[V]
}

The map method of the CoFunctor implements the relation (V => U) => (M[U] => M[V]).
Let's implement a co-vector field using a contravariant functor. The definition (1) describes a linear map from a vector field V over a field X to a scalar: V*: V => T.
A morphism on the category V* consists of a morphism V => T, or V => _, where V is a vector field and T (or _) is a scalar function value.


type CoField[V, T] = Function1[V, T]


The co-vector field type, CoField, is parameterized on the vector field type V, which is an input (function parameter). Therefore the functor has to be contravariant.

The higher-kinded type M[_] takes a single type parameter (i.e. M[V]), but a co-vector field requires two types:
  • V: the vector field
  • T: the scalar function value produced by the inner product <.,.>

Fortunately, the contravariant functor CoField_Ftor associated with the co-vector needs to be parameterized only on the vector field type V. The solution is to pre-define (or 'fix') the scalar type T using a higher-kind projection of the type L[V] = CoField[V, T]:
T => ({type L[X] = CoField[X,T]})#L


trait CoField_Ftor[T] 
  extends CoFunctor[({type L[X] = CoField[X,T]})#L ] {

  override def map[U,V](
    vu: CoField[U,T]
  )(f: V => U): CoField[V,T] = 
   (v: V) => vu(f(v))
}

As you can see, the morphism over the type V on the category CoField is defined as f: V => U instead of f: U => V. A kind parameterized on the return type (Function1) would require the 'default' (covariant) functor. Once again, we define an implicit class to convert a co-vector field of type CoField to its functor, CoField2Ftor.


implicit class CoField2Ftor[U,T](vu: CoField[U,T]) 
    extends CoField_Ftor[T] {
  
  final def map[V](f: V => U): CoField[V,T] = 
     super.map(vu)(f) 
}

Evaluation
Let's consider a field of function values FuncD of two dimensions: v(x,y) = f1(x,y).i + f2(x,y).j. The vector field VField is defined as a list of two function values.


type DVector = Array[Double]
type FuncD = Function1[DVector, Double]
type VFieldD = VField[FuncD]

The vector is computed by applying the vector field to a specific point P(1.5, 2.0). The functor is applied to the vector field, vfield, to generate a new vector field, vField2.


val f1: FuncD = (x: DVector) => x(0)*x(1)
val f2: FuncD = (x: DVector) => -2.0*x(1)*x(1)
  
val vfield: VFieldD = List[FuncD](f1, f2)
val value: DVector = Array[Double](1.5, 2.0)  
val vField2: VFieldD = vfield.map(f => (x: DVector) => 4.0*f(x))


A co-vector field, coField, is computed as the sum of the fields (function values) (lines 1, 2). Next, we compute the product of the co-vector field and the vector field (scalar field product) (line 6): we simply apply the co-vector coField (a linear map) to the vector field. Once defined, the morphism _morphism is used to generate a new co-vector field, coField2, through the contravariant functor CoField2Ftor.map (line 10).

Finally, a new product, newProduct, is generated by applying the new co-vector field coField2 to the original vector field vfield (line 12).


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val coField: CoField[VFieldD, FuncD] = 
    (vf: VFieldD) => (x: DVector) => vf(0)(x) + vf(1)(x)

val contraFtor: CoField2Ftor[VFieldD, FuncD] = coField
  
val product = coField(vfield)
val _morphism: VFieldD => VFieldD = 
   (vf: VFieldD) => vf.map(f => (x: DVector) => 3.0*f(x))

val coField2 = contraFtor.map( _morphism )
   
val newProduct: FuncD = coField2(vfield)


Environment
Scala 2.11.8
JDK 1.8

References

  • Tensor Analysis on Manifolds - R. Bishop, S. Goldberg - Dover publication 1980
  • Differential Geometric Structures - W. Poor - Dover publications 2007
  • Functors and Natural Transformations - A. Tarlecki - Category Theory 2014

Wednesday, May 2, 2018

Multivariate Kernel Density Estimation in Spark

Kernel Density Estimation (KDE) is a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset. At its core, KDE is a smooth approximation of a histogram. 
For a set of observations y, and given a kernel function K and a bandwidth, the estimation of the density function f can be expressed as follows.
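A standard univariate formulation, for n observations yi, a kernel K and a bandwidth h, is
\[\hat{f}_{h}(y)=\frac{1}{n\,h}\sum_{i=1}^{n}K\left(\frac{y-y_{i}}{h}\right)\]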


The implementation of the kernel density estimation in the current version of the Apache Spark MLlib library, 2.3.1, org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or dataset, putting the computation load on the Spark driver.
Here is an example of an application of KDE using Apache Spark MLlib 2.3.1:


  val data = Seq[Double](1.0, 5.6)
  val sample = sparkSession.sparkContext.parallelize(data)
  val kd = new KernelDensity()
    .setSample(sample)
    .setBandwidth(3.0)
  val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver.

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution, although it can easily be extended to other kernel functions.
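For the Gaussian (Normal) kernel used here, with a single bandwidth h applied to each of the d dimensions, the estimate takes the following form, up to the Gaussian normalization constant:
\[\hat{f}_{h}(\mathbf{y}) \propto \frac{1}{n}\sum_{i=1}^{n}\exp\left(-\frac{1}{2}\sum_{j=1}^{d}\left(\frac{y^{j}-y_{i}^{j}}{h}\right)^{2}\right)\]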
We assume
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for the serialization of the task is provided (line 2)
The method estimate has 3 arguments:
  • trainingDS: the training dataset (line 10)
  • validationDS: the validation set (line 11)
  • bandwidth: the size of the Parzen window (line 12)
The validation set has to be broadcast to each worker node (line 14). This should not be a problem as the validation set is expected to be of reasonable size.
The training set is passed to each partition as an iterator through mapPartitions (line 17). The probability densities and the count of observations are computed through a Scala aggregate method with a zero of type (Array[Double], Long) (line 23). The sequence operator invokes the multivariate normal distribution (line 29).
The combiner (3rd argument of aggregate) relies on the BLAS level 1 vectorized operation daxpy, z <- a.x + y (line 38); BLAS routines are organized in three levels, covering vector-vector, matrix-vector and matrix-matrix operations. The vector of densities is finally rescaled with invCount using the BLAS level 1 dscal method (line 45). A short standalone illustration of these two routines follows the code block.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final class KDE(implicit sparkSession: SparkSession, 
      encoder: Encoder[Row]) {
  /**
    * Apply the trained KDE to a set of validation data
    * @param trainingDS  Training data set
    * @param validationDS Validation data set
    * @return Datasets of probability densities
    */
  def estimate(
    trainingDS: Dataset[Obs], 
    validationDS: Dataset[Obs], 
    bandwidth: Double = 1.0): Dataset[Double] = {
    import math._, sparkSession.implicits._
    val validation_brdcast = sparkSession.sparkContext
            .broadcast[Array[Obs]](validationDS.collect)

    trainingDS.mapPartitions((iter: Iterator[Obs]) => {
      val seqObs = iter.toArray
      val scale = 0.5 * seqObs.size * log(2 * Pi)
      val validation = validation_brdcast.value

      val (densities, count) = seqObs.aggregate(
        (new Array[Double](validation.length), 0L) ) (
        {        // seqOp (U, T) => U
          case ((x, z), y) => {
            var i = 0
            while (i < validation.length) {   
               // Call the pdf function for the normal distribution
              x(i) += multiNorm(y, bandwidth, scale, validation(i))
              i += 1
            }
            (x, z + 1)  // Update the density accumulator and the observation count
          }
        },
        {         // combOp: (U, U) => U
          case ((u, z), (v, t)) => { 
                // Combiner calls vectorization z <- a.x + y
            blas.daxpy(validation.length, 1.0, v, 1, u, 1)
            (u, z + t)
          }
        }
      )

      val invCount: Double = 1.0 / count
      blas.dscal(validation.length, invCount, densities, 1)  
          // Rescale the density using LINPACK z <- a.x
      densities.iterator
    })
  }
}
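The blas handle used at lines 38 and 45 is assumed to be the netlib-java instance (com.github.fommil.netlib.BLAS) that Spark MLlib bundles. Here is a minimal standalone illustration of the two level 1 routines:

import com.github.fommil.netlib.{BLAS => NetlibBLAS}

val blas = NetlibBLAS.getInstance       // same kind of handle as the one used in the KDE class

val u = Array(1.0, 2.0, 3.0)            // accumulated densities (y argument of daxpy)
val v = Array(0.5, 0.5, 0.5)            // densities computed by another partition (x argument)

blas.daxpy(u.length, 1.0, v, 1, u, 1)   // u <- 1.0*v + u   => Array(1.5, 2.5, 3.5)
blas.dscal(u.length, 0.5, u, 1)         // u <- 0.5*u       => Array(0.75, 1.25, 1.75)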

The companion singleton is used to define the multivariate normal distribution (lines 5, 6). The type of the observations (features) is Array[Double].

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
final object KDE {
  import math._
  type Obs = Array[Double]

  @throws(classOf[IllegalArgumentException])
  def multiNorm(
      means: Obs, 
      bandWidth: Double, 
      scale: Double, 
      x: Obs): Double = {
    require(x.length == means.length, 
        "Dimension of means and observations differs")

    exp(
      -scale - (0 until means.length).map(n => {
        val sx = (means(n) - x(n)) / bandWidth
        0.5 * sx * sx
      }).sum
    )
  }
}

Driver
This simple application requires the Spark context (SparkSession) to be defined, as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits._.
The training set is a sequence of key-value pairs (lines 6-12). The validation set is synthetically generated by multiplying the values of the training set by 2.0 (lines 14, 15).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
implicit val sparkSession: SparkSession =    
       confToSessionFromFile(LocalSparkConf)
implicit val encoder = Encoders.kryo[Row]
import sparkSession.implicits._

val trainingData = Seq[(String, Array[Double])](
   ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
   ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
   ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
   ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
   ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
  ).toDS

  val validationData = trainingData
      .map { case (key, values) => values.map(_ *2.0) }

  val kde = new KDE
  val result = kde.estimate(trainingData.map(_._2),validationData)

  println(s"result: ${result.collect.mkString(", ")}")

  sparkSession.close


Environment
Scala: 2.11.8
Java JDK 1.8
Apache Spark 2.3.1
OpenBLAS 0.3.4

References