## Wednesday, December 18, 2013

### Reinforcement learning in Scala 2: Model

This page is the second installement of our introduction to reinforcement learning using the temporal difference. This section is dedicated to the ubiquitous Q-learning algorithm.

Overview
The previous post, Reinforcement learning in Scala: Policies introduced the concept of reinforcement learning, temporal difference and more specifically the Q-learning algorithm:
Reinforcement Learning I: States & Policy The following components were implemented:
• States QLState and their associated actions QLAction
• States space QLSpace
• Policy QLPolicy as a container of tuple {reward, Q-value, probabilities} of the Q-learning model
The last two components to complete the implementation of Q-learning are the model and the training algorithm.

Model and Training
The first step is to define a model for the reinforcement learning. A model is created during training and is composed of
• Best policy to transition from any initial state to a goal state
• Coverage ratio as defined as the percentage of training cyles that reach the (or one of the) goal.
class QLModel[T](val bestPolicy: QLPolicy[T], val coverage: Double)


The QLearning class takes 3 arguments
• A set of configuration parameters config
• The search/states space qlSpace
• The initial policy associated with the states (reward and probabilities) qlPolicy

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class QLearning[T]( config: QLConfig, qlSpace: QLSpace[T], qlPolicy: QLPolicy[T]) //model in Q-learning algorithm val model: Option[QLModel[T]] = train.toOption // Generate a model through multi-epoch training def train: Try[Option[QLModel[T]]] {} private def train(r: Random): Boolean {} // Predict a state as a destination of this current // state, given a model def predict : PartialFunction[QLState[T], QLState[T]] {} // Select next state and action index def nextState(st: (QLState[T], Int)): (QLState[T], Int) {} } 

The model of type Option[QLModel] (line 7) is created by the method train (line 10). Its value is None if training failed.

The training method train consists of executing config.numEpisodes cycle or episode of a sequence of state transition (line 5). The random generator r is used in the initialization of the search space.

  1 2 3 4 5 6 7 8 9 10 11 12 13 def train: Option[QLModel[T]] = { val r = new Random(System.currentTimeMillis) Try { val completions = Range(0, config.numEpisodes).filter(train(r) ) val coverage = completions.toSize.toDouble/config.numEpisodes if(coverage > config.minCoverage) new QLModel[T](qlPolicy, coverage) else QLModel.empty[T] }.toOption } 

The training process exits with the model if the minimum minCoverage (number of episodes for which the goal state is reached) is met (line 8).

The method train(r: scala.util.Random) uses a tail recursion to transition from the initial random state to one of the goal state. The tail recursion is implemented by the search method (line 4). The method implements the recursive temporal difference formula introduced in the previous post (Reinforcement Learning I: States & Policy) (lines 14-18).

