Target audience: Intermediate
Estimated reading time: 20 minutes
Kernel Density Estimation (KDE) is a very powerful, non-parametric method to extract an empirical, continuous probability density function from a dataset. At its core, KDE is a smooth approximation of a histogram.
For a set of n observations y_i, a kernel function K, and a bandwidth h, the estimate of the density function f can be expressed as:

$$\hat{f}_h(x) = \frac{1}{nh}\sum_{i=1}^{n} K\!\left(\frac{x - y_i}{h}\right)$$
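As a quick illustration, here is a minimal univariate sketch of this estimator with a Gaussian kernel; the helper names and sample values are ours, for illustration only:

import math._

// Gaussian kernel: K(u) = exp(-u*u/2) / sqrt(2*Pi)
def gaussian(u: Double): Double = exp(-0.5 * u * u) / sqrt(2 * Pi)

// Density estimate at x for observations ys and bandwidth h:
// f(x) = 1/(n*h) * sum_i K((x - y_i)/h)
def kde(ys: Seq[Double], h: Double)(x: Double): Double =
  ys.map(y => gaussian((x - y) / h)).sum / (ys.size * h)

val f = kde(Seq(1.0, 2.0, 2.5, 4.0), 0.5) _
println(f(2.0)) // smooth density estimate at x = 2.0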
This post addresses the limitations of the current implementation of KDE in Apache Spark for multivariate features.
Note: This post requires basic knowledge of the Apache Spark MLlib framework and an understanding of statistics and/or machine learning.
Background
Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
The Apache Spark ecosystem includes a machine learning library, MLlib.
The implementation of kernel density estimation in the current version of the Apache Spark MLlib library (2.3.1), org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
- It is a univariate estimation
- The estimation is performed on a sequence of observations, not an RDD or a dataset, putting the computation load on the Spark driver, as shown in the snippet below.
import org.apache.spark.mllib.stat.KernelDensity

val data = Seq[Double](1.0, 5.6)
val sample = sparkSession.sparkContext.parallelize(data)
val kd = new KernelDensity()
  .setSample(sample)
  .setBandwidth(3.0)
val densities = kd.estimate(Array(-2.0, 5.0))
The method setSample specifies the training set, but the KDE is actually trained only when the method estimate is invoked, on the driver.
Multivariate KDE
The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset.
This implementation is restricted to the Normal (Gaussian) kernel, although it can easily be extended to other kernel functions, as sketched below.
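For instance, replacing the Gaussian kernel in the sequence operator with a product-form Epanechnikov kernel would only require a function such as the following sketch; the helper names are ours and not part of the implementation:

import math.abs

// Epanechnikov kernel: K(u) = 0.75*(1 - u*u) for |u| <= 1, 0 otherwise
def epanechnikov(u: Double): Double =
  if (abs(u) <= 1.0) 0.75 * (1.0 - u * u) else 0.0

// Product (separable) kernel across the feature dimensions
def multiEpanechnikov(means: Array[Double], bandwidth: Double, x: Array[Double]): Double =
  (0 until means.length).map(n => epanechnikov((means(n) - x(n)) / bandwidth)).product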
We assume that:
- The reference to the current Spark session is implicit (line 1)
- The encoding of a Row for serialization of the task is provided (line 1)
- trainingDS is the training dataset (line 9)
- validationDS is the validation dataset (line 10)
- bandwidth is the size of the Parzen window (line 11)
The training set is passed to each partition as an iterator through mapPartitions (line 17). The probability densities and the observation count are computed through a Scala aggregate method with a zero value of type (Array[Double], Long) (line 23). The sequence operator invokes the multivariate normal distribution (line 29).
The combiner (third argument of the aggregate) relies on the BLAS level 1 vectorized routine daxpy, which computes y <- a.x + y (line 38). BLAS routines are organized in three levels: vector-vector (level 1), matrix-vector (level 2), and matrix-matrix (level 3) operations. Finally, the vector of densities is scaled with invCount using the dscal BLAS level 1 routine (line 45).
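To make the combiner and rescaling steps concrete before walking through the full listing, here is a stand-alone sketch over plain arrays, using the netlib-java BLAS wrapper that Spark MLlib depends on; the two partial arrays simulate the density sums produced by two partitions, and all values are arbitrary:

import com.github.fommil.netlib.BLAS.{getInstance => blas}

// Partial density sums computed by two partitions for 3 validation points
val u = Array(0.2, 0.4, 0.6)
val v = Array(0.1, 0.3, 0.5)

// combOp: daxpy computes u <- 1.0*v + u (BLAS level 1, vector-vector)
blas.daxpy(u.length, 1.0, v, 1, u, 1) // u is now (0.3, 0.7, 1.1)

// Rescaling: dscal computes u <- (1/count)*u
val count = 2L
blas.dscal(u.length, 1.0 / count, u, 1) // u is now (0.15, 0.35, 0.55)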
 1  final class KDE(implicit sparkSession: SparkSession, encoder: Encoder[Row]) {
 2    /**
 3     * Apply the trained KDE to a set of validation data
 4     * @param trainingDS Training dataset
 5     * @param validationDS Validation dataset
 6     * @param bandwidth Size of the Parzen window
 7     * @return Dataset of probability densities
 8     */
 9    def estimate(trainingDS: Dataset[Obs],
10        validationDS: Dataset[Obs],
11        bandwidth: Double = 1.0): Dataset[Double] = {
12      import math._, sparkSession.implicits._
13
14      val validation_brdcast = sparkSession.sparkContext
15        .broadcast[Array[Obs]](validationDS.collect)
16
17      trainingDS.mapPartitions((iter: Iterator[Obs]) => {
18        val seqObs = iter.toArray
19        val scale = 0.5 * seqObs.size * log(2 * Pi)
20        val validation = validation_brdcast.value
21
22        val (densities, count) = seqObs.aggregate(
23          (new Array[Double](validation.length), 0L)
24        )(
25          { // seqOp: (U, T) => U
26            case ((x, z), y) => {
27              var i = 0
28              while (i < validation.length) {
29                x(i) += multiNorm(y, bandwidth, scale, validation(i)) // normal pdf
30                i += 1
31              }
32              (x, z + 1) // update the densities and the observation count
33            }
34          },
35          { // combOp: (U, U) => U
36            case ((u, z), (v, t)) => {
37              // the combiner invokes the BLAS vectorization u <- 1.0*v + u
38              blas.daxpy(validation.length, 1.0, v, 1, u, 1)
39              (u, z + t)
40            }
41          }
42        )
43
44        val invCount: Double = 1.0 / count
45        blas.dscal(validation.length, invCount, densities, 1) // rescale: x <- a.x
46        densities.iterator
47      })
48    }
49  }
The companion singleton is used to define the multivariate normal distribution (line 5). The type of an observation (feature vector) is Array[Double] (line 3).
 1  final object KDE {
 2    import math._
 3    type Obs = Array[Double]
 4    @throws(classOf[IllegalArgumentException])
 5    def multiNorm(means: Obs,
 6        bandWidth: Double,
 7        scale: Double,
 8        x: Obs): Double = {
 9      require(x.length == means.length,
10        "Dimension of means and observations differs")
11
12      exp(
13        -scale - (0 until means.length).map(n => {
14          val sx = (means(n) - x(n)) / bandWidth
15          0.5 * sx * sx // squared scaled distance, subtracted in the exponent
16        }).sum
17      )
18    }
19  }
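As a quick sanity check, multiNorm can be invoked directly on a two-dimensional observation; the values below are arbitrary, and the scale term mirrors the 0.5*n*log(2*Pi) factor computed in estimate for a partition of n observations:

import math._
import KDE._

val mean = Array[Double](1.0, 0.5) // training observation
val x = Array[Double](1.2, 0.7)    // validation point
val scale = 0.5 * 1 * log(2 * Pi)  // n = 1 observation in this toy partition
val density = multiNorm(mean, 1.0, scale, x)
println(s"density: $density")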
Application
This simple application requires the Spark context (SparkSession) to be defined, as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits._.
The training set is a sequence of key-value pairs (lines 4-15). The validation set is synthetically generated by multiplying the values in the training set by 2.0 (line 17).
 1  implicit val sparkSession: SparkSession = confToSessionFromFile(LocalSparkConf)
 2  implicit val encoder = Encoders.kryo[Row]
 3  import sparkSession.implicits._
 4  val trainingData = Seq[(String, Array[Double])](
 5    ("A", Array[Double](1.0, 0.6)),
 6    ("B", Array[Double](2.0, 0.6)),
 7    ("C", Array[Double](1.5, 9.5)),
 8    ("D", Array[Double](4.5, 0.7)),
 9    ("E", Array[Double](0.4, 1.5)),
10    ("F", Array[Double](2.1, 0.6)),
11    ("G", Array[Double](0.5, 6.3)),
12    ("H", Array[Double](1.5, 0.1)),
13    ("I", Array[Double](1.2, 3.0)),
14    ("B", Array[Double](3.1, 1.1))
15  ).toDS
16
17  val validationData = trainingData.map { case (key, values) => values.map(_ * 2.0) }
18
19  val kde = new KDE
20  val result = kde.estimate(trainingData.map(_._2), validationData)
21  println(s"result: ${result.collect.mkString(", ")}")
22  sparkSession.close
Environment
- Scala 2.11.8
- Java JDK 1.8
- Apache Spark 2.3.1
- OpenBLAS 0.3.4