Sunday, June 9, 2019

Multivariate Normal sampling with Scala and ND4j

This post describes an implementation of multivariate normal sampling using ND4j. The multivariate normal sampling function is used in various machine learning techniques such as Markov chain Monte Carlo (MCMC) or contextual multi-armed bandits.
The implementation of the multivariate normal relies on data vectorization, a technique well known to Python developers and data scientists alike, which we introduce in the first section.

Vectorization
Python's NumPy is a well-known and reliable vectorized linear algebra library, and a foundation of scientific (SciPy) and machine learning (scikit-learn) libraries. Few serious data science projects can reasonably succeed without the power and efficiency of NumPy.
The vectorization of datasets is the main reason behind the performance of machine learning models (training and prediction) built in Python.

Is there a similar linear algebra library, supporting vectorization, available to Scala and Spark developers? Yes: ND4j.


ND4j, BLAS and LAPACK
The ND4j library replicates the functionality of NumPy for Java developers. ND4j can be downloaded as an independent library or as a component of the deep learning library deeplearning4j. It leverages the BLAS and LAPACK libraries.
From a Java developer perspective, the data represented by an NDArray is stored outside of the Java Virtual Machine. This design has the following benefits:
  • Avoids taxing the garbage collector
  • Interoperability with high-performance BLAS libraries such as OpenBLAS
  • Allows the number of array elements to exceed Int.MaxValue
BLAS (Basic Linear Algebra Subprograms) is a set of functions performing basic vector and matrix operations. The library is divided into three levels:
  • Level 1 BLAS performs scalar, vector and vector-vector operations
  • Level 2 BLAS performs matrix-vector operations
  • Level 3 BLAS performs matrix-matrix operations
OpenBLAS is an optimized BLAS library based on GotoBLAS2, a C library for linear algebra supporting a large variety of microprocessors. Its usage is governed by the BSD license.
LAPACK is a collection of Fortran routines for solving systems of simultaneous linear equations, least-squares solutions of linear systems, eigenvalue problems, singular value problems and matrix factorizations.
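As an illustration, the three BLAS levels map naturally onto ND4j's INDArray operations; the following sketch uses arbitrary values and shapes (not part of the original post):

import org.nd4j.linalg.api.ndarray.INDArray
import org.nd4j.linalg.factory.Nd4j

val x: INDArray = Nd4j.create(Array[Double](1.0, 2.0, 3.0))   // 1x3 vector
val y: INDArray = Nd4j.create(Array[Double](0.5, 0.5, 0.5))   // 1x3 vector
val a: INDArray = Nd4j.create(Array[Double](1.0, 0.0, 2.0, 1.0, 3.0, 1.0), Array[Int](2, 3))  // 2x3 matrix
val b: INDArray = Nd4j.create(Array[Double](1.0, 2.0, 0.0, 1.0, 1.0, 1.0), Array[Int](3, 2))  // 3x2 matrix

val level1: INDArray = x.mul(2.0).add(y)      // vector-vector operation: a.x + y
val level2: INDArray = a.mmul(x.transpose())  // matrix-vector product:   A.x
val level3: INDArray = a.mmul(b)              // matrix-matrix product:   A.B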

Implicit ND4j array conversion
The first step is to create an implicit conversion between ND4j and Scala data types.
The following code converts an array of doubles into an INDArray using the org.nd4j.linalg.factory.Nd4j Java class and its factory method create(double[] values, int[] shape).
  • In the case of a vector, the shape is defined in Scala as Array[Int](size_vector)
  • In the case of a matrix, the shape is defined as Array[Int](numRows, numCols)
The following snippet implements a very simple conversion from a Scala array to an INDArray.

import org.nd4j.linalg.api.ndarray.INDArray
import org.nd4j.linalg.factory.Nd4j

@throws(clazz = classOf[IllegalArgumentException])
implicit def double2NDArray(values: Array[Double]): INDArray = {
  require(values.nonEmpty, "ERROR: ND4j conversion ...")

  Nd4j.create(values, Array[Int](1, values.length))
}
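With the implicit conversion in scope, a plain Scala array can be used wherever an INDArray is expected, as in this trivial sketch:

val v: INDArray = Array[Double](1.0, 2.0, 3.0)   // implicitly converted to a 1x3 INDArray
println(v.shape.mkString("x"))                   // prints the shape of the array, here 1x3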

Multivariate Normal distribution implementation
The sampling of a multivariate normal distribution is defined by the formula 
\[\mathbb{N}(\mu, \Sigma )=\frac{1}{\sqrt{(2\pi)^n \left | \Sigma \right |}} e^{-\frac{1}{2}(x-\mu)^T {\Sigma'}^{-1}(x-\mu)}\] A generalized definition adds a very small random perturbation factor r, |r| <= 1, to the variance values (the diagonal of the covariance matrix): \[\Sigma ' = \Sigma + r.I\] Sigma is the covariance matrix and mu is the mean vector. 
The multivariate normal sampling can be computed using the Cholesky decomposition. In a nutshell, the Cholesky algorithm decomposes a positive-definite matrix into the product of two matrices:
  • a lower triangular matrix
  • the transpose of its conjugate
It serves the same purpose as the ubiquitous LU decomposition with less computation. \[\mathbb{N}(\mu, \Sigma) \sim \mu + Chol(\Sigma).Z^T\] where \[L=Chol(\Sigma)\] and \[L.L^T=\Sigma\]. The vector Z is multivariate normal noise \[Z= \{ z_i | z_i=N(0, 1)\}\]
The following implementation relies on the direct invocation of the LAPACK function potrf. The LAPACK functionality is accessed through the BLAS wrapper getBlasWrapper.

 final def choleskyDecomposition(matrix: INDArray): INDArray = {
   val matrixDup = matrix.dup
   Nd4j.getBlasWrapper.lapack.potrf(matrixDup, false)
   matrixDup
 }

Note that the matrix is duplicated prior to the LAPACK function call as we do not want to alter the input matrix. 
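Here is a minimal usage sketch with an arbitrary 2x2 positive-definite matrix. Keep in mind that LAPACK's potrf overwrites only one triangle of the duplicated matrix (which triangle depends on the boolean flag), so the opposite triangle still holds the original entries:

// Arbitrary positive-definite covariance matrix, for illustration only
val sigma: INDArray = Nd4j.create(
  Array[Double](4.0, 1.2, 1.2, 2.5), Array[Int](2, 2))

val factor: INDArray = choleskyDecomposition(sigma)
println(factor)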
Let's implement the multivariate Normal sampler with perturbation using the Cholesky decomposition.


@throws(clazz = classOf[IllegalArgumentException])
@throws(clazz = classOf[IllegalStateException])
final def multiVariateNormalSampling(
   mean: INDArray,
   cov: INDArray,
   perturbation: Double = 0.0): INDArray = {
 import scala.util.Random
 require(cov.size(0) == mean.length, s"Sigma shape ${cov.size(0)} should be mean size ${mean.length}")
 require(cov.size(1) == mean.length, s"Sigma shape ${cov.size(1)} should be ${mean.length}")

 try {
  // Add a perturbation epsilon value to the covariance matrix
  // if it is defined
  val perturbMatrix =
   if(perturbation > 0.0)
    cov.add( squareIdentityMatrix(cov.size(0), 
             perturbation*(Random.nextDouble-0.5)))
   else
    cov

  // Apply the Cholesky decomposition
  val perturbed: INDArray = choleskyDecomposition(perturbMatrix)
   // Instantiate a normal distribution
  val normalPdf = new NormalDistribution(
       new DefaultRandom, 
       0.0, 
       1.0)

  val normalDensity = Array.fill(mean.size(0))(normalPdf.sample)
  val normalNDArray: INDArray = normalDensity

   // This is an implementation of the Dot product
  val normalCov = perturbed.mmul(normalNDArray.transpose)
   // Add the normalized perturbed covariance to the mean value
  mean.add(normalCov)
 }
 catch {
  case e: org.nd4j.linalg.api.blas.BlasException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: org.apache.commons.math3.exception.NotStrictlyPositiveException =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
  case e: Throwable =>
   throw new IllegalStateException(s"multiVariateNormalSampling: ${e.toString}")
 }
}

Let's look at the full implementation of the multivariate normal sampling.
The first step validates the shape of the mean and covariance input matrices [line 8, 9]. As mentioned earlier, the generalized normal distribution introduces an optional random perturbation of small magnitude (~1e-4) [line 14-17], which is useful for applications that require some stochasticity.
The 'perturbed' covariance matrix is factorized using the Cholesky decomposition [line 22]. The normal probability density function (mean 0.0 and standard deviation 1.0) is used to generate random values [line 24-30], which are then applied to the covariance matrix [line 33].
The normally randomized variance is added to the vector of mean values [line 35].

For the sake of convenience, the normal density function relies on the Apache Commons Math 3.5 API [line 24].
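As a usage sketch (arbitrary values), the mean is shaped as a column vector so that its first dimension matches the dimension of the covariance matrix:

// 2-dimensional mean (column vector) and covariance matrix, arbitrary values
val mean: INDArray = Nd4j.create(Array[Double](0.5, 1.0), Array[Int](2, 1))
val cov: INDArray = Nd4j.create(
  Array[Double](1.0, 0.2, 0.2, 0.8), Array[Int](2, 2))

val sample: INDArray = multiVariateNormalSampling(mean, cov, 1e-4)
println(sample)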

Environment
Scala 2.13.2
JDK 1.8

References
Scala 2.13
ND4j
Deeplearning4j
Cholesky decomposition
Approximation multivariate Normal sampling
Apache Commons Math

Wednesday, November 21, 2018

Integration of 3rd party service with Spark

Apache Spark is a commonly used framework to distribute large-scale computational tasks. Some of these tasks may involve accessing external (3rd party) remote services such as NLP processing, deep learning model training or image classification. Moreover, these services, accessible through a REST API for instance, may not be easily reconfigurable.
A remote service is usually deployed as clusters of nodes (or instances) to improve scalability and guarantee high availability. The question becomes, what is the most efficient approach to generate and process requests to a cluster of services?



This post evaluates the performance of the execution of a remote service, deployed on AWS with multiple nodes, as part of an Apache Spark application. We compare two approaches to integrate Spark workers with the 3rd party service nodes:
  • a load balancer
  • hash partitioning of the IP addresses of the service nodes (one or more service nodes are assigned to a given partition)

Load balancer
Load balancers are commonly used for routing requests to web services according to a policy such as CPU load, average processing time or downtime. They originally gained acceptance in the late '90s with the explosion of web servers.
A load balancer is a very simple and easily deployable solution that is self-contained and does not involve any architecture or coding changes.


In a typical Apache Spark deployment, the context splits datasets into partitions. Each partition pre-processes data to generate the request to the service, then initiates the connection to the service cluster through the load balancer.



The data processing steps are
  1. The master splits the input data over a given set of partitions
  2. Worker nodes pre-process and cleanse data if necessary
  3. Requests are dynamically generated
  4. Each partition establishes and manages the connection to the load balancer
  5. Finally, worker nodes process the response and its payload
Load balancers provide an array of features such as throttling, persistent sessions, or stateful packet inspection that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with the core concept of big data: data partitioning. 

Let's consider an alternative approach: assigning (mapping) one or more of the service cluster nodes to each partition.


Partitioning service nodes
The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to a partition. Spark supports several mechanisms to distribute functions across partitions:
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses for the service nodes, as illustrated in the following block diagram.



The data pipeline is somewhat similar to the one relying on the load balancer
  1. The master splits the input data over a given set of partitions
  2. The IP addresses of all service nodes are hashed and assigned to each partition
  3. Worker nodes pre-process and cleanse data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establishes and manages the connection to a subset of service nodes
  6. Finally, worker nodes process the response and its payload
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element (lines 2, 3) and used as a seed for the random selection of the service node (lines 5, 6). The last step consists of building the request, establishing the connection to the target service node and processing the response (lines 9-11).


 def hashedEndPoints(input: Input, timeout: Int, ips: Seq[String]): Output = {
    val hashedCode = input.hashCode + System.currentTimeMillis
    val seed = (if (hashedCode < 0) -hashedCode 
         else hashedCode)
    val random = new scala.util.Random(seed)
    val serverHash = random.nextInt(ips.length)
    val targetIp = ips(serverHash)

    val url = s"http://${targetIp}:80/endpoint"
    val httpConnector = HttpConnector(url, timeout)
     // Execute the request through httpConnector and return a response of type Output
   }

The function hashedEndPoints, executed within each partition, is invoked from the master:


def process(
 inputs: Dataset[Input],
 timeout: Int,
 serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
 import sparkSession.implicits._

 inputs.map(
  input => 
     if (serverAddresses.nonEmpty) 
         hashedEndPoints(input, timeout, serverAddresses)
     else
         throw new IllegalStateException("error ...")
   )
}

Besides ensuring consistency and avoiding the addition of an external component (load balancer) to the architecture, the direct assignment of service nodes to the Spark partitions has some performance benefits.
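For completeness, here is a possible driver-side invocation of process. This is a sketch: the Input and Output case classes, the application name, the IP addresses and the dataset source are illustrative assumptions, not part of the original post.

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical request/response types
case class Input(text: String)
case class Output(label: String, score: Double)

implicit val sparkSession: SparkSession = SparkSession.builder
    .appName("ServicePartitioning")
    .getOrCreate()
import sparkSession.implicits._

val serverAddresses = Seq("10.0.0.12", "10.0.0.13", "10.0.0.14")
val inputs: Dataset[Input] = Seq(Input("doc-1"), Input("doc-2")).toDS
val outputs: Dataset[Output] = process(inputs, timeout = 5000, serverAddresses)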

Performance evaluation
Let's evaluate the latency for processing a corpus of text through an NLP algorithm deployed over a variable number of worker nodes. The following chart plots the average latency for a given NLP engine to process documents, with a variable number of nodes.



Hash partitioning does not improve performance significantly in the case of a small number of nodes. However, it outperforms the traditional load balancing deployment by as much as 20% for a larger number of nodes (or instances).

Environment
Scala 2.11.8
JDK 1.8
Apache Spark 2.3.2
AWS EMR 5.19

References
Introduction to Load Balancing
Spark Partitions and Partitioning

Saturday, July 7, 2018

Covariant and Contravariant Functors in Scala

Scala is a first-class functional programming language which supports, among other FP concepts, higher-kinded types, functors and monads. This post illustrates the capability of Scala to leverage covariant and contravariant functors for tensor analysis, and for implementing vectors and co-vectors.

Overview

Most Scala developers have some experience with the core tenets of functional programming: monads, functors and applicatives. Those concepts are not specific to Scala or even functional programming at large. They are elements of a field of mathematics known as category theory, which has its roots in algebraic topology.

Differential geometry or differential topology makes heavy use of tensors that leverage covariant and contravariant functors.
This post introduces the concepts of

  • Contravariant functors applied to co-vectors and differential forms
  • Projection of higher-kinded types

Vector fields 101
Let's consider a 3-dimensional Euclidean space with basis vectors {e_i} and a vector field V = (f1, f2, f3) [Note: we follow the Einstein tensor indices convention].

The vector field at the point P(x,y,z) is the tuple (f1(x,y,z), f2(x,y,z), f3(x,y,z)).
A vector field over a k-dimensional space can be formally, mathematically defined as
\[f: \mathbf{x} \in \mathbb{R}^{k} \mapsto \mathbb{R}^{k} \\ f(\mathbf{x})=\sum_{i=1}^{k}{f^{i}}(\mathbf{x})\,\mathbf{e}_{i}\] Example: \[f(x,y,z) = (2x+z^{3})\boldsymbol{\mathbf{\overrightarrow{i}}} + (xy+e^{-y}-z^{2})\boldsymbol{\mathbf{\overrightarrow{j}}} + \frac{x^{3}}{y}\boldsymbol{\mathbf{\overrightarrow{k}}}\]
Now, let's consider the same vector field V with a second reference frame (origin O' and basis vectors e'_i)

\[f(\mathbf{x})=\sum_{i=1}^{k}{f'^{i}}(\mathbf{x})\,\mathbf{e'}_{i}\]
The transformation matrix S_ij converts the coordinate functions f^i into f'^i. The tuple f = (f_i), with lower indices, is the co-vector field associated with the vector field V
\[S_{ij}: \begin{Vmatrix} f^{1} \\ f^{2} \\ f^{3} \end{Vmatrix} \mapsto \begin{Vmatrix} f'^{1} \\ f'^{2} \\ f'^{3} \end{Vmatrix}\]
The scalar product of the co-vector f' and the vector v(f) is defined as
\[< f',v> = \sum f'_{i}.f^{i}\]
Given the scalar product we can define the co-vector field f' as a linear map
\[\alpha (v) = \left \langle f',v \right \rangle \;\;\;\; (1) \]
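For numeric components, the scalar product reduces to a sum of pairwise products, as in this small sketch (arbitrary values, independent of the functor machinery introduced below):

// Components of a co-vector f' and of a vector v in the same basis
val fPrime = Array(1.0, -2.0, 0.5)
val v      = Array(3.0,  1.0, 4.0)

// <f', v> = sum_i f'_i * f^i = 3.0 - 2.0 + 2.0 = 3.0
val alpha = fPrime.zip(v).map { case (a, b) => a * b }.sum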
Covariant functors

I assume the reader has a basic understanding of functors and monads. Here is a short overview:

A category C is composed of objects x and morphisms f defined as
\[C= \{ {x_{i}, f \in C | f: x_{i} \mapsto x_{j}} \}\] A functor F is a map between two categories C and D that preserves the mapping.
\[x\in C \Rightarrow F(x)\in D \\ x, y\in C, \; f: x \mapsto y \Rightarrow F(f): F(x)\mapsto F(y)\]
Let's look at the definition of a functor in Scala with the "preserving" mapping method, map

trait Functor[M[_]] {
  def map[U, V](m: M[U])(f: U => V): M[V]
}

Let's define the functor for a vector (or tensor) field. A vector field is defined as a sequence or list of fields (i.e. values or function values).

type VField[U] = List[U]

trait VField_Ftor extends Functor[VField] {
  override def map[U, V](vu: VField[U])(f: U => V): VField[V] = vu.map(f) 
}

This particular implementation relies on the fact that List is a category with its own functor. The next step is to define the implicit class conversion VField[U] => Functor[VField[U]] so the map method is automatically invoked for each VField instance.


implicit class vField2Functor[U](vu: VField[U]) 
    extends VField_Ftor {  
    
  final def map[V](f: U => V): VField[V] = 
     super.map(vu)(f) 
}

By default Covariant Functors (which preserve mapping) are known simply as Functors. Let's look at the case of Covector fields.

Contravariant functors

A Contravariant functor is a map between two categories that reverses the mapping of morphisms.
\[x, y\in C, \; f: x \mapsto y \Rightarrow F(f): F(y)\mapsto F(x)\]


trait CoFunctor[M[_]] {
  def map[U, V](m: M[U])(f: V => U): M[V]
}

The map method of the CoFunctor implements the relation (V => U) => (M[U] => M[V]).
Let's implement a co-vector field using a contravariant functor. The definition (1) describes a linear map, an element of the dual space V*: V => T, from a vector field V over a field X to a scalar.
A morphism on the category V* consists of a morphism V => T, or V => _, where V is a vector field and T (or _) is a scalar function value.


type CoField[V, T] = Function1[V, T]


The co-vector field type, CoField, is parameterized on the vector field type V, which is an input (function parameter). Therefore the functor has to be contravariant.

The higher-kinded type M[_] takes a single type as parameter (i.e. M[V]) but a co-vector field requires two types:
  • V: the vector field type
  • T: the scalar type that is the result of the inner product <.>

Fortunately the contravariant functor CoField_Ftor associated with the co-vector needs to be parameterized only with the vector field type V. The solution is to pre-define (or 'fix') the scalar type T using a type projection (type lambda) L[X] = CoField[X, T]:
T => ({type L[X] = CoField[X,T]})#L


trait CoField_Ftor[T] 
  extends CoFunctor[({type L[X] = CoField[X,T]})#L ] {

  override def map[U,V](
    vu: CoField[U,T]
  )(f: V => U): CoField[V,T] = 
   (v: V) => vu(f(v))
}

As you can see, the morphism over the type V on the category CoField is defined as f: V => U instead of f: U => V. A kind parameterized on the return type (Function1) would require the 'default' (covariant) functor. Once again, we define an implicit class to convert a co-vector field of type CoField to its functor, CoField2Ftor.


implicit class CoField2Ftor[U,T](vu: CoField[U,T]) 
    extends CoField_Ftor[T] {
  
  final def map[V](f: V => U): CoField[V,T] = 
     super.map(vu)(f) 
}

Evaluation
Let's consider a field of function values FuncD in two dimensions: v(x,y) = f1(x,y).i + f2(x,y).j. The vector field VFieldD is defined as a list of two function values.


type DVector = Array[Double]
type FuncD = Function1[DVector, Double]
type VFieldD = VField[FuncD]

The vector is computed by evaluating the vector field at a specific point P(1.5, 2.0). The functor is applied to the vector field vfield to generate a new vector field vField2.


val f1: FuncD = (x: DVector) => x(0)*x(1)
val f2: FuncD = (x: DVector) => -2.0*x(1)*x(1)
  
val vfield: VFieldD = List[FuncD](f1, f2)
val value: DVector = Array[Double](1.5, 2.0)  
val vField2: VFieldD = vfield.map(f => (x: DVector) => 4.0*f(x))


A co-vector field, coField, is computed as the sum of the fields (function values) (lines 1, 2). Next, we compute the product of the co-vector field and the vector field (scalar field product) (line 6): we simply apply the co-vector coField (a linear map) to the vector field. Once defined, the morphism _morphism is used to generate a new co-vector field coField2 through the contravariant functor CoField2Ftor.map (line 10).

Finally, a new product, newProduct, is generated by applying the new co-vector field coField2, the composition of the original co-vector field coField with the morphism _morphism, to the vector field vfield (line 12).


val coField: CoField[VFieldD, FuncD] = 
    (vf: VFieldD) => (x: DVector) => vf(0)(x) + vf(1)(x)

val contraFtor: CoField2Ftor[VFieldD, FuncD] = coField
  
val product = coField(vfield)
val _morphism: VFieldD => VFieldD = 
   (vf: VFieldD) => vf.map(f => (x: DVector) => 3.0*f(x))

val coField2 = contraFtor.map( _morphism )
   
val newProduct: FuncD = coField2(vfield)


Environment
Scala 2.11.8
JDK 1.8

References

  • Tensor Analysis on Manifolds - R. Bishop, S. Goldberg - Dover publication 1980
  • Differential Geometric Structures - W. Poor - Dover publications 2007
  • Functors and Natural Transformations - A. Tarlecki - Category Theory 2014

Wednesday, May 2, 2018

Multivariate Kernel Density Estimation in Spark

Kernel Density Estimation (KDE) is a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset. At its core, KDE is a smooth approximation of a histogram. 
For a set of observations y, and given a kernel function K and a bandwidth h, the estimate of the density function f can be expressed as
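\[\widehat{f}_{h}(x)=\frac{1}{n\,h}\sum_{i=1}^{n}K\left ( \frac{x-y_{i}}{h} \right )\]
where n is the number of observations y(i) and h is the bandwidth.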


The implementation of the kernel density estimation in the current version of the Apache Spark MLlib library, 2.3.1, org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or dataset, putting computation load on the Spark driver.
An example of application of KDE using Apache Spark MLlib 2.3.1 ...


  import org.apache.spark.mllib.stat.KernelDensity

  // data: Seq[Double] of univariate observations
  val sample = sparkSession.sparkContext.parallelize(data)
  val kd = new KernelDensity()
    .setSample(sample)
    .setBandwidth(3.0)
  val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver.

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution kernel, although it can easily be extended to other kernel functions.
We assume
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for serialization of the task is provided (line 1)
The method estimate has 3 arguments
  • trainingDS: the training dataset (line 9)
  • validationDS: the validation dataset (line 10)
  • bandwidth: the size of the Parzen window
The validation set has to be broadcast to each worker node (line 14). This should not be a problem as the validation set is expected to be of reasonable size.
The training set is passed to each partition as an iterator through mapPartitions (line 17). The probability densities and count are computed through a Scala aggregate method with a zero of type (Array[Double], Long) (line 23). The sequence operator invokes the multivariate normal distribution (line 29).
The combiner (third argument of the aggregate) relies on the BLAS vectorized operation z <- a.x + y, daxpy (line 38). The BLAS library is organized in three levels (vector, matrix-vector and matrix-matrix operations).
The vector of densities is scaled with invCount using the dscal BLAS level 1 function (line 45).

final class KDE(implicit sparkSession: SparkSession, 
      encoder: Encoder[Row]) {
  /**
    * Apply the trained KDE to a set of validation data
    * @param trainingDS  Training data set
    * @param validationDS Validation data set
    * @return Dataset of probability densities
    */
  def estimate(
    trainingDS: Dataset[Obs], 
    validationDS: Dataset[Obs], 
    bandwidth: Double = 1.0): Dataset[Double] = {
    import math._, sparkSession.implicits._
    val validation_brdcast = sparkSession.sparkContext
            .broadcast[Array[Obs]](validationDS.collect)

    trainingDS.mapPartitions((iter: Iterator[Obs]) => {
      val seqObs = iter.toArray
      val scale = 0.5 * seqObs.size * log(2 * Pi)
      val validation = validation_brdcast.value

      val (densities, count) = seqObs.aggregate(
        (new Array[Double](validation.length), 0L) ) (
        {        // seqOp (U, T) => U
          case ((x, z), y) => {
            var i = 0
            while (i < validation.length) {   
               // Call the pdf function for the normal distribution
              x(i) += multiNorm(y, bandwidth, scale, validation(i))
              i += 1
            }
            (x, z + 1)// Update  count & validation values
          }
        },
        {         // combOp: (U, U) => U
          case ((u, z), (v, t)) => { 
                // Combiner calls vectorization z <- a.x + y
            blas.daxpy(validation.length, 1.0, v, 1, u, 1)
            (u, z + t)
          }
        }
      )

      val invCount: Double = 1.0 / count
      blas.dscal(validation.length, invCount, densities, 1)  
          // Rescale the density using the BLAS level 1 routine dscal: x <- a.x
      densities.iterator
    })
  }
}
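The blas reference is not shown with its import; it is presumably the netlib-java wrapper bundled with Spark MLlib. A plausible import (an assumption, not part of the original listing):

// Assumed netlib-java binding providing the level 1 routines daxpy and dscal
import com.github.fommil.netlib.BLAS.{getInstance => blas}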

The companion singleton is used to define the multivariate normal density (line 5). The type of an observation (features) is Array[Double].

final object KDE {
  import math._
  type Obs = Array[Double]

  @throws(classOf[IllegalArgumentException])
  def multiNorm(
      means: Obs, 
      bandWidth: Double, 
      scale: Double, 
      x: Obs): Double = {
    require(x.length == means.length, 
        "Dimension of means and observations differs")

    exp(
      -scale - (0 until means.length).map(n => {
        val sx = (means(n) - x(n)) / bandWidth
        0.5 * sx * sx
      }).sum
    )
  }
}
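A quick sanity check of multiNorm outside Spark (a sketch with arbitrary values; the scale argument mimics the 0.5*n*log(2*Pi) term computed in estimate, here with n = 1):

import math._

val y     = Array[Double](1.0, 0.5)      // a single training observation
val x     = Array[Double](0.9, 0.4)      // a validation point
val scale = 0.5 * log(2 * Pi)

val density = KDE.multiNorm(y, bandWidth = 1.0, scale = scale, x = x)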

Driver
This simple application requires the Spark context (SparkSession) to be defined, as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits._.
The training set is a sequence of key-value pairs (lines 3-14). The validation set is synthetically generated by multiplying the values in the training set by 2.0 (line 17).

implicit val sparkSession: SparkSession =    
       confToSessionFromFile(LocalSparkConf)
implicit val encoder = Encoders.kryo[Row]
import sparkSession.implicits._

val trainingData = Seq[(String, Array[Double])](
   ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
   ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
   ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
   ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
   ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
  ).toDS

  val validationData = trainingData
      .map { case (key, values) => values.map(_ *2.0) }

  val kde = new KDE
  val result = kde.estimate(trainingData.map(_._2),validationData)

  println(s"result: ${result.collect.mkString(", ")}")

  sparkSession.close



Environment
Scala: 2.11.8
Java JDK 1.8
Apache Spark 2.3.1
OpenBLAS 0.3.4

References