The state for which the action generates the highest reward R given a policy qlPolicy (line 10) is computed for each new state transition. The Q-value of the current policy is then updated qlPolicy.setQ before repeating the process for the next state, through recursion (line 21).

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def train(r: Random): Boolean = { @scala.annotation.tailrec def search(st: (QLState[T], Int)): (QLState[T], Int) = { val states = qlSpace.nextStates(st._1) if( states.isEmpty || st._2 >= config.episodeLength ) (st._1, -1) else { val state = states.maxBy(s => qlPolicy.R(st._1.id, s.id)) if( qlSpace.isGoal(state) ) (state, st._2) else { val r = qlPolicy.R(st._1.id, state.id) val q = qlPolicy.Q(st._1.id, state.id) // Q-Learning formula val deltaQ = r + config.gamma*qlSpace.maxQ(state, qlPolicy) -q val nq = q + config.alpha*deltaQ qlPolicy.setQ(st._1.id, state.id, nq) search((state, st._2+1)) } } } r.setSeed(System.currentTimeMillis*Random.nextInt) val finalState = search((qlSpace.init(r), 0)) if( finalState._2 != -1) qlSpace.isGoal(finalState._1) else false } 

Note: There is no guaranty that one of the goal state is reached from any initial state chosen randomly. It is expected that some of the training epoch fails. This is the reason why monitoring coverage is critical. Obviously, you may choose a deterministic approach to the initialization of each training epoch by picking up any state beside the goal state(s), as a starting state.

Prediction
Once trained, the model is used to predict the next state with the highest value (or probability) given an existing state. The prediction is implemented as a partial function.

 1 2 3 4 def predict : PartialFunction[QLState[T], QLState[T]] = { case state: QLState[T] if(model != None) => if( state.isGoal) state else nextState(state, 0)._1 } 

The method nextState does the heavy lifting. It retrieves the list of states associated with the current state st through its actions set (line 2). The next most rewarding state qState is computed using the reward matrix R of the best policy of the QLearning model (lines 6 - 8).

  1 2 3 4 5 6 7 8 9 10 11 12 13 def nextState(st: (QLState[T], Int)): (QLState[T], Int) = { val states = qlSpace.nextStates(st._1) if( states.isEmpty || st._2 >= config.episodeLength) st else { val qState = states.maxBy( s => model.map(_.bestPolicy.R(st._1.id, s.id)) .getOrElse(-1.0) ) nextState( (qState, st._2+1)) } } 

References

## Tuesday, December 10, 2013

### Reinforcement Learning in Scala 1: States & Policies

This post describes a very common reinforcement learning methodology: Tenporal difference update as implemented in Scala. This first section introduces and implements the concept of states and policies.

Overview
There are many different approaches to implement reinforcement learning
One of the most commonly used method is searching the value function space using temporal difference method
All known reinforcement learning methods share the same objective of solving the sequential decision tasks. In a sequential decision task, an agent interacts with a dynamic system by selecting actions that affect the transition between states in order to optimize a given reward function.

At any given step i, the agent select an action a(i) on the current state s(i). The dynamic system responds by rewarding the agent for its optimal selection of the next state:$s_{i+1}=V(s_{i})$
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function  $V(s_{i})$ The policy is defined at $\pi :\,\{s_{i}\} \mapsto \{a_{i}\} \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}$

Temporal Difference
The most common approach of learning a value function V is to use the Temporal Difference method (TD). The method uses observations of prediction differences from consecutive states, s(i) & s(i+1). If we note r the reward for selection an action from state s(i) to s(i+1) and n the learning rate, then the value V is updated as $V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})$
Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The 'action-value' function represents the expected value of action a on a state s and defined as $Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})$ where r is the reward value for the state.

On-policy vs. Off-policy
The Temporal Difference method relies on the estimate of the final reward to be computed for each state. There are two methods of the Temporal Difference algorithm:On-Policy and Off-Policy:
- On-Policy method learns the value of the policy used to make the decision. The value function is derived from the execution of actions using the same policy but based on history
- Off-Policy method learns potentially different policies. Therefore the estimate is computed using actions that have not been executed yet.

The most common formula for temporal difference approach is the Q-learning formula. It introduces the concept of discount rate to reduce the impact of the first few states on the optimization of the policy. It does not need a model of its environment. The exploitation of action-value approach consists of selecting the next state is by computing the action with the maximum reward. Conversely the exploration approach focus on the total anticipated reward.The update equation for the Q-Learning is $Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$ $Q(s_{i},a_{i}): \mathrm{expected\,value\,action\,a\,on\,state\,s}\,\,\eta : \mathrm{learning\,rate}\,\,\alpha : \mathrm{discount\,rate}$ . One of the most commonly used On-Policy method is Sarsa which does not necessarily select the action that offer the most value.The update equation is defined as$Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$

States and Actions

Functional languages are particularly suitable for iterative computation. We use Scala for the implementation of the temporal difference algorithm. We allow the user to specify any variant of the learning formula, using local functions or closures.
Firstly, we have to define a state class, QLState (line 1) that contains a list of actions of type QLAction (line 3) that can be executed from this state. The only purpose of this class is to connect a list of action to a source state. The parameterized class argument property (line 4) is used to "attach" some extra characteristics to this state.

 1 2 3 4 5 6 7 8 class QLState[T]( val id: Int, val actions: List[QLAction[T]] = List.empty, property: T) { @inline def isGoal: Boolean = !actions.isEmpty } 

As described in the introduction, an action of class QLAction has a source state from and a destination state to(state which is reached following the action). A state except the goal state, has multiple actions but an action has only one destination or resulting state.

case class QLAction[T](from: Int, to: Int)


The state and action can be loaded, generated and managed by a directed graph or search space of type QLSpace. The search space contains the list of all the possible states available to the agent.
One or more of these states can be selected as goals. The algorithm does not restrict the agent to a single state. The process ends when one of the goal states is reached (OR logic). The algorithm does not support combined goals (AND logic).

