Thursday, October 30, 2014

Scala higher-order methods: collect & partition

This post describes the use cases and typical implementations of the Scala collect and partition higher-order methods.

Overview
The Scala higher-order methods collect, collectFirst and partition are not commonly used, even though these collection methods give developers more flexibility than most combinations of map, find and filter.
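As an illustration, here is a minimal sketch, with arbitrary values, of how collect merges a filter and a map into a single pass over a collection:

val xs = List(-2.0, 1.0, 4.0, -3.0)

  // Two passes: remove the negative values, then map
val r1 = xs.filter( _ >= 0.0).map(Math.sqrt(_))

  // Single pass: a partial function defined only for non-negative values
val r2 = xs.collect { case x if x >= 0.0 => Math.sqrt(x) }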

TraversableLike.collectFirst
The method finds the first element of a traversable collection (array, list, map, ...) for which a given partial function is defined, and applies that function to it. Its signature is
def collectFirst[B](pf: PartialFunction[A, B]): Option[B]
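For instance, a minimal, hypothetical usage:

  // First even value, doubled, if any
List(3, 5, 8, 11).collectFirst { case n if n % 2 == 0 => 2*n } // Some(16)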
The use case is to validate K sets (or samples) of data extracted from a dataset. Once validated, these K sets are used in the K-fold validation of a model generated by training a machine learning algorithm: K-1 sets are used for training and the remaining set for validation. The validation consists of extracting K sample arrays from a generic array, then testing that none of these samples is too noisy (i.e. its standard deviation does not exceed a given threshold).
The first step is to create the two generic functions of the validation: breaking the dataset into K sets, then computing the standard deviation of each set. This is accomplished by the ValidateSample trait.

val sqr = (x: Double) => x*x

trait ValidateSample {
  type DVector = Array[Double]

    // Split a vector into nSegments sub-vectors
  def split(xt: DVector, nSegments: Int): Iterator[DVector] =
    xt.grouped((xt.size.toDouble/nSegments).ceil.toInt)

    // Standard deviation, initialized once, on demand
  lazy val stdDev = (xt: DVector) => {
    val mu = xt.sum/xt.size
    val variance = (xt.map(_ - mu)
                      .map(sqr(_))
                      .reduce( _ + _ ))/(xt.size - 1)
    Math.sqrt(variance)
  }

  def isValid(x: DVector, nSegments: Int): Boolean
}


The first method, split, breaks down the initial array xt into a sequence of segments, or sub-arrays. The standard deviation stdDev is computed from the mean and the sum of the squared deviations from the mean. The function is declared lazy so that it is initialized on demand, and only once. The first validation class, ValidateWithMap, chains map and find to test that all the data segments extracted from the dataset have a standard deviation below a given threshold (0.8 in this test).

class ValidateWithMap(threshold: Double) extends ValidateSample {
  override def isValid(x: DVector, nSegs: Int): Boolean =
    split(x, nSegs).map( stdDev(_) ).find( _ > threshold) == None
}

The second implementation of the validation, ValidateWithCollect, relies on the higher-order method collectFirst to detect the first data segment (validation fold) that is too noisy. collectFirst requires a PartialFunction, defined here with a guard on the standard deviation.

class ValidateWithCollect(threshold: Double) extends ValidateSample {
  override def isValid(x: DVector, nSegs: Int): Boolean =
    split(x, nSegs).collectFirst {
      case xt: DVector if stdDev(xt) > threshold => xt } == None
}

There are two main differences between the first implementation, which combines map and find, and the collectFirst implementation:
  • The second version requires a single higher-order method, collectFirst, while the first version chains map and find.
  • The partial function in the second version is defined only for noisy segments; applying it directly to an input for which it is not defined would throw a MatchError, which is why the driver below guards the calls with Try.
These two implementations can be evaluated using a simple driver application that takes a ValidateSample as argument.

import scala.util.{Try, Success, Failure, Random}

final val NUM_VALUES = 10000
val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)

Try (
  new ValidateWithMap(0.8).isValid(rValues, 2)
).getOrElse( false )


Try (
  new ValidateWithCollect(0.8).isValid(rValues, 2)
) match {
  case Success(seq) => {}
  case Failure(e) => e match {
    case ex: MatchError => {}
    case _ => {}
  }
}


TraversableLike.collect
The behavior of the method collect is similar to collectFirst: just as collectFirst is the partial-function counterpart of find, collect is the partial-function counterpart of filter. Its signature is
def collect[B](pf: PartialFunction[A, B]): Traversable[B]

  // Partial function defined only for low-noise segments
val pf: PartialFunction[DVector, DVector] = {
  case xt: DVector if stdDev(xt) < 0.8 => xt
}

def filter1(x: DVector, nSegments: Int): Iterator[DVector] =
  split(x, nSegments).collect(pf)

def filter2(x: DVector, nSegments: Int): Iterator[DVector] =
  split(x, nSegments).filter( stdDev(_) < 0.8 )


TraversableLike.partition
The Higher order method partition is used to partition or segment a mutable indexed sequence of object into a two indexed sequences given a boolean condition (or predicate).
def partition(p: (A) ⇒ Boolean): (Repr, Repr)
The test case consists of segmenting an array of random values, along the mean value 0.5 then compare the size of the two data segments. The data segments, segs should have similar size.

final val NUM_VALUES = 10000
val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)
 
val segs = rValues.partition( _ >= 0.5)
val ratio = segs._1.size.toDouble/segs._2.size
println(s"Relative size of segments $ratio")

The test is executed with arrays of different sizes:
NUM_VALUES    ratio
        50   0.9371
      1000   1.0041
     10000   1.0002
As expected, the ratio of the sizes of the two segments converges toward 1 as the size of the original dataset increases (law of large numbers).


Friday, October 10, 2014

Scala Streams: A Gentle Introduction

How can we leverage Scala Streams to manage very large data sets with limited computing resources?

Overview
A Stream instance can be regarded as a lazy list or, more accurately, a list with lazy elements: an element is evaluated only when it is accessed. Streams allow Scala developers to define infinite sequences. Elements can be removed from memory (and reclaimed by the GC) by eliminating any reference to them once they are no longer needed.
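As a minimal sketch of this laziness, the infinite stream below evaluates only the elements that are actually accessed:

  // Infinite stream of even numbers; no element is computed yet
val evens: Stream[Int] = Stream.from(0).map( _ * 2)

  // Forces the evaluation of the first 5 elements only
println(evens.take(5).toList)  // List(0, 2, 4, 6, 8)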

Performance Evaluation
It is easy to understand the benefit of Stream in terms of memory management. But what about performance?
Let's compare Stream and List using three simple operations:
  • Allocating elements
  • Retrieving a random element
  • Traversing the entire collection
Let's start by defining these operations in the context of computing the mean value of a very large sequence of double values.

import scala.util.Random

val NUM = 10000000

   // Allocation test
val lst = List.tabulate(NUM)( _.toDouble)

   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ =>
  y = lst(Random.nextInt(NUM-1))
)
   // Reducer test
lst.reduce( _ + _ )/lst.size

The operation of reading a value at a random index is repeated 10,000 times to make the performance measurement more reliable. The mean is computed using a simple reduce.

Let's implement the same sequence of operations using Stream class.

val strm = Stream.tabulate(NUM)( _.toDouble)
   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ =>
  y = strm(Random.nextInt(NUM-1))
)
   // Reducer test
strm.reduceRight( _ + _ )/strm.size

The implementation of the random reads using Stream is very similar to the implementation using List. The mean of the stream is also computed with a reducer.

The test is run 20 times to average out the cost of initializing the JVM.

[Chart: average execution time of the allocation, random read and traversal tests, List vs. Stream]
The allocation of the elements in the stream is slightly faster than the creation of the list.
The main difference is the time required by the List and the Stream to traverse the entire collection, using reduceRight as the reducer: in the code snippet above, the traversal forces the Stream to evaluate all its elements at once. This scenario is unlikely in practice, as streams are usually used to process sections or slices of a very large sequence of values or objects, as demonstrated in the next section.
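As a sketch of this slicing pattern, with arbitrary indices, only the elements up to the end of the slice are ever evaluated:

  // Mean of a 10-element slice of a conceptually infinite stream;
  // only the first 1010 elements are computed
val s = Stream.from(0).map( _.toDouble)
val sliceMean = s.slice(1000, 1010).sum/10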

Use case: moving average
The most common application of Scala streams is the iterative or recursive application of a function, transform, or sequence of functions to a very large dataset; in this case, the computation of a moving average.

val strm = Stream.fill(NUM)( Random.nextDouble )

val STEP = 5
  // Sum of the first window of STEP elements
val sum0 = strm.take(STEP).sum

  // Apply the sliding-window update
  // Sum(n+1) = Sum(n) - x(n) + x(n+STEP)
val avStrm = strm.zip(strm.drop(STEP))
                 .scanLeft(sum0){ case (s, (out, in)) => s - out + in }
                 .map( _ /STEP)

First, the code creates a reference strm to a stream of NUM random values. It then computes the sum of the first window of STEP elements. Each subsequent sum is derived from the previous one by subtracting the element leaving the window and adding the element entering it; dividing by STEP yields the stream of moving averages.
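For instance, forcing only the first few values of the resulting stream leaves the rest of the sequence unevaluated:

  // Prints the first 3 moving averages
avStrm.take(3).foreach(println)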

Here is an alternative implementation of the computation of the moving average over a stream of floating point values, using tail recursion.

def average(strm: Stream[Double], window: Int): Stream[Double] = {

  @scala.annotation.tailrec
  def average(
    out: Stream[Double],     // elements leaving the window
    in: Stream[Double],      // elements entering the window
    sum: Double,
    target: Vector[Double]): Vector[Double] =

    if( in.isEmpty ) target
    else {
        // Sliding update: Sum(n+1) = Sum(n) - x(n) + x(n+window)
      val newSum = sum - out.head + in.head
      average(out.tail, in.tail, newSum, target :+ newSum)
    }

  val sum0 = strm.take(window).sum
  average(strm, strm.drop(window), sum0, Vector(sum0))
    .toStream.map( _ /window)
}

The inner method average carries four arguments: the stream out of elements leaving the sliding window, the stream in of elements entering it, the current window sum, and the collection target that accumulates the window sums. The method recurses as long as the stream of entering elements, in, is not empty.
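A short usage sketch, with arbitrary data:

val data = Stream.fill(100)( Random.nextDouble )
average(data, 5).take(3).foreach(println)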

The performance of the computation of the mean can be improved by parallelizing its execution with Stream.par, keeping in mind that converting a stream to a parallel collection forces the evaluation of all its elements.
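A minimal sketch, assuming the stream fits in memory once forced:

  // par converts the (fully evaluated) stream into a parallel
  // sequence; the reduction then runs on multiple threads
val mean = strm.par.reduce( _ + _ )/strm.size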
