Saturday, November 29, 2014

Apache Spark/MLlib for K-means

This page illustrates the Apache Spark MLlib library with the plain-vanilla K-means clustering (unsupervised) algorithm.

Overview
Apache Spark attempts to address the limitations of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. For iterative algorithms, the time per iteration can be reduced by a factor of 10 or more.
The core element of Spark is the Resilient Distributed Dataset (RDD), a collection of elements partitioned across the nodes of a cluster and/or the CPU cores of servers. An RDD can be created from local data structures such as lists, arrays or hash tables, from the local file system or from the Hadoop distributed file system (HDFS).

Note: The code presented in this post uses Apache Spark version 1.3.1. There is no guarantee that the implementation of the K-means in this post will be compatible with future versions of Apache Spark.

Apache Spark RDDs
The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on RDDs can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions
An RDD can be persisted, serialized and cached for future computation. Spark provides a large array of pre-built transformations and actions which go well beyond the basic map-reduce paradigm. Those methods on RDDs are a natural extension of the Scala collections, making code migration seamless for Scala developers.

Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file system. Each RDD also records its lineage, the sequence of transformations used to build it, so that lost partitions can be recomputed; persistence and lineage together enable automatic recovery from node failures. The resiliency of Spark also relies on the supervisory strategy of the underlying Akka actors, the persistence of their mailboxes and the replication scheme of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 2 Gbytes allocated for data processing (RDDs) with the memory-only storage level and 512 Mbytes for the driver (master) process, is defined by creating a Spark configuration instance of type SparkConf:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
 
val sparkConf = new SparkConf()
            .setMaster("local[8]")               // local deployment on 8 cores
            .setAppName("SparkKMeans")
            .set("spark.executor.memory", "2048m")
            .set("spark.storageLevel", "MEMORY_ONLY")  // storage levels are normally assigned per RDD with persist()
            .set("spark.driver.memory", "512M")
            .set("spark.default.parallelism", "16")
 
implicit val sc = new SparkContext(sparkConf)

Apache Spark MLlib
MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
  • Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
  • Clustering limited to K-means in version 1.0
  • L1 & L2 regularization
  • Optimization techniques such as gradient descent, logistic gradient, stochastic gradient descent and L-BFGS
  • Linear algebra such as Singular Value Decomposition
  • Data generator for K-means, logistic regression and support vector machines.
The machine learning byte code is conveniently included in the spark assembly jar file built with the simple build tool, sbt.


Let's consider the K-means clustering components bundled with Apache Spark MLlib. The K-means configuration parameters are:
  • K Number of clusters
  • maxNumIters Maximum number of iterations for minimizing the reconstruction error
  • numRuns Number of runs or episodes used for training the clusters
  • caching Specify whether the input RDD has to be cached in memory
  • xt The array of data points (type Array[Array[Double]])
  • sc Implicit Spark context

import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel
 
class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]]) (implicit sc: SparkContext) {
   
  def train: Try[KMeansModel] = Try {
      // Configure the MLlib K-means algorithm
    val kmeans = new KMeans
    kmeans.setK(K)
    kmeans.setMaxIterations(maxNumIters)
    kmeans.setRuns(numRuns)
   
      // Convert each data point into a dense vector (Vectors.dense returns the
      // Vector type expected by KMeans.run), then distribute the vectors as an RDD
    val rdd = sc.parallelize(xt.map(Vectors.dense(_)))
      // K-means makes several passes over the data, so keep it in memory if requested
    if( caching )
      rdd.persist(StorageLevel.MEMORY_ONLY)
    kmeans.run(rdd)
  }
}

The clustering model is created by the train method. Once the Spark/MLlib K-means is instantiated and initialized, each data point of the input data set xt is converted into a dense vector, and the vectors are distributed as an RDD. Finally, the input RDD is fed to the K-means algorithm (kmeans.run). Wrapping the training in Try returns any failure as a Failure instead of throwing an exception.
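To make the "plain-vanilla" algorithm concrete, here is a minimal single-machine sketch of Lloyd's iteration, the classic K-means procedure: assign each point to its nearest centroid, then move each centroid to the mean of its points. It is not MLlib's implementation, which distributes these steps over the partitions of the RDD; the object KMeansSketch and its methods are hypothetical, and the initial centroids are simply passed in.

```scala
object KMeansSketch {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of the same dimension
  def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Index of the centroid closest to point p
  def assign(p: Point, centroids: Array[Point]): Int =
    centroids.indices.minBy(i => sqDist(p, centroids(i)))

  // Component-wise mean of a non-empty group of points
  def mean(points: Seq[Point]): Point =
    Array.tabulate(points.head.length)(d => points.map(_(d)).sum / points.size)

  // One Lloyd iteration: assign every point to its closest centroid, then move
  // each centroid to the mean of its points (an empty cluster keeps its centroid)
  def step(points: Seq[Point], centroids: Array[Point]): Array[Point] = {
    val clusters = points.groupBy(p => assign(p, centroids))
    Array.tabulate(centroids.length)(i => clusters.get(i).map(mean).getOrElse(centroids(i)))
  }

  // Repeat until no centroid moves by more than eps, or maxIters is reached
  def run(points: Seq[Point], init: Array[Point], maxIters: Int, eps: Double = 1e-9): Array[Point] = {
    var centroids = init
    var iter = 0
    var moved = true
    while (moved && iter < maxIters) {
      val next = step(points, centroids)
      moved = next.indices.exists(i => sqDist(next(i), centroids(i)) > eps)
      centroids = next
      iter += 1
    }
    centroids
  }
}
```

With two well-separated groups of points, two iterations are enough: the first moves each centroid to the mean of its group, the second confirms that nothing moves anymore.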


Thursday, October 30, 2014

Scala higher order methods: collect & partition

This post describes the use cases and typical implementation of the Scala collect and partition higher order methods.

Overview
The Scala higher order methods collect, collectFirst and partition are not commonly used, even though these collection methods provide developers with a higher degree of flexibility than any combination of map, find and filter.

TraversableLike.collectFirst
The method collectFirst finds the first element of a traversable collection (such as an array, a list or a map) on which a partial function is defined, and applies the partial function to it; it returns None if no such element exists. Its signature is
def collectFirst[B](pf: PartialFunction[A, B]): Option[B]
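A small illustration with plain integers (the values and the partial function halfOfEven are arbitrary): collectFirst stops at the first element on which the partial function is defined and wraps the result in an Option, which makes it equivalent to a find followed by a map.

```scala
object CollectFirstDemo {
  val values = List(3, 8, 5, 12, 7)

  // Partial function defined only for even numbers: returns their half
  val halfOfEven: PartialFunction[Int, Int] = { case n if n % 2 == 0 => n / 2 }

  // Stops at 8, the first element on which halfOfEven is defined
  val first: Option[Int] = values.collectFirst(halfOfEven)

  // Equivalent formulation with find followed by map
  val viaFind: Option[Int] = values.find(halfOfEven.isDefinedAt).map(halfOfEven)

  // No element is greater than 100, so the result is None
  val none: Option[Int] = values.collectFirst { case n if n > 100 => n }
}
```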
The use case is to validate K sets (or samples) extracted from a dataset. Once validated, these K sets are used in the K-fold validation of a model generated through the training of a machine learning algorithm: K-1 sets are used for training and the last set is used for validation. The validation consists of extracting K sample arrays from a generic array, then testing that each of these samples is not too noisy (its standard deviation does not exceed a given threshold).
The first step is to create the two generic functions of the validation: breaking the dataset into K sets, then computing the standard deviation of each set. This feat is accomplished by the ValidateSample trait:

val sqr = (x : Double) => x*x

trait ValidateSample {
  type DVector = Array[Double]

    // Split a vector into at most nSegments sub-vectors of ceil(size/nSegments)
    // elements (the last one may be shorter)
  def split(xt: DVector, nSegments: Int): Iterator[DVector] =  
    xt.grouped(Math.ceil(xt.size.toDouble/nSegments).toInt)
 
    // Sample standard deviation of a vector
  lazy val stdDev = (xt: DVector) => {
    val mu = xt.sum/xt.size
    val variance = xt.map(_ - mu)
                     .map(sqr(_))
                     .reduce( _ + _ )/(xt.size-1)
    Math.sqrt(variance)
  }
 
  def isValid(x: DVector, nSegments: Int): Boolean
}


The first method, split, breaks down the initial array x into an iterator over at most nSegments segments or sub-arrays. The function stdDev computes the sample standard deviation from the mean of the values and the sum of the squared deviations from that mean. The function value is declared lazy, so it is created on demand, once, the first time it is used. The first validation class, ValidateWithMap, uses a sequence of map and find to test that none of the data segments extracted from the dataset has a standard deviation greater than 0.8.

class ValidateWithMap(threshold: Double = 0.8) extends ValidateSample {
  override def isValid(x: DVector, nSegs: Int): Boolean =
    split(x, nSegs).map( stdDev(_) ).find( _ > threshold) == None
}
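The helpers can be checked on a small input whose statistics are easy to compute by hand. The sketch below restates split and stdDev as plain methods of a hypothetical object, SampleHelpersDemo, so that it runs on its own; the data and the 0.8 threshold are arbitrary.

```scala
object SampleHelpersDemo {
  type DVector = Array[Double]

  // Segments of ceil(size / nSegments) elements; the last one may be shorter
  def split(xt: DVector, nSegments: Int): Iterator[DVector] =
    xt.grouped(math.ceil(xt.length.toDouble / nSegments).toInt)

  // Sample standard deviation (denominator n - 1)
  def stdDev(xt: DVector): Double = {
    val mu = xt.sum / xt.length
    math.sqrt(xt.map(x => (x - mu) * (x - mu)).sum / (xt.length - 1))
  }

  val data: DVector = Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
  // Three segments: [2, 4, 4], [4, 5, 5], [7, 9]
  val segmentSizes: List[Int] = split(data, 3).map(_.length).toList
  // Mean 5, sum of squared deviations 32, so sqrt(32 / 7)
  val sd: Double = stdDev(data)
  // Same test as ValidateWithMap: fails because [2, 4, 4] has a deviation above 0.8
  val valid: Boolean = split(data, 3).map(stdDev).find(_ > 0.8).isEmpty
}
```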

The second implementation of the validation, ValidateWithCollect, uses the higher order method collectFirst to test that none of the data segments (validation folds) is too noisy. collectFirst requires a PartialFunction, defined here only for the segments whose standard deviation exceeds the threshold.

class ValidateWithCollect(threshold: Double = 0.8) extends ValidateSample {
  override def isValid(x: DVector, nSegs: Int): Boolean =
      // The partial function is defined only for noisy segments
    split(x, nSegs).collectFirst { 
        case xt: DVector if stdDev(xt) > threshold => xt
    } == None
}

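There are two main differences between the first implementation, combining map and find, and the collectFirst implementation:
  • The second version requires a single higher order method, collectFirst, while the first version uses map and find.
  • The second version encodes the noise condition as a guard of the partial function. collectFirst does not throw a MatchError for the elements on which the partial function is not defined: it simply skips them and returns the first noisy segment, or None if all the segments comply.
Both versions stop at the first non-compliant segment, because split returns an Iterator whose map is evaluated lazily. These two implementations can be evaluated using a simple driver application that takes a ValidateSample as argument.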

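import scala.util.{Random, Try, Success, Failure}

val NUM_VALUES = 10000
val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)
  
  // Map/find version: returns false if the validation fails with an exception
val validMap = Try ( 
  new ValidateWithMap(0.8).isValid(rValues, 2) 
).getOrElse( false)
 
  // collectFirst version: reports either the outcome or the exception
Try ( 
  new ValidateWithCollect(0.8).isValid(rValues, 2) 
) match {
  case Success(valid) => println(s"Valid: $valid")
  case Failure(e) => println(s"Validation failed: ${e.getMessage}")
}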


TraversableLike.collect
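The method collect behaves similarly to collectFirst. As collectFirst is the "partial function" version of find, collect is the "partial function" version of filter, combined with a map: it returns all the elements on which the partial function is defined, transformed by that function.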

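  // Within a class extending ValidateSample (DVector, split and stdDev are inherited)
  // Threshold on the standard deviation (hypothetical value)
val ratio = 0.8

  // Partial function defined only for the noisy segments
val pf: PartialFunction[DVector, DVector] = { case xt if stdDev(xt) > ratio => xt }

def filter1(x: DVector, nSegments: Int): Iterator[DVector] = 
  split(x, nSegments).collect(pf)
  
def filter2(x: DVector, nSegments: Int): Iterator[DVector] = 
  split(x, nSegments).filter( stdDev( _ ) > ratio)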
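The equivalence between collect and a filter followed by a map can be seen on a simple list of words (the object CollectVsFilterDemo and its data are hypothetical):

```scala
object CollectVsFilterDemo {
  val words = List("spark", "akka", "scala", "mllib", "hdfs")

  // collect: keeps the elements matching the guard and transforms them in one pass
  val upperS: List[String] = words.collect { case w if w.startsWith("s") => w.toUpperCase }

  // Same result with filter then map (two passes)
  val upperS2: List[String] = words.filter(_.startsWith("s")).map(_.toUpperCase)
}
```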


TraversableLike.partition
The higher order method partition splits a collection into a pair of collections given a boolean condition (or predicate): the first collection contains the elements that satisfy the predicate, the second the elements that do not.
def partition(p: (A) ⇒ Boolean): (Repr, Repr)
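A minimal illustration on a fixed list (the values and the object PartitionDemo are arbitrary): the first collection holds the elements that satisfy the predicate, the second the others, and the original order is preserved in both.

```scala
object PartitionDemo {
  val values = List(0.1, 0.7, 0.4, 0.9, 0.5, 0.2)

  // high: elements >= 0.5, low: the remaining elements, both in original order
  val (high, low) = values.partition(_ >= 0.5)
}
```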
The test case consists of segmenting an array of random values around the mean value 0.5, then comparing the sizes of the two data segments. The data segments, segs, should have similar sizes.

import scala.util.Random

final val NUM_VALUES = 10000
val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)
 
val segs = rValues.partition( _ >= 0.5)
val ratio = segs._1.size.toDouble/segs._2.size
println(s"Relative size of segments $ratio")

The test is executed with different array sizes:
NUM_VALUES  ratio
   50      0.9371
 1000      1.0041
10000      1.0002
As expected, the ratio between the sizes of the two data segments converges toward 1 as the size of the original data set increases (law of large numbers).

References
Scala By Example - M. Odersky - June 2014

Friday, October 10, 2014

Scala Streams: A Gentle Introduction

How can we leverage Scala Streams to manage very large data sets with limited computing resources?

Overview
A Stream instance can be regarded as a lazy list, or more accurately a list with lazy elements. The elements are allocated only when accessed. Stream allows Scala developers to write infinite sequences. Elements can be removed from memory (to be reclaimed by the GC) by eliminating any reference to them once they are no longer needed.
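The lazy evaluation and memoization of elements can be observed by counting how many times a transformation runs. In this sketch (the object StreamLazinessDemo and its counter are hypothetical), an infinite Stream is created and only the elements actually requested are computed; requesting them a second time reuses the memoized values. Stream was deprecated in Scala 2.13 in favor of LazyList, so recent compilers emit a deprecation warning.

```scala
object StreamLazinessDemo {
  // Counts how many times the transformation is actually evaluated
  var evaluations = 0

  // Infinite stream of even numbers; elements are computed on demand
  val evens: Stream[Int] = Stream.from(1).map { n => evaluations += 1; n * 2 }

  // Forces (and memoizes) the first five elements only
  val firstFive: List[Int] = evens.take(5).toList
}
```

Because the object keeps a reference to the head of evens, the computed elements stay in memory: this is the kind of reference that has to be released for the GC to reclaim them.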

Performance Evaluation
It is easy to understand the benefit of Stream in terms of memory management. But what about the performance?
Let's compare Stream and List using 3 simple operations:
  • Allocating elements
  • Retrieving a random element
  • Traversing the entire collection
Let's start by defining these operations in the context of computing the mean value of a very large sequence of double values.

import scala.util.Random

val NUM = 10000000 
 
   // Allocation test
val lst = List.tabulate(NUM)( _.toDouble)

   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
  {y = lst(Random.nextInt(NUM-1))}
)
   // Reducer test
lst.reduce( _ + _ )/lst.size

The operation of reading a value at a random index is repeated 10,000 times in order to make the performance evaluation more reliable. The mean is computed using a simple reduce method.

Let's implement the same sequence of operations using Stream class.

val strm = Stream.tabulate(NUM)( _.toDouble)
   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
  {y = strm(Random.nextInt(NUM-1))}
)
   // Reducer test: reduce traverses the stream iteratively, whereas reduceRight
   // on a Stream recurses once per element and may overflow the stack
strm.reduce( _ + _ )/strm.size

The implementation of the random reads using Stream is very similar to the implementation using List. The mean of the stream is also computed with a reducer.

The test is run 20 times to reduce the distortion caused by the initialization (warm-up) of the JVM.
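The timing harness itself is not shown in the post. A minimal sketch, assuming the metric is the average wall-clock time over repeated runs: a few warm-up runs are discarded, then the body is executed a fixed number of times. The object TimingHarness and the default of 3 warm-up runs are hypothetical choices.

```scala
object TimingHarness {
  // Runs `body` `warmUp` times without measuring, then `runs` times while
  // measuring; returns the average duration in milliseconds and the last result
  def averageMillis[T](runs: Int, warmUp: Int = 3)(body: => T): (Double, T) = {
    require(runs > 0, "runs must be positive")
    (0 until warmUp).foreach(_ => body)
    var last: Option[T] = None
    val start = System.nanoTime
    (0 until runs).foreach(_ => last = Some(body))
    val elapsed = System.nanoTime - start
    (elapsed / 1e6 / runs, last.get)
  }
}
```

For instance, TimingHarness.averageMillis(20)(lst.reduce(_ + _) / lst.size) would time the List reducer over 20 runs.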



The allocation of the elements in the stream is slightly faster than the creation of the list.
The main difference is the time required by the List and the Stream to traverse the entire collection with a reducer. In the code snippet above, the Stream has to allocate all its elements at once. This scenario is very unlikely, as Streams are usually needed to process sections or slices of a very large sequence of values or objects, as demonstrated in the next section.

Use case: moving average
The most common application of Scala Stream is the iterative or recursive application of a function/transform, or sequence of functions, to a very large data set; in this case, the computation of a moving average.

val strm = Stream.fill(NUM)( Random.nextDouble )
  
val STEP = 5
val sum = strm.take(STEP).sum

 // Apply the updating formula for the sum of a sliding window of STEP values:
 // Sum(n+1) = Sum(n) - x(n) + x(n+STEP)
 // (zip and scanLeft are evaluated lazily on a Stream)
val avStrm = strm.zip(strm.drop(STEP))
      .scanLeft(sum)((s, x) => s - x._1 + x._2)
      .map( _ /STEP)

First, the code creates a reference strm to a stream of NUM random values. Then it computes the sum of the first STEP elements of the stream. Each element x(n) leaving the window is paired with the element x(n+STEP) entering it, strm.zip(strm.drop(STEP)), so the sum is updated incrementally with scanLeft. Finally, each sum is divided by STEP to produce the moving average.

Here is an alternative implementation of the computation of the moving average on a stream of floating point values using tail recursion.

import scala.annotation.tailrec

def average(strm: Stream[Double], window: Int): Stream[Double] = {
  
  @tailrec
  def average(
    src: Stream[Double], 
    target: Vector[Double]): Vector[Double] = {
      // Element entering the window when src.head leaves it
    val ahead = src.drop(window)
    if( ahead.isEmpty ) target
    else {
      val newSum = target.last - src.head + ahead.head
      average(src.tail, target :+ newSum)
    }
  }
   
  val initSum = strm.take(window).sum
  average(strm, Vector(initSum)).map( _/ window).toStream
}

The nested, tail-recursive method average has two arguments: the stream src traversed through the recursion, and the vector target that collects the moving sums. The method recurses as long as the stream still contains an element window positions ahead of the head of src. The sums are finally divided by window and returned as a stream.
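Either version can be validated against a brute-force computation that averages every full window with sliding. The sketch below (the object MovingAverageCheck is hypothetical) uses a plain Seq rather than a Stream so the comparison is easy to run; the incremental version applies the same updating formula, Sum(n+1) = Sum(n) - x(n) + x(n+window), through zip and scanLeft.

```scala
object MovingAverageCheck {
  // Incremental moving average: one subtraction and one addition per step
  def incremental(xs: Seq[Double], window: Int): List[Double] = {
    val initSum = xs.take(window).sum
    xs.zip(xs.drop(window))
      .scanLeft(initSum) { case (sum, (out, in)) => sum - out + in }
      .map(_ / window)
      .toList
  }

  // Brute-force reference: recomputes the sum of every full window
  def bruteForce(xs: Seq[Double], window: Int): List[Double] =
    xs.sliding(window).map(_.sum / window).toList
}
```

The incremental version costs O(1) per output value instead of O(window), at the price of accumulating floating point rounding errors over very long sequences.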

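The performance of the computation of the mean may be improved by parallelizing its execution, for instance by converting the stream into a parallel collection with Stream.par; note that this conversion materializes all the elements of the stream.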

References
Streams in Scala: Part 1 Scott Shipp 2014
Streams in Scala: Part 2 Scott Shipp 2014
Scala for Machine Learning Presentation P. Nicolas 2014

Sunday, September 14, 2014

Mixin Constraint on Self-typed Methods

This post illustrates the appropriate use of self-type to restrict the composition of (stackable) traits (mixins) in relation to an existing class or trait.

Overview
Mixin traits with self-type restrictions are commonly used in Scala. Dependency injection, and the Cake pattern in particular, rely on the constraint imposed by a trait that it can be used only with sub-classes of predefined types. The same approach can be used to constrain a trait to be used only with classes that support one or several methods.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Mixin constraint on self-type
Let's consider the case of a class taxonomy (or hierarchy) that classifies machine learning algorithms as either supervised learning or unsupervised learning.

trait Learning {
    // ??? stands for the elided implementation
  def predict(x: Double): Double = ???
}
 
trait SupervisedLearning {
  def train(x: Array[Double]): Int = ???
}
 
trait Validation {
  self: SupervisedLearning => 
    def validate(x: Array[Double]): Double 
}
 
class SVM 
  extends SupervisedLearning 
    with Validation {

  override def train(x: Array[Double]): Int = ???
  override def validate(x: Array[Double]): Double = ???
}

The support vector machine, SVM, is a supervised learning algorithm and therefore extends the SupervisedLearning trait. The code snippet compiles because the class SVM complies with the restriction imposed by the self-type of the trait Validation, which can be mixed only into sub-types of SupervisedLearning.

trait UnsupervisedLearning {
  def group(x: Array[Double]): Int
}
 
    // Validation: failed self-typed inheritance 
    // from SupervisedLearning trait
class EM 
  extends UnsupervisedLearning 
    with Validation { 
  
  override def group(x: Array[Double]): Int = ???
}

The Scala code snippet does not compile because the expectation-maximization algorithm, EM, is an unsupervised learning algorithm and therefore is not a sub-class of SupervisedLearning.
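Besides restricting where a trait can be mixed in, the self-type gives the trait access to the members of the required type: validate can call train although Validation does not extend SupervisedLearning. The sketch below is self-contained; the object SelfTypeDemo, the class ThresholdClassifier and its trivial train method are hypothetical and only serve to make the call observable.

```scala
object SelfTypeDemo {
  trait SupervisedLearning {
    def train(x: Array[Double]): Int
  }

  // The self-type lets validate call train, a member of SupervisedLearning
  trait Validation {
    self: SupervisedLearning =>
    def validate(x: Array[Double]): Double = train(x).toDouble / x.length
  }

  // Hypothetical learner: "training" counts the positive values
  class ThresholdClassifier extends SupervisedLearning with Validation {
    def train(x: Array[Double]): Int = x.count(_ > 0.0)
  }
}
```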

Mixin constraint on self-typed method
Constraining a trait to be mixed only into classes that implement predefined method(s) works the same way as constraining it to sub-classes of a predefined type: the self-type is declared as a structural type listing the required method(s).
Let's reuse the class hierarchy introduced in the previous section.

trait Validation { 
  self: { def train(x: Array[Double]): Int } =>
     def validate(x: Array[Double]): Double = -1.0
}

This time around, Validation can be mixed into any class that implements the method train.
As previously seen, the class SVM complies with the restriction imposed by Validation. However, the declaration of the reinforcement learning algorithm QLearning generates a compilation error because it does not implement the method train as required.

// No training needed for Q-Learning
class QLearning 
  extends Learning 
     with Validation { 
   
  override def predict(x: Double): Double = ???
}

Although brief, this introduction to self-type constraints should help you start considering this technique to protect your code from unintentional, erroneous sub-typing and trait mixing.


References
Scala Cookbook A. Alexander O' Reilly 2013
The Scala Programming Language - M. Odersky, L. Spoon, B.Venners - Artima 2007

Friday, August 22, 2014

Akka Master-Slave Design

This page describes and implements the Master-Slave design for distributed computing using Akka actors.

Overview
Traditional multi-threaded applications rely on accessing data located in shared memory. The mechanism relies on synchronization monitors such as locks, mutexes or semaphores to avoid deadlocks and inconsistent mutable states. Those applications are difficult to debug because of race conditions and incur the cost of a large number of context switches.
The Actor model addresses those issues by using immutable data structures (messages) and asynchronous (non-blocking) communication. The actor model has already been described in the previous post "Scala Share-nothing Actors". This post focuses on the simple Master-worker model using the Akka framework 2.3.4.

Master-slave Model
In this design, the "slave" or "worker" actors are initialized and managed by the "master" actor which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks (or steps) executing the algorithm is performed through message passing:
* Activate from master to workers to launch the execution of distributed tasks
* Completed from workers to master to notify completion of tasks and return results.
* Terminate from master to terminate the worker actors.

The first step is to define the immutable messages.

sealed abstract class Message(val id: Int)

  // Activate and Completed carry a partition, or a partial result, of the time series
case class Activate(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Completed(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Start(i: Int = 0) extends Message(i)

The Start message is sent to the master by the client code (external to the master-worker communication) to launch the computation.
The following sequence diagram illustrates the management of the workers' tasks by the master actor through immutable, asynchronous messages.


The next step is to define the key attributes of the master. The constructor takes 4 arguments:
* A time series xt
* A transformation function fct
* A data partitioner, partitioner
* A method to aggregate the results from all the worker actors, aggr

import scala.collection.immutable
import akka.actor.{Actor, Props}

type DblSeries = Array[Array[Double]]
type DblVector = Array[Double]
 
abstract class Master(
  xt: DblSeries, 
  fct: DblSeries => DblSeries, 
  partitioner: Partitioner, 
  aggr: (List[DblSeries]) => immutable.Seq[DblVector]) extends Actor{
 
    // One worker actor per partition of the dataset
  val workers = List.tabulate(partitioner.numPartitions)(n => 
      context.actorOf(Props(new Worker(n, fct)), name = s"worker_$n"))
   
    // Request a Terminated message when a worker stops
  workers.foreach( context.watch ( _ ) )
  ...
}

The master actor creates a list of worker actors, workers, using the higher order method tabulate. The master registers each worker actor with context.watch to be notified of its termination.
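The class Partitioner is not defined in the post. A minimal sketch of what it could look like, assuming it only needs to expose numPartitions and to split a series into contiguous, nearly equal segments (one per worker); the object PartitionerSketch and the split method are hypothetical.

```scala
object PartitionerSketch {
  type DblSeries = Array[Array[Double]]

  // Hypothetical partitioner: contiguous segments whose sizes differ by at most one
  final class Partitioner(val numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")

    def split(xt: DblSeries): Seq[DblSeries] = {
      val base = xt.length / numPartitions
      val extra = xt.length % numPartitions
      // The first `extra` partitions receive one additional row
      val sizes = (0 until numPartitions).map(i => base + (if (i < extra) 1 else 0))
      val offsets = sizes.scanLeft(0)(_ + _)
      (0 until numPartitions).map(i => xt.slice(offsets(i), offsets(i + 1)))
    }
  }
}
```

Each segment would then be sent to the worker of the same index in an Activate message.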

In the implementation of the event handler receive for the master below, the Start message triggers the partitioning of the original dataset through a split function.
Upon completion of their tasks, the workers emit a Completed message to the master. The master appends each partial result to its aggregator buffer, which counts the number of workers that have completed their tasks. Once all the workers have completed their tasks, with the condition aggregator.size >= partitioner.numPartitions, the master computes the aggregated value through aggr, then stops all the workers through its context, workers.foreach( context.stop(_) ). The method split, the buffer aggregator and the constant MAX_NUM_DATAPOINTS belong to the elided part of Master.

override def receive = {
    // Sent by client to master to initiate the computation
  case s: Start => split
 
    // Sent by workers on completion of their computation
  case msg: Completed => {
      // Record the partial result before testing for completion
    aggregator.append(msg.xt)
    if(aggregator.size >= partitioner.numPartitions) {
       val results = aggr(aggregator.toList).take(MAX_NUM_DATAPOINTS).toArray
       workers.foreach( context.stop(_) )
    }
  }
   
     // Sent by Akka (death watch) when a watched worker has stopped
  case Terminated(worker) => {
      // Shut down once all the workers have completed their tasks
    if( aggregator.size >= partitioner.numPartitions) {
       context.stop(self)
       context.system.shutdown
    }
  }
}

The message Terminated shuts down the master and the global context for all the actors, context.system.shutdown.
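The bookkeeping performed by the master on Completed messages can be isolated from Akka as a pure function, which makes the ordering explicit: the partial result is recorded before testing for completion, so the last result is never left out of the aggregation. This is a hypothetical sketch (MasterBookkeeping, State, onCompleted), not part of the Akka API.

```scala
object MasterBookkeeping {
  type DblSeries = Array[Array[Double]]

  // Hypothetical immutable view of the master's state
  final case class State(expected: Int, results: Vector[DblSeries] = Vector.empty)

  // Handles one Completed message: appends the partial result, then checks whether
  // every worker has reported. Returns the new state and, once complete, the aggregate
  def onCompleted[R](state: State, partial: DblSeries, aggr: List[DblSeries] => R): (State, Option[R]) = {
    val next = state.copy(results = state.results :+ partial)
    val output = if (next.results.size >= next.expected) Some(aggr(next.results.toList)) else None
    (next, output)
  }
}
```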

The next step consists of defining the tasks for the worker actors. A worker actor is fully specified by its id and the data transformation fct.

import akka.actor.Actor

final class Worker(
     id: Int, 
     fct: DblSeries => DblSeries) extends Actor {
   
  override def receive = {
     // Sent by master to start execution
    case msg: Activate => {
      val msgId = msg.id+id
      val output = fct(msg.xt)
        // Return the transformed partition to the master
      sender() ! Completed(msgId, output)
    }
  }
}

The event loop processes only one type of message, Activate, which executes the data transformation fct and returns the result to the master in a Completed message.

The last step is the implementation of the test application. Let's consider the case of the cancellation of noise on a very large dataset xt executed across multiple worker actors. The dedicated master actor of type NoiseRemover partitions the dataset using an instance of Partitioner and distributes the cancellation algorithm cancelNoise to its worker (or slave) actors. The results aggregation function aggr has to be defined for this specific operation.

  // Noise cancellation algorithm (implementation not shown)
def cancelNoise(xt: DblSeries): DblSeries = ???
 
class NoiseRemover(
    xt: DblSeries,
    partitioner: Partitioner,
    aggr: List[DblSeries] => immutable.Seq[DblVector])
  extends Master(xt, cancelNoise, partitioner, aggr)


The Akka actor context ActorSystem is initialized first. The test driver implements a very simple results aggregation function, aggregate, passed as a parameter of the noise remover master actor, controller. The reference to the controller is generated by the Akka actor factory method ActorSystem.actorOf.

import scala.collection.immutable
import akka.actor.{ActorSystem, Props}

val actorSystem = ActorSystem("System")
   
  // Specifies the aggregator used in the master: element-wise sum of the partial
  // results (assumes all the workers return series of the same shape)
def aggregate(aggr: List[DblSeries]): immutable.Seq[DblVector] =
  aggr.transpose.map(rows => rows.transpose.map(_.sum).toArray)
 
  // Create the Akka master actor (xt and partitioner are defined elsewhere)
val controller = actorSystem.actorOf(
   Props(new NoiseRemover(xt, partitioner, aggregate)), "Master"
)

controller ! Start(1)

Finally, the execution is started with a "fire and forget" message, Start.


Friday, August 8, 2014

Bloom Filter in Scala

A brief introduction to the Bloom filter and its implementation in Scala using a cryptographic digest.

Overview
The Bloom filter became a popular probabilistic data structure for membership queries (does object x belong to set or category Y?) a couple of years ago. The main benefit of the Bloom filter is to reduce the requirement for large memory allocation by avoiding storing the objects themselves in memory, unlike a HashSet or a hash table. The compact representation comes with a trade-off: although the filter does not allow false negatives, it does not guarantee that there are no false positives. In other words, a query returns either:
- a very high probability that an object belongs to a set, or
- the certainty that an object does not belong to a set.
A Bloom filter is quite often used as a front end to a deterministic algorithm.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Theory
Let's consider a set A = {a0, ..., an-1} of n elements for which a query to determine membership is executed. The data structure consists of a bit vector V of m bits and k completely independent hash functions, each of which maps an element to a position in the bit vector. The assignment (or mapping) of hash functions to bits has to follow a uniform distribution. The diagram below illustrates the basic mechanism behind the Bloom filter. The set A is defined by the pair a1 and a2. The hash functions h1 and h2 map the elements to bit positions (bits set to 1) in the bit vector. The element b has one of its positions set to 0 and therefore does not belong to the set. The element c is reported as belonging to the set because all its associated positions have bits set to 1.

However, the algorithm does not prevent false positives. For instance, a bit may have been set to 1 during the insertion of previous elements, and the query reports erroneously that the element belongs to the set.
The insertion of an element depends on the k hash functions, therefore the time needed to add a new element is proportional to k and independent from the size of the bit vector: asymptotic insertion time = O(k). However, the filter requires k bits for each element and is less effective than a traditional bit array for small sets.
The probability of false positives decreases as the number n of inserted elements decreases and as the size m of the bit vector increases. The number of hash functions that minimizes the probability of false positives is k = (m/n) ln 2.
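The formula for k can be turned into a quick sizing helper. The sketch below also uses the standard approximation of the false positive probability after n insertions, p = (1 - e^(-kn/m))^k, which assumes independent and uniformly distributed hash functions; the object BloomFilterMath is hypothetical.

```scala
object BloomFilterMath {
  // Number of hash functions minimizing false positives: k = (m / n) ln 2, at least 1
  def optimalNumHashes(m: Int, n: Int): Int =
    math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)

  // Approximate false positive probability after n insertions into m bits
  // with k hash functions: p = (1 - e^(-k n / m))^k
  def falsePositiveRate(m: Int, n: Int, k: Int): Double =
    math.pow(1.0 - math.exp(-k.toDouble * n / m), k)
}
```

For instance, 1,000 bits holding 100 elements call for 7 hash functions, for a false positive rate slightly above 0.8%.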

Implementation in Scala
The implementation relies on the MessageDigest Java library class to generate the hash values. Ancillary methods and conditions on method arguments are omitted for the sake of clarity.
The first step is to define the BloomFilter class and its attributes:
  • length Number of entries in the filter
  • numHashs Number of hash functions
  • algorithm Hashing algorithm, with SHA1 as default
  • set Array of bytes for entries in the Bloom filter
  • digest Digest used to generate hash values

import java.security.MessageDigest
import scala.util.Try

class BloomFilter(
  length: Int,
  numHashs: Int, 
  algorithm: String="SHA1") {
    
  val set = new Array[Byte](length)
    // Failure if the hashing algorithm is not supported
  val digest = Try(MessageDigest.getInstance(algorithm))

  def add(elements: Array[Any]): Int = ???
  final def contains(any: Any): Boolean = ???

  private def hash(value: Int): Int = ???
  private def getSet(any: Any): Array[Int] = ???
}

The digest relies on the message digest class of the Java library, java.security.MessageDigest.
The next step consists of defining the methods to add a single generic element, add(any: Any), and an array of elements, add(elements: Array[Any]).

  // Add an array of elements to the filter; returns the number of elements
  // added, or -1 if the digest is not available
def add(elements: Array[Any]): Int = digest.map(_ => {
   elements.foreach( getSet(_).foreach(set(_) = 1) )
   elements.size
 }).getOrElse(-1)
 
@inline
def add(any: Any): Boolean = this.add(Array[Any](any)) > 0
 
final def contains(any: Any): Boolean =
   digest.map( _ => !getSet(any).exists(set(_) != 1))
       .getOrElse(false)

The method contains evaluates whether an element is contained in the filter. The method returns
  • true if the filter very likely contains the element
  • false if the filter DOES NOT contain this element
The contains method relies on accessing the entries of the set at the positions computed by the recursive getSet method.

private def getSet(any: Any): Array[Int] = {
  val newSet = new Array[Int](numHashs)
    // The first position is the hash of the element's hash code
  newSet.update(0, hash(any.hashCode))
  getSet(newSet, 1)
  newSet
}
 
  // Each subsequent position is the hash of the previous position
@scala.annotation.tailrec
private def getSet(values: Array[Int], index: Int): Unit =
  if( index < values.size) {
    values.update(index, hash(values(index-1)))
    getSet(values, index+1) // tail recursion
  }

Similarly to the add method, the getSet method has two implementations:
  • Generate a new set of positions from any new element
  • A recursive call to fill the array of integer positions, each position being the hash of the previous one
The hash method is the core of the Bloom filter: it computes the index of an entry.

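import java.math.BigInteger

def hash(value: Int) : Int = digest.map(d => {
  d.reset()
  d.update(BloomFilter.int2Bytes(value))
    // Non-negative index within the bounds of the array of entries
  new BigInteger(1, d.digest()).mod(BigInteger.valueOf(set.size)).intValue
}).getOrElse(-1)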

The instance of the MessageDigest class, digest, generates a hash value using either the MD5 or the SHA-1 algorithm. Tail recursion is used as an alternative to an iterative loop to generate the set of positions.

The next code snippet implements a very simple implicit conversion from Int to Array[Byte].

import scala.language.implicitConversions

object BloomFilter {
 val NUM_BYTES = 4
 val LAST_BYTE = NUM_BYTES -1
 
   // Big-endian conversion: the most significant byte comes first
 implicit def int2Bytes(value: Int) : Array[Byte] =
    Array.tabulate(NUM_BYTES)(n => {
      val offset = (LAST_BYTE - n) * 8
      ((value >>> offset) & 0xFF).toByte
    })
}

The conversion relies on the manipulation of bits to break a 32-bit integer into 4 bytes. Alternatively, you may consider a conversion from a long value to an 8-byte array.
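Since ByteBuffer writes integers in big-endian order by default, the JDK offers a convenient reference to cross-check the conversion. A minimal sketch (the object IntToBytesCheck is hypothetical and duplicates int2Bytes so that it runs on its own):

```scala
import java.nio.ByteBuffer

object IntToBytesCheck {
  val NUM_BYTES = 4
  val LAST_BYTE = NUM_BYTES - 1

  // Big-endian conversion: the most significant byte comes first
  def int2Bytes(value: Int): Array[Byte] =
    Array.tabulate(NUM_BYTES) { n =>
      val offset = (LAST_BYTE - n) * 8
      ((value >>> offset) & 0xFF).toByte
    }

  // Reference conversion from the JDK (ByteBuffer is big-endian by default)
  def viaByteBuffer(value: Int): Array[Byte] =
    ByteBuffer.allocate(NUM_BYTES).putInt(value).array()
}
```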

Usage
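This simple test consists of checking whether a couple of values are indeed contained in the set. The filter should reject 22, barring a false positive, and accepts 23. With 100 hash functions for a filter of 100 entries, most entries are set to 1 after a few insertions, so false positives are likely; a number of hash functions close to (m/n) ln 2, about 12 here, is a better choice. If the objective is to confirm that 23 belongs to the set, then a full-fledged hash table would have to be used.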

val filter = new BloomFilter(100, 100, "SHA")
final val newValues = Array[Any](57, 97, 91, 23, 67,33)  
                                
filter.add(newValues)

println( filter.contains(22) )
println( filter.contains(23) )
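For readers who want to run a complete filter in one piece, here is a compact, self-contained variant under different design assumptions: bits are stored in a java.util.BitSet instead of an Array[Byte], and the k positions of an element are obtained by hashing its hashCode together with the index of the hash function with SHA-1, instead of chaining hashes as getSet does. The class SimpleBloomFilter is hypothetical and, like the MessageDigest it wraps, it is not thread-safe.

```scala
import java.math.BigInteger
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.BitSet

// Hypothetical compact Bloom filter: m bits, k hash positions per element
final class SimpleBloomFilter(m: Int, k: Int, algorithm: String = "SHA-1") {
  require(m > 0 && k > 0, "m and k must be positive")

  private val bits = new BitSet(m)
  private val digest = MessageDigest.getInstance(algorithm)

  // i-th position of an element: digest of (hashCode, i) reduced modulo m
  private def position(el: Any, i: Int): Int = {
    digest.reset()
    digest.update(ByteBuffer.allocate(8).putInt(el.hashCode).putInt(i).array())
    new BigInteger(1, digest.digest()).mod(BigInteger.valueOf(m)).intValue
  }

  private def positions(el: Any): Seq[Int] = (0 until k).map(i => position(el, i))

  def add(el: Any): Unit = positions(el).foreach(i => bits.set(i))

  // false: definitely absent; true: present with high probability
  def contains(el: Any): Boolean = positions(el).forall(i => bits.get(i))
}
```

Inserted values are always found (no false negatives), whereas a query on a value that was never inserted returns false unless all its k bits happen to have been set by other elements.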

Performance evaluation
Let's look at the behavior of the Bloom filter under load. The test consists of adding 100,000,000 new random values, then testing whether the filter contains a value 10,000 times. The test is run 10 times after a warm-up of the JVM.

import scala.util.Random

final val newValues = Array[Any](57, 97, 91, 23, 67, 33)
  // Measure average time to add a new data set of `size` random values
  // (the element type Any selects the method add(elements: Array[Any]))
filter.add(Array.tabulate[Any](size)(n => Random.nextInt(n + 1)))

  // Measure average time to test for a value.
filter.contains(newValues(Random.nextInt(newValues.size)))

The first performance test evaluates the average time required to insert a new element into a Bloom filter whose size ranges from 100M to 1 billion entries.
The second test evaluates the average search/query time for Bloom filters with the same range of sizes.




As expected, the average time to load a new set of values and to check whether the filter contains a specific value is fairly constant.


References
Bloom filter Wikipedia
github.com/prnicolas
The Scala Programming Language - M. Odersky, L. Spoon, B.Venners - Artima 2007