Let's implement the basic components of the search space QLSpace. The class list all available states (line 2) and one or more final or goal states goalIds (line 3). Although you would expect that the search space contains a single final or goal state, it is not uncommon to have online training using more than one goal state.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class QLSpace[T]( states: Array[QLState[T]], goalIds: Array[Int]) { // Indexed map of states val statesMap: immutable.Map[Int, QLState[T]] = states.map(st => (st.id, st)).toMap // List set of one or more goals val goalStates = new immutable.HashSet[Int]() ++ goalIds // Compute the maximum Q value for a given state and policy def maxQ(st: QLState[T], policy: QLPolicy[T]): Double = { val best = states.filter( _ != st) .maxBy(_st => policy.EQ(st.id, _st.id)) policy.EQ(st.id, best.id) } // Retrieves the list of states destination of state, st def nextStates(st: QLState[T]): List[QLState[T]] = st.actions.map(ac => statesMap.get(ac.to).get) def init(r: Random): QLState[T] = states(r.nextInt(states.size-1)) } 

A hash map statesMap maintains a dictionary of all the possible states with the state id as unique key (lines 6, 7). The class QLSpace has three important methods:
• init initializes the search with a random state for each training epoch (lines 22, 23)
• nextStates returns the list of destination states associated to the state st (lines 19, 20)
• maxQ return the maximum Q-value for this state st given the current policy policy(lines 12-15). The method filters out itself from the search from the next best action. It then compute the maximum reward or Q(state, action) value according to the given policy policy
The next step is to defined a policy.

Learning Policy
A policy is defined by three components
• A reward collected after transitioning from one state to another state (line 2). The reward is provided by the user
• A Q(State, Action) value, value associated to a transition state and an action (line 4)
• A probability (with default values of 1.0) that defines the obstacles or hindrance to migrate from one state to another (line 3)
The estimate combine the Q-value (incentive to move to the best next step) and probability (hindrance to move to any particular state) (line 7).

 1 2 3 4 5 6 7 8 class QLData { var reward: Double = 1.0 var probability: Double = 1.0 var value: Double = 0.0) { @inline final def estimate: Double = value*probability } 

The policy of type QLPolicy is a container for the state transition attributes, rewards, Q-values and probabilities.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class QLPolicy[T](numStates: Int, input: Array[QLInput]) { val qlData = { val data = Array.tabulate(numStates)( _ => Array.fill(numStates)(new QLData) ) input.foreach(in => { data(in.from)(in.to).reward = in.reward data(in.from)(in.to).probability = in.prob }) data } def setQ(from: Int, to: Int, value: Double): Unit = qlData(from)(to).value = value def Q(from: Int, to: Int): Double = qlData(from)(to).value } 

The constructor for QLPolicy takes two arguments:
• Number of states numStates (line 1)
• Sequence of input of type QLInput to the policy
The constructor create a numStates x numStates matrix of transition of type QLData (lines 3 - 12), from the input.

The type QLInput wraps the input data (index of the input state from, index of the output state to, reward and probability associated to the state transition) into a single convenient class.

case class QLInput(
from: Int,
to: Int,
reward: Double = 1.0,
prob: Double = 1.0)


The next post will dig into the generation of a model through Q-learning training

References

## Friday, November 8, 2013

### Discrete Kalman Optimal Estimator

This post is an introduction to the Kalman optimal filter using the Scala programming language as implementation. The Kalman filter is widely used in signal processing and statistical analysis to quantify or estimate noise created by a process and noise generated by measurement devices.

Overview

A Kalman filter is an optimal estimator that derives parameters from indirect and inaccurate observations. The objective is the algorithm is to minimize the mean square error of the model parameters. The algorithm is recursive and therefore can be used for real-time signal analysis. The Kalman filter has one main limitation: it requires the process to be linear y = a.f(x) + b.g(x) + .... The state is impacted by Gaussian noise in the process and measurement.

Covariance

The  Kalman filter represents the estimated state and error state as a covariance matrix. The covariance of a random vector x = { ..  x  .. } is a n by n positive definite, symmetric matrix with the variance of each variable as diagonal elements $cov(\mathbf{x})= E[(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}] = \int_{-\infty }^{\infty} .. \int_{-\infty }^{\infty}(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}\,p(\mathbf{x})\,dx_{1} .. dx_{n}$ Such a matrix can be diagonalized by computing the eigenvectors (or basis vectors) in order to decouple the contribution of the noise or errors.

State Equation Model

