## Friday, October 27, 2017

### Reinforcement learning in Scala

You may wonder how robots, autonomous systems or a software game player learn. The answer lies in a field of AI known as reinforcement learning. For example, a robot navigating a maze plans his next move according to its current location and previous moves. Teaching a robot all possible move according to the different location in the maze is not realistic, making any supervised learning technique inadequate. This article describes a very common reinforcement learning methodology, Q-learning and its implementation in Scala.

Overview
There are many different reinforcement learning techniques. One of the most commonly used method is searching the value function space using temporal difference method.
All known reinforcement learning methods share the same objective of solving the problem of finding the optimum sequential decision tasks. In a sequential decision task, an agent interacts with a dynamic system by selecting actions that affect the transition between states in order to optimize a given reward function.

At any given step i, the agent select an action a(i) on the current state s(i). The dynamic system responds by rewarding the agent for its optimal selection of the next state:$s_{i+1}=V(s_{i})$
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function  $V(s_{i})$ The policy is defined at $\pi :\,\{s_{i}\} \mapsto \{a_{i}\} \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}$

Temporal Difference
The most common approach of learning a value function V is to use the Temporal Difference method (TD). The method uses observations of prediction differences from consecutive states, s(i) & s(i+1). If we note r the reward for selection an action from state s(i) to s(i+1) and n the learning rate, then the value V is updated as $V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})$
Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The 'action-value' function represents the expected value of action a on a state s and defined as $Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})$ where r is the reward value for the state.

On-policies vs. Off-policy
The Temporal Difference method relies on the estimate of the final reward to be computed for each state. There are two methods of the Temporal Difference algorithm:On-Policy and Off-Policy:
- On-Policy method learns the value of the policy used to make the decision. The value function is derived from the execution of actions using the same policy but based on history
- Off-Policy method learns potentially different policies. Therefore the estimate is computed using actions that have not been executed yet.
The most common formula for temporal difference approach is the Q-learning formula. It introduces the concept of discount rate to reduce the impact of the first few states on the optimization of the policy. It does not need a model of its environment. The exploitation of action-value approach consists of selecting the next state is by computing the action with the maximum reward. Conversely the exploration approach focus on the total anticipated reward.The update equation for the Q-Learning is $Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$ $Q(s_{i},a_{i}): \mathrm{expected\,value\,action\,a\,on\,state\,s}\,\,\eta : \mathrm{learning\,rate}\,\,\alpha : \mathrm{discount\,rate}$ . One of the most commonly used On-Policy method is Sarsa which does not necessarily select the action that offer the most value.The update equation is defined as$Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$
States and Actions
Functional languages are particularly suitable for iterative computation. We use Scala for the implementation of the temporal difference algorithm. We allow the user to specify any variant of the learning formula, using local functions or closures.
Firstly, we have to define a state class, QLState (line 1) that contains a list of actions of type QLAction (line 3) that can be executed from this state. The only purpose of this class is to connect a list of action to a source state. The parameterized class argument property (line 4) is used to "attach" some extra characteristics to this state.

 1 2 3 4 5 6 7 8 class QLState[T]( val id: Int, val actions: List[QLAction[T]] = List.empty, property: T) { @inline def isGoal: Boolean = !actions.isEmpty } 

As described in the introduction, an action of class QLAction has a source state from and a destination state to(state which is reached following the action). A state except the goal state, has multiple actions but an action has only one destination or resulting state.

case class QLAction[T](from: Int, to: Int)


The state and action can be loaded, generated and managed by a directed graph or search space of type QLSpace. The search space contains the list of all the possible states available to the agent.
One or more of these states can be selected as goals. The algorithm does not restrict the agent to a single state. The process ends when one of the goal states is reached (OR logic). The algorithm does not support combined goals (AND logic).

Let's implement the basic components of the search space QLSpace. The class list all available states (line 2) and one or more final or goal states goalIds (line 3). Although you would expect that the search space contains a single final or goal state, it is not uncommon to have online training using more than one goal state.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class QLSpace[T]( states: Array[QLState[T]], goalIds: Array[Int]) { // Indexed map of states val statesMap: immutable.Map[Int, QLState[T]] = states.map(st => (st.id, st)).toMap // List set of one or more goals val goalStates = new immutable.HashSet[Int]() ++ goalIds // Compute the maximum Q value for a given state and policy def maxQ(st: QLState[T], policy: QLPolicy[T]): Double = { val best = states.filter( _ != st) .maxBy(_st => policy.EQ(st.id, _st.id)) policy.EQ(st.id, best.id) } // Retrieves the list of states destination of state, st def nextStates(st: QLState[T]): List[QLState[T]] = st.actions.map(ac => statesMap.get(ac.to).get) def init(r: Random): QLState[T] = states(r.nextInt(states.size-1)) } 