The state of a deterministic deterministic time linear dynamic system is the smallest vector that summarizes the past of the system in full and allow a theoretical prediction of the future behavior, in the absence of noise. If x(k) is the n-dimension state vector at step k, u(k) the input vector, w(k) the unknown process noise vector normalized for a zero mean, and R(k) the covariance matrix for the measurement noise at step k, z the actual measurement of the state at stepk, then $\mathbf{x}_{k+1} = A_{k}\mathbf{x}_{k} + B_{k}\mathbf{u}_{k} + \mathbf{w}_{k}\,\,\,(1)\,\,with\,\,R_{k} = E[\mathbf{w}_{k}.\mathbf{w}_{k}^T]\\\mathbf{z}_{k} = H_{k}.\mathbf{x}_{k} + \mathbf{v}_{k}\,\,\,(2)\,\,\,\,with\,\,\,Q_{k} = E[\mathbf{v}_{k}.\mathbf{v}_{k}^T]$ where H(k) is the measurement equation related to the state A(k) and Q(k) is covariance matrix of the process noise at step k. We assume that the covariance matrix for the measurement noise, R and the covariance matrix for the error noise Q follow a Gaussian probability distribution.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted

Apache Commons Math
We use the matrix classes and methods defined in the Apache Common Math open source library Commons Math 3.3

Filter
We leverage the support for functional constructs provided in Scala. Validation of method arguments, exceptions and non essential supporting methods are omitted for the sake of readability of the code snippets.

First we need to define the noise q generated by the linear process and the noise r generated by the measurement device. Those functions are defines as members of the QRNoise class. These white noise are indeed Gaussian random distributions for the noise on the process and measurement. The validation that the noise distribution follows a normal distribution is omitted.

case class QRNoise(qr: XY, white: Double=> Double) {
// Compute the white noise for process Q
private def q = white(qr._1)
// Compute the white noise for measurement R
private def r = white(qr._2)
// Compute the white noise of the measurement
def noisyQ: Array[Double] = Array[Double](q, q)
// Compute the white noise on the measurement
def noisyR: Array[Double] = Array[Double](r, r)
}


Next, we need to define some internal types The discrete Kalman class, DKalman class has two objectives:
- Encapsulates the types and method defined in Apache Common Math used in the generation of the state and error equations
- Manage the state of the process

The class DKalman takes 5 arguments
• A: The state transition matrix (or model) defined in the first state equation (line 4)
• B: The control/input matrix/model of the state equation (line 5)
• H: The observation matrix/model of the prediction equation (line 6)
• P: The noise covariance matrix (line 7)
• qrNoise: The implicit process/model and measurement/observation noises

These model input parameters are used to compute the following values
• Q Process white noise (line 12)
• R Measurement white noise (line 17)
• filter Instance of the Apache common math Kalman filter class (line 21)
• x Current state vector (line 22)

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 type DblMatrix = Array[Array[Double]] final protected class DKalman( A: DblMatrix, B: DblMatrix, H: DblMatrix, P: DblMatrix)(implicit qrNoise: QRNoise) { type XYSeries = Array[(Double, Double)] // Process related white noise (mean = 0.0) private[this] val Q = new DblMatrix(A.size).map(_ => Array.fill(A(0).size)(qrNoise.qr._1) ) // Measurement related white noise (mean = 0.0) private[this] val R = new DblMatrix(A.size).map(_ => Array.fill(A(0).size)(qrNoise.qr._2) ) private var filter: KalmanFilter = _ private var x: RealVector = _ // Main filtering routine def filter(xt: XYSeries): XYSeries = {} private def initialize(input: DblVector): Unit = {} // Compute the new state of the Kalman iterative computation private def newState: DblVector = {} // Compute the least squared errors of two vectors of type 'RealVector' def lsError(x: RealVector, z: RealVector): Double = {} } 

The method filter. below implements the basic sequences of the execution of an iteration of the update of the state of the process
1. predict the state of the process at the next step (x, A)
2. extract or generate noise for the process and the measurement devices (w, v)
3. update the state of the process (x)
4. computes the error on the measurement (z)

Note: The control input u and initial state x0 are defined as arguments of the main method because they are independent from the model.

The two main stages of the Kalman filter are
- Initialization of the components used in the prediction and correction equation initialize(line 8)
- Execution of the prediction/correction cycle nextState (line 11)

  1 2 3 4 5 6 7 8 9 10 11 12 13 import org.apache.commons.math3.linear._ import org.apache.commons.math3.filter._ def filter(xt: XYSeries): XYSeries = xt.map{ case (x, y) => { // Initialize the filter initialize(Array[Double](x, y)) // Extract the new state a two values vector val nState = newState (nState(0), nState(1)) }} 

State equation
The method newState implements the iterative state equation of the model
x = A.x + B.u + Q (line 7)
z - H.x + R (line 11)
described in the introduction.

The prediction phase is executed by invoking the Kalman.predict method of the Apache Commons Math library (line 6)and the correction phase relies on the Kalman.correct of the same Java library (line 14).

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private def newState: Vector[Double] = { // Update the filter with the predictive value for x // and update it with the A transition matrix with the // process noise qr.Q filter.predict x = A.operate(x).add(qrNoise.noisyQ) // Compute the measured value z with the new update value // using the measurement-statement dependency matrix H val z = H.operate(x).add(qrNoise.noisyR) // Update the filter with the new estimated measure z filter.correct(z) filter.getStateEstimation } 

The method initalize create two instances of classes defined in the Apache commons math
• pModel to model the process with the relevant matrices (line 2)
• mModel for the measurement (line 3)

  1 2 3 4 5 6 7 8 9 10 11 def initialize(input: Vector[Double]): Unit = { val pModel = new DefaultProcessModel(A, B, Q, input, P) val mModel = new DefaultMeasurementModel(H, R) // Create a Kalman filter with a model pModel for // the process and a model mModel for the measurement. filter = new KalmanFilter(pModel, mModel) // Conversion to Apache Commons Math internal types x = new ArrayRealVector(input) } 

Lastly, we need to compute the least squares error of the predicted states compared to the actual state values. The type RealVector is defined in the Apache commmon math library.

def lsError(x: RealVector, z: RealVector): Double = {
val sumSqr = x.toArray
     .zip(z.toArray)
     .map(xz => (xz._1 - xz._2))
.map( x => x*x).sum
  Math.sqrt(sumSqr)
}


Example
We will use a simple example of the Newton law of gravity.  If x is the weight of an object, the differential equation can be integrated with a step 1 as follows $\frac{\mathrm{d}^2 y_{t}}{\mathrm{d} t^{2}} + g = 0\,\,\Rightarrow\\ y_{t+dt} = y_{t}+ \dot{y}_{t}\,dt - \frac{g}{2}\,dt^{2}\,\,\,;\,\,\dot{y}_{t+1} = \dot{y}_{t} - g\,dt\,\,\,(\mathbf{3})$ The state vector x(k) for object at time k is defined by $\mathbf{x}_{k} = [y_{k},\frac{\mathrm{d} y_{k}}{\mathrm{d} x}]^{T}$ and the state equation   $\mathbf{x}_{k+1} = A_{k}.\mathbf{x}_{k} + B_{k}.\mathbf{u}_{k}\,\,\,with\,\,\,A_{k} =\begin{vmatrix} 1 & dt\\ 0 & 1 \end{vmatrix}\,\,,\,B_{k}=\begin{vmatrix}0.5.dt.dt \\ dt \end{vmatrix}$ We use the Apache Commons Math library version 3.0 (Apache Commons Math User Guide to filter and predict the motion of a body subjected to gravity.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 val dt = 0.05 val g = 9.81 val initialHeight = 100.0 val initialSpeed = 0.0 val variance = 0.014 // Forces an implicit type conversion val A: DblMatrix = ( (1.0, dt), (0.0, 1.0) ) val B: DblMatrix = ( 0.5*g*dt*dt, g*dt ) val H: DblMatrix = (1.0, 0.0) val Q: DblMatrix = ( (0.25*variance*Math.pow(dt, 4), variance*Math.pow(dt,3)/2), (variance*Math.pow(dt,3)/2, variance*dt*dt) ) val P0: DblMatrix = ( (0.02, 0.0), (0.0, 0.03) ) // Initialize the drop at 100 feet with no speed val x0 = Array[Double](initialHeight, initialSpeed) val qrNoise = new QRNoise((0.7, 0.3), (m: Double) => m*Random.nextGaussian) // Create the process and noise models val model = new DKalman(A, H, P0)) val output = model.filter(trajectory) 

The State transition matrix A is initialized with the diagonal element (x and dx/dt) of 1 and a12=x/(dx/dt) = dt (lines 8 - 10). The control vector B implements the coefficients of the first equation (3) with the values g.dt.dt/2 and g.dt (lines 12 -14). The trajectory is the only variable that is actually observed (speed and acceleration are not observed). Therefore, the measurement matrix H has only one non-zero entry (line 16). The values for the noise on trajectory and speed for the model and the measurement is defined by the matrix Q (lines 17 - 19). The covariance matrix P0 (lines 21 - 23) completes the initialization for the process and measurement.

References