A hash map statesMap maintains a dictionary of all the possible states with the state id as unique key (lines 6, 7). The class QLSpace has three important methods:
• init initializes the search with a random state for each training epoch (lines 22, 23)
• nextStates returns the list of destination states associated to the state st (lines 19, 20)
• maxQ return the maximum Q-value for this state st given the current policy policy(lines 12-15). The method filters out itself from the search from the next best action. It then compute the maximum reward or Q(state, action) value according to the given policy policy

The next step is to defined a policy.

Learning Policy
A policy is defined by three components
• A reward collected after transitioning from one state to another state (line 2). The reward is provided by the user
• A Q(State, Action) value, value associated to a transition state and an action (line 4)
• A probability (with default values of 1.0) that defines the obstacles or hindrance to migrate from one state to another (line 3)
The estimate combine the Q-value (incentive to move to the best next step) and probability (hindrance to move to any particular state) (line 7).

 1 2 3 4 5 6 7 8 class QLData { var reward: Double = 1.0 var probability: Double = 1.0 var value: Double = 0.0) { @inline final def estimate: Double = value*probability } 

The policy of type QLPolicy is a container for the state transition attributes, rewards, Q-values and probabilities.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class QLPolicy[T](numStates: Int, input: Array[QLInput]) { val qlData = { val data = Array.tabulate(numStates)( _ => Array.fill(numStates)(new QLData) ) input.foreach(in => { data(in.from)(in.to).reward = in.reward data(in.from)(in.to).probability = in.prob }) data } def setQ(from: Int, to: Int, value: Double): Unit = qlData(from)(to).value = value def Q(from: Int, to: Int): Double = qlData(from)(to).value } 

The constructor for QLPolicy takes two arguments:
• Number of states numStates (line 1)
• Sequence of input of type QLInput to the policy
The constructor creates a numStates x numStates matrix of transition of type QLData (lines 3 - 12), from the input.

The type QLInput wraps the input data (index of the input state from, index of the output state to, reward and probability associated to the state transition) into a single convenient class.

case class QLInput(
from: Int,
to: Int,
reward: Double = 1.0,
prob: Double = 1.0)


Model and Training
The first step is to define a model for the reinforcement learning. A model is created during training and is composed of
• Best policy to transition from any initial state to a goal state
• Coverage ratio as defined as the percentage of training cyles that reach the (or one of the) goal.

class QLModel[T](val bestPolicy: QLPolicy[T], val coverage: Double)


The QLearning class takes 3 arguments
• A set of configuration parameters config
• The search/states space qlSpace
• The initial policy associated with the states (reward and probabilities) qlPolicy

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class QLearning[T]( config: QLConfig, qlSpace: QLSpace[T], qlPolicy: QLPolicy[T]) //model in Q-learning algorithm val model: Option[QLModel[T]] = train.toOption // Generate a model through multi-epoch training def train: Try[Option[QLModel[T]]] {} private def train(r: Random): Boolean {} // Predict a state as a destination of this current // state, given a model def predict : PartialFunction[QLState[T], QLState[T]] {} // Select next state and action index def nextState(st: (QLState[T], Int)): (QLState[T], Int) {} } 

The model of type Option[QLModel] (line 7) is created by the method train (line 10). Its value is None if training failed.

The training method train consists of executing config.numEpisodes cycle or episode of a sequence of state transition (line 5). The random generator r is used in the initialization of the search space.

  1 2 3 4 5 6 7 8 9 10 11 12 13 def train: Option[QLModel[T]] = { val r = new Random(System.currentTimeMillis) Try { val completions = Range(0, config.numEpisodes).filter(train(r) ) val coverage = completions.toSize.toDouble/config.numEpisodes if(coverage > config.minCoverage) new QLModel[T](qlPolicy, coverage) else QLModel.empty[T] }.toOption } 

The training process exits with the model if the minimum minCoverage (number of episodes for which the goal state is reached) is met (line 8).

The method train(r: scala.util.Random) uses a tail recursion to transition from the initial random state to one of the goal state. The tail recursion is implemented by the search method (line 4). The method implements the recursive temporal difference formula (lines 14-18).
The state for which the action generates the highest reward R given a policy qlPolicy (line 10) is computed for each new state transition. The Q-value of the current policy is then updated qlPolicy.setQ before repeating the process for the next state, through recursion (line 21).

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def train(r: Random): Boolean = { @scala.annotation.tailrec def search(st: (QLState[T], Int)): (QLState[T], Int) = { val states = qlSpace.nextStates(st._1) if( states.isEmpty || st._2 >= config.episodeLength ) (st._1, -1) else { val state = states.maxBy(s => qlPolicy.R(st._1.id, s.id)) if( qlSpace.isGoal(state) ) (state, st._2) else { val r = qlPolicy.R(st._1.id, state.id) val q = qlPolicy.Q(st._1.id, state.id) // Q-Learning formula val deltaQ = r + config.gamma*qlSpace.maxQ(state, qlPolicy) -q val nq = q + config.alpha*deltaQ qlPolicy.setQ(st._1.id, state.id, nq) search((state, st._2+1)) } } } r.setSeed(System.currentTimeMillis*Random.nextInt) val finalState = search((qlSpace.init(r), 0)) if( finalState._2 != -1) qlSpace.isGoal(finalState._1) else false } 

Note: There is no guarantee that one of the goal state is reached from any initial state chosen randomly. It is expected that some of the training epoch fails. This is the reason why monitoring coverage is critical. Obviously, you may choose a deterministic approach to the initialization of each training epoch by picking up any state beside the goal state(s), as a starting state.

Prediction
Once trained, the model is used to predict the next state with the highest value (or probability) given an existing state. The prediction is implemented as a partial function.

 1 2 3 4 def predict : PartialFunction[QLState[T], QLState[T]] = { case state: QLState[T] if(model != None) => if( state.isGoal) state else nextState(state, 0)._1 } 

The method nextState does the heavy lifting. It retrieves the list of states associated with the current state st through its actions set (line 2). The next most rewarding state qState is computed using the reward matrix R of the best policy of the QLearning model (lines 6 - 8).

  1 2 3 4 5 6 7 8 9 10 11 12 13 def nextState(st: (QLState[T], Int)): (QLState[T], Int) = { val states = qlSpace.nextStates(st._1) if( states.isEmpty || st._2 >= config.episodeLength) st else { val qState = states.maxBy( s => model.map(_.bestPolicy.R(st._1.id, s.id)) .getOrElse(-1.0) ) nextState( (qState, st._2+1)) } } 

Conclusion
An article or blog spot can not realistically describe all the elements and strategies of reinforcement learning from K-armed bandits to deep learning. However, this chapter should provide you with a road map on how to implement a simple reinforcement learning algorithm in Scala.

## Monday, August 8, 2016

### Spark ML pipelines I - Features encoding

Apache spark introduced machine learning (ML) pipeline in version 1.4.0. A pipeline is actually a workflow or sequence of tasks that cleanse, filter, train, classify, predict and validate data set. Those tasks are defined as stage of the pipeline.
Spark 2.0 extends ML pipelines to support persistency, while the MLlib package is slowly deprecated in favor of the data frame and data set based ML library.
ML pipeline allows data scientist to weave transformers and estimators into a single monotonic classification and predictive models.

This first post related to ML pipeline implement a feature encoding pipeline

ML Pipelines 101
This section is a very brief overview of ML pipelines. Further information is available at
The key ingredient of a ML pipeline are

• Transformers are algorithms which can transform one DataFrame into another DataFrame. Transformers are stateless
• Estimators are algorithm which can be fit on a DataFrame to produce a Transformer (i.e. Estimator.fit)
• Pipelines are estimators that weave or chain multiple Transformers and Estimators together to specify an ML workflow
• Pipeline stages are the basic element of the ML workflow. Transformers, estimators and pipelines are pipeline stages
• Parameters encapsulate the tuning and configuration parameters requires for each Transformers and Estimators.
Note: The following implementation has been tested using Apache Spark 2.0

Features encoding pipeline
Categorical features have multiple values or instances which are represented as a string. Numerical value as categorized or bucketed through a conversion to a string.
Once converted to a string, the categorical values are converted into category indices using the StringIndex transformer. The resulting sequence of indices is ranked by decreasing order of frequency of the value in the training or validation set.
Next, the indices associated to particular features are encoded into a vector of binary value (0, 1) through the OneHotEncoder transformer. A feature instance is encoded as 1 if is defined in the data point, 0 otherwise.
Finally, the vector of binary values of all the feature are aggregated or assembled through the transformer VectorAssembler
Let's consider the following simple use case: A sales report list the date an item is sold, its id, the sales region and name of the sales person or agent.
   date         id         region    agent
---------------------------------------
07/10/2014   23c9a89d   17        aa4

The encoding pipeline is illustrated in the following diagram:

The data source is a CSV sales report file. Each column or feature is converted to a string index, then encoded as a vector of binary values. Finally, the 4 vectors are assembled into a single features vector.

Implementation
The first step is to create a Spark 2.x session. Let's wrap the spark session configuration into a single, reusable trait, SparkSessionManager

trait SparkSessionManager {
protected[this] val sparkSession = SparkSession.builder()
.appName("LR-Pipeline")
.config(new SparkConf()
.set("spark.default.parallelism", "4")
.set("spark.rdd.compress", "true")
.set("spark.executor.memory", "8g")
.set("spark.shuffle.spill", "true")
.set("spark.shuffle.spill.compress", "true")
.set("spark.io.compression.codec", "lzf")
.setMaster("local[4]")).getOrCreate()

protected def csv2DF(dataFile: String): DataFrame =
.option("inferSchema", true)
.csv(dataFile)

def stop: Unit = sparkSession.stop
}


The SparkSession is instantiated with the same configuration SparkConf as the SparkContext used in Spark 0.x and 1.x versions.
The method csv2DF loads the content of CSV file and generate a data frame or data set.

The next step consists of creating the encoding workflow or pipeline, wrapped into the DataEncoding

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 trait DataEncoding { protected[this] val colNames: Array[String] lazy val vecColNames = colNames.map(vector(_)) lazy val pipelineStages: Array[PipelineStage] = colNames.map(colName => new StringIndexer() .setInputCol(colName) .setOutputCol(index(colName))) ++ colNames.map(colName => new OneHotEncoder() .setInputCol(index(colName)) .setOutputCol(vector(colName))) ++ Array[PipelineStage](new VectorAssembler() .setInputCols(vecColNames).setOutputCol("features")) def index(colName: String): String = s"${colName}Index" def vector(colName: String): String = s"${colName}Vector" } 

The sequence of names of the columns (features) colNames is an abstract value that needs to be defined for the specific training set (line 2). The first two stages of the data encoding pipeline, String indexing (line 6-8) and encoding (line 9-11) are applied to each of the 4 features. The last stage, assembling the features vector (line 12, 13) is added to the array of the previous stages to complete the pipeline. Each stage is defined by its input column setInputCol and output column/feature setOutputCol

Let's leverage the Spark session and the data encoding to generate a classification model, ModelFactory. Classification and predictive models are built using ML pipelines as described in a future post. For now let's create a pipeline model using the data encoding stages

val dataEncoder = new DataEncoding {
override protected[this] val colNames: Array[String]=
Array[String]("date", "id", "region", "agent")

def pipelineModel(df: DataFrame): PipelineModel =
new Pipeline().setStages(pipelineStages).fit(df)
}



The pipeline model is generated in two steps:
• Instantiate the pipeline from the date encoding stages
• Generate the model from a input or training data frame, df by invoking the fit method.
The next post extends the data encoding pipeline with two estimators: a classifier and a cross validator.

References
Introduction to ML pipelines and MLlib
ML StringIndexer transformer
ML OneHotEncoder transformer
ML VectorAssember transformer

## Monday, July 4, 2016

### Monte Carlo Integration in Scala

This post introduces an overlooked numerical integration method leveraging the ubiquituous Monte Carlo simulation.
Not every function has a closed form for computing a definite integral known as symbolic integration. There are many numerical integration method for continuous functions such as Simpson's formula, Newton-Cotes quadrature rule and Gaussian quadrature. These methods are deterministic by nature.
The Monte-Carlo numerical integration is a stochastic method that relies on randomly generated values to estimate the area under a curve, surface or any multidimensional space. The illustration of the Monte Carlo integration method uses a pragmatic uniform random distribution of data points for single variable function.

Basic concept
Let's consider the single variable function illustrated in the following line plot.
$f(x)=\frac{1}{x}-0.5$

The objective is to compute the integral of the function over the interval [1, 3]. The Monte Carlo numerical integration is defined by three steps:
• Define the outer area of the function defined by the minimum and maximum values of x and y, in this case x [1, 3] and y [0.5, -0.12]
• Generate random data points between over the outer area
• Compute the ratio of the number of random data points within the function area over the total number of data points
Let's define a generic class, MonteCarloIntegrator to encapsulate these 3 steps:
The class has two arguments (line 1)
• The function f to sum
• The number of random data points used in the methodology

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class MonteCarloIntegrator(f: Double => Double, numPoints: Int) { def integrate(from: Double, to: Double): Double = { val (min, max) = getBounds(from, to) val width = to - from val height = if (min >= 0.0) max else max - min val outerArea = width * height val randomx = new Random(System.currentTimeMillis) val randomy = new Random(System.currentTimeMillis + 42L) def randomSquare(randomx: Random, randomy: Random): Double = { val numInsideArea = Range(0, numPoints)./:(0)( (s, n) => { val ptx = randomx.nextDouble * width + from val pty = randomy.nextDouble * height randomx.setSeed(randomy.nextLong) randomy.setSeed(randomx.nextLong) s + (if (pty > 0.0 && pty < f(ptx)) 1 else if (pty < 0.0 && pty > f(ptx)) -1 else 0) } ) numInsideArea.toDouble * outerArea / numPoints } randomSquare(randomx, randomy) } } 

The method integrate implements the sum of the function over the interval [from, to] (line 3). The first step is to extract the bounds getBounds of the outer area (line 4) which size is computed on line 7. Each coordinate is assigned a random generator randomx and randomy (lines 8 & 9).
The nested method randomSquare records the number of data points, numInsideArea that falls into the area delimited by the function (line 13 - 21). The sum is computed as the relative number of data points inside the area delimited by the function (line 24).

The method getBounds is described in the following code snippet. It is a simple, although not particularly efficient approach to extract the boundary of the integration. It breaks down the interval into of steps (lines 2 & 3) and collects the minimum and maximum values of the function (lines 7 - 12).

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def getBounds(from: Double, to: Double): (Double, Double) = { val numSteps = Math.sqrt(numPoints).floor.toInt val stepSize = (to - from) / numSteps (0 to numSteps)./:((Double.MaxValue, -Double.MaxValue))( (minMax, n) => { val y = f(n * stepSize + from) updateBounds(y, minMax) match { case 0x01 => (y, minMax._2) case 0x02 => (minMax._1, y) case 0x03 => (y, y) case _ => minMax } } ) } def updateBounds(y: Double, minMax: (Double,Double)): Int = { var flag = 0x00 if (y < minMax._1) flag += 0x01 if (y > minMax._2) flag += 0x02 flag } 

Precision, precision
You may wonder about the accuracy of the Monte Carlo method and how many randomly generated data points are needed for a decent accuracy. Let's consider the same function $f(x)=\frac{1}{x}-0.5$ and its indefinite integral, that is used to generated the expected sum for the function f $\int f(x)dx=log(x)-\frac{1}{2x}+C$ The simple test consists of computing the error between the value produced by the definite integral and the sum from the Monte Carlo method as implemented in the following code snippet.

 1 2 3 4 5 6 7 8 val fct =(x: Double) => 2 * x - 1.0 val integral = (x: Double, c: Double) => x*x - x + c) final val numPoints=10000 val integrator = new MonteCarloIntegrator(fct, numPoints) val predicted = monteCarloIntegrator.integrate(1.0, 2.0) val expected = integral(2.0, 0.0) - integral(1.0, 0.0) 

We run the test 100 times for number of random data points varying between 100 and 50,000.

Monte Carlo is reasonably accurate even with a small number of data points. In this case, the small increase in accuracy does not justify the need to randomize a significant number of data points beyond 1000 data points.

A smarter approach would be to rely on a exit condition to complete the summation in the shortest time possible without having to estimate the optimum number of random data points. A new batch of numPoints rand data points is added at each iteration. One simple convergence criteria compares the difference of the sum between two consecutive iterations:
  sum(existing data points + new batch data points) - sum(existing data points) < eps

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 def randomSquare(randomx: Random, randomy: Random): Double = { var oldValue = 0.0 Range(0, numPoints).takeWhile(_ => { val numInsideArea = .... // ... s + .... }) val newValue = numInsideArea.toDouble * outerArea / numPoints val diff = Math.abs(newValue - oldValue) oldValue = newValue diff < eps }) oldValue } 

At each iteration, a new batch of numPoints data points is randomly generated to enhance the accuracy of the summation. The exit strategy is implemented through the higher order Scala collection method takeWhile (lines 3 & 11).

Note: This implementation of the Monte Carlo integration is simple enough to illustrate the concept of stochastic methods applied to calculus. The recursive stratified sampling has been shown to be more accurate for computing definite integral of function with significant inflection points (extreme second order derivative).

References
Monte Carlo Integration Dartmouth College - C. Robert, G. Casella
The Clever Machine: Monte Carlo ApproximationsDustin Stansbury
Programming in Scala - 3rd edition M. Odersky, L. Spoon, B. Venners