Thursday, December 7, 2017

Covariant and Contravariant Functors in Scala

Target audience: Advanced
Estimated reading time: 15'


Scala is a first-class functional and object-oriented programming language which supports, among other concepts, higher-kinded types, functors and monads.
This post illustrates Scala's ability to leverage the concepts of covariant and contravariant functors for tensor analysis, with an application to vector fields.

Note: This post requires a solid knowledge of functional programming as well as an understanding of differential geometry.


Overview
Most Scala developers have some experience with the core tenets of functional programming: monads, functors and applicatives. These concepts are not specific to Scala or even to functional programming at large. They are elements of a field of mathematics known as topology, or more specifically algebraic topology. Differential geometry and differential topology make heavy use of tensors, which leverage covariant and contravariant functors.

This post introduces the concepts of
  • Contravariant functors applied to co-vectors and differential forms
  • Projection of higher-kinded types

Vector fields 101
Let's consider a 3-dimensional Euclidean space with basis vectors {ei} and a vector field V (f1, f2, f3) [Note: we follow the Einstein tensor indices convention]

The vector field at the point P(x,y,z) is defined as the tuple (f1(x,y,z), f2(x,y,z), f3(x,y,z)).
A vector field over a k-dimensional space can be formally, mathematically defined as
\[f: \mathbf{x} \in \mathbb{R}^{k} \mapsto \mathbb{R} \\ f(\mathbf{x})=\sum_{i=1}^{n}{f^{i}}(\mathbf{x}).\mathbf{e}^{i}\] Example: \[f(x,y,z) = (2x+z^{3})\boldsymbol{\mathbf{\overrightarrow{i}}} + (xy+e^{-y}-z^{2})\boldsymbol{\mathbf{\overrightarrow{j}}} + \frac{x^{3}}{y}\boldsymbol{\mathbf{\overrightarrow{k}}}\]
Now, let's consider the same vector field V in a second reference frame (origin O' and basis vectors e'i):

\[f(\mathbf{x})=\sum_{i=1}^{n}{f'_{i}}(\mathbf{x}).\mathbf{e'}_{i}\]
The transformation matrix Sij converts the coordinate functions fi into f'i. The tuple f = (fi) is the co-vector field for the vector field V:
\[S_{ij}: \begin{Vmatrix} f^{1} \\ f^{2} \\ f^{3} \end{Vmatrix} \mapsto \begin{Vmatrix} f'^{1} \\ f'^{2} \\ f'^{3} \end{Vmatrix}\]
The scalar product of the co-vector f' and the vector v(f) is defined as
\[< f',v> = \sum f'_{i}.f^{i}\]
Given the scalar product, we can define the co-vector field f' as the linear map α:
\[\alpha (v) = < f',v> (1) \] 

 
Covariant functors
I assume the reader has a basic understanding of functors and monads. Here is a short overview:

A category C is composed of objects x and morphisms f, defined as
\[C= \{\, x_{i}, f \in C \,|\, f: x_{i} \mapsto x_{j} \,\}\] A functor F is a map between two categories C and D that preserves the mapping of morphisms.
\[x\in C \Rightarrow F(x)\in D \\ x, y\in C, \,\,\, f: x \mapsto y \Rightarrow F(f): F(x)\mapsto F(y)\]
Let's look at the definition of a functor in Scala with the "preserving" mapping method, map

trait Functor[M[_]] {
  def map[U, V](m: M[U])(f: U => V): M[V]
}
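
To make the trait concrete before moving on to vector fields, here is a quick illustrative instance for the standard Option type (this snippet is not part of the original design):

// Illustrative functor instance for Option: map transforms the content
// while preserving the Some/None structure of the container.
object OptionFunctor extends Functor[Option] {
  override def map[U, V](m: Option[U])(f: U => V): Option[V] = m.map(f)
}

// OptionFunctor.map(Option(3))(_ * 2) == Some(6)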

Let's define the functor for a vector (or tensor) field. A vector field is defined as a sequence or list of fields (i.e. values or function values).

type VField[U] = List[U]

trait VField_Ftor extends Functor[VField] {
  override def map[U, V](vu: VField[U])(f: U => V): VField[V] = vu.map(f) 
}

This particular implementation relies on the fact that List is itself a functor with its own map method. The next step is to define the implicit class conversion VField[U] => Functor[VField[U]] so the map method is automatically invoked on each VField instance.

implicit class vField2Functor[U](vu: VField[U]) 
    extends VField_Ftor {  
    
  final def map[V](f: U => V): VField[V] = 
     super.map(vu)(f) 
}
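
A quick usage sketch (not in the original post): lifting a two-component integer field to a field of doubles. Since VField is a List, the standard List.map would produce the same result; the call below routes explicitly through the functor defined above.

val intField: VField[Int] = List(3, 5)

// Invoke the functor's map through the conversion class
val halved: VField[Double] = new vField2Functor(intField).map(_ * 0.5)  // List(1.5, 2.5)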

Covariant functors, which preserve the direction of morphisms, are known by default simply as functors. Let's now look at the case of co-vector fields.


Contravariant functors
A Contravariant functor is a map between two categories that reverses the mapping of morphisms.
\[x, y\in C, \,\,\, f: x \mapsto y \Rightarrow F(f): F(y)\mapsto F(x)\]

trait CoFunctor[M[_]] {
  def map[U, V](m: M[U])(f: V => U): M[V]
}

The map method of the CoFunctor implements the relation (f: V => U) => (M[U] => M[V]): the functor reverses the direction of the morphism.
Let's implement a co-vector field using a contravariant functor. The definition (1) describes a linear map from a vector field V over a field X to a scalar, i.e. an element of the dual space V*: V => T.
A morphism on the category V* consists of a map V => T, where V is a vector field and T is a scalar function value.

type CoField[V, T] = Function1[V, T]

The co-vector field type CoField is parameterized on the vector field type V, which is an input (function parameter). Therefore the functor has to be contravariant.

The higher-kinded type M[_] takes a single type parameter (i.e. M[V]), but a co-vector field requires two types:
  • V: the vector field type
  • T: the scalar type produced by the inner product <.,.>

Fortunately, the contravariant functor CoField_Ftor associated with the co-vector only needs to be parameterized with the vector field type V. The solution is to pre-define (or 'fix') the scalar type T using a type projection (a higher-kinded type projector) for the type L[V] => CoField[V, T]
T => ({type L[X] = CoField[X,T]})#L

trait CoField_Ftor[T] 
  extends CoFunctor[({type L[X] = CoField[X,T]})#L ] {

  override def map[U,V](
    vu: CoField[U,T]
  )(f: V => U): CoField[V,T] = 
   (v: V) => vu(f(v))
}

As you can see, the morphism over the type V on the category CoField is defined as f: V => U instead of f: U => V. A kind parameterized on the return type (as Function1 is on its output) would require the 'default' (covariant) functor. Once again, we define an implicit class to convert a co-vector field of type CoField to its functor, CoField2Ftor.

implicit class CoField2Ftor[U,T](vu: CoField[U,T]) 
    extends CoField_Ftor[T] {
  
  final def map[V](f: V => U): CoField[V,T] = 
     super.map(vu)(f) 
}
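
Before tackling vector fields, a small sketch with simple types may help illustrate the reversed arrow (the values below are made up for illustration): a co-vector over String with the scalar type T fixed to Int, pulled back to a co-vector over Double through a morphism Double => String.

val length: CoField[String, Int] = (s: String) => s.length

// Contravariant map: a morphism Double => String yields a CoField[Double, Int]
val digitCount: CoField[Double, Int] = length.map((x: Double) => x.toString)

// digitCount(3.14) == 4  ("3.14".length)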


Evaluation
Let's consider a field of function values FuncD in two dimensions: v(x,y) = f1(x,y).i + f2(x,y).j. The vector field of type VFieldD is defined as a list of two function values.

type DVector = Array[Double]
type FuncD = Function1[DVector, Double]
type VFieldD = VField[FuncD]

The vector is computed by evaluating the vector field at a specific point P(1.5, 2.0). The functor is applied to the vector field vfield to generate a new vector field vField2.

val f1: FuncD = (x: DVector) => x(0)*x(1)
val f2: FuncD = (x: DVector) => -2.0*x(1)*x(1)
  
val vfield: VFieldD = List[FuncD](f1, f2)
val value: DVector = Array[Double](1.5, 2.0)  
val vField2: VFieldD = vfield.map(f => (x: DVector) => 4.0*f(x))

A co-vector field coField is computed as the sum of the field components (function values) (lines 1, 2). Next, we compute the product of the co-vector field and the vector field (scalar field product) (line 6): we simply apply the co-vector coField (a linear map) to the vector field. Once defined, the morphism _morphism is used to generate a new co-vector field coField2 through the contravariant functor CoField2Ftor.map (line 10).
Finally, a new product newProduct is generated by applying the new co-vector field coField2 to the original vector field vfield (line 12).

val coField: CoField[VFieldD, FuncD] = 
    (vf: VFieldD) => (x: DVector) => vf(0)(x) + vf(1)(x)

val contraFtor: CoField2Ftor[VFieldD, FuncD] = coField
  
val product: FuncD = coField(vfield)
val _morphism: VFieldD => VFieldD = 
   (vf: VFieldD) => vf.map(f => (x: DVector) => 3.0*f(x))

val coField2 = contraFtor.map( _morphism )
   
val newProduct: FuncD = coField2(vfield)
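
As a quick sanity check (this line is not in the original post), the new co-vector field can be evaluated at the point P(1.5, 2.0) defined earlier:

// newProduct sums the components of the vector field scaled by 3:
// 3*x*y - 6*y*y evaluated at (1.5, 2.0) => 9.0 - 24.0 = -15.0
val check: Double = newProduct(value)   // -15.0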


Environment
Scala 2.11.8
JDK 1.8

References
  • Tensor Analysis on Manifolds - R. Bishop, S. Goldberg - Dover publication 1980
  • Differential Geometric Structures - W. Poor - Dover publications 2007
  • Functors and Natural Transformations - A. Tarlecki - Category Theory 2014

Friday, October 27, 2017

Reinforcement learning in Scala

Target audience: Advanced
Estimated reading time: 30'


You may wonder how robots, autonomous systems or software game players learn. The answer lies in a field of AI known as reinforcement learning. For example, a robot navigating a maze plans its next move according to its current location and its previous moves. Teaching the robot every possible move for every location in the maze is not realistic, making any supervised learning technique inadequate. This article describes a very common reinforcement learning methodology, Q-learning, and its implementation in Scala.


Overview
There are many different reinforcement learning techniques. One of the most commonly used methods is searching the value function space using the temporal difference method.
All known reinforcement learning methods share the same objective: solving sequential decision tasks optimally. In a sequential decision task, an agent interacts with a dynamic system by selecting actions that affect the transitions between states, in order to optimize a given reward function.

At any given step i, the agent selects an action a(i) in the current state s(i). The dynamic system responds by rewarding the agent for the optimal selection of the next state:\[s_{i+1}=V(s_{i})\]
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function  \[V(s_{i})\] The policy is defined as \[\pi :\,\{s_{i}\} \mapsto \{a_{i}\} \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}\]


Temporal Difference
The most common approach to learning a value function V is the Temporal Difference (TD) method. The method uses observations of prediction differences from consecutive states, s(i) and s(i+1). If we denote by r the reward for selecting the action that leads from state s(i) to s(i+1), and by η the learning rate, then the value V is updated as \[V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})\]
Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The 'action-value' function represents the expected value of action a in state s and is defined as \[Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})\] where r is the reward value for the state.


On-policies vs. Off-policy
The Temporal Difference method relies on the estimate of the final reward being computed for each state. There are two flavors of the Temporal Difference algorithm: On-Policy and Off-Policy:
  • The On-Policy method learns the value of the policy used to make the decisions. The value function is derived from the execution of actions using the same policy, but based on history
  • The Off-Policy method learns a potentially different policy. Therefore the estimate is computed using actions that have not been executed yet

The most common formula for the temporal difference approach is the Q-learning formula. It introduces the concept of a discount rate to reduce the impact of the first few states on the optimization of the policy, and it does not need a model of its environment. The exploitation of the action-value approach consists of selecting the next state by computing the action with the maximum reward, while the exploration approach focuses on the total anticipated reward. The update equation for Q-Learning is \[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\] \[Q(s_{i},a_{i}): \mathrm{expected\,value\,of\,action\,a\,in\,state\,s}\,\,\,\eta : \mathrm{learning\,rate}\,\,\,\alpha : \mathrm{discount\,rate}\] One of the most commonly used On-Policy methods is Sarsa, which does not necessarily select the action that offers the highest value. Its update equation is defined as \[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\]
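
To make the update rule concrete, here is a minimal sketch of the Q-Learning formula as a pure function (the names and numeric values are illustrative only):

// Q(s,a) <- Q(s,a) + eta*(r + alpha*max Q(s',a') - Q(s,a))
def updateQ(
    q: Double,         // current estimate Q(s, a)
    reward: Double,    // reward r observed after the transition s -> s'
    maxNextQ: Double,  // max over a' of Q(s', a')
    eta: Double,       // learning rate
    alpha: Double      // discount rate
): Double = q + eta*(reward + alpha*maxNextQ - q)

// updateQ(q = 0.5, reward = 1.0, maxNextQ = 0.8, eta = 0.1, alpha = 0.9) ~= 0.622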

States and Actions
Functional languages are particularly suitable for iterative computation. We use Scala for the implementation of the temporal difference algorithm, allowing the user to specify any variant of the learning formula using local functions or closures.

First, we have to define a state class, QLState (line 1), that contains the list of actions of type QLAction (line 3) that can be executed from that state. The only purpose of this class is to connect a list of actions to a source state. The parameterized class argument property (line 4) is used to "attach" extra characteristics to the state.

class QLState[T](
  val id: Int, 
  val actions: List[QLAction[T]] = List.empty, 
  property: T) {
    
  @inline
  def isGoal: Boolean = actions.isEmpty
}

As described in the introduction, an action of class QLAction has a source state from and a destination state to (the state reached by following the action). A state, except for a goal state, has one or more actions, but an action has only one destination (or resulting) state.

case class QLAction[T](from: Int, to: Int)

The states and actions can be loaded, generated and managed by a directed graph or search space of type QLSpace. The search space contains the list of all the possible states available to the agent.
One or more of these states can be selected as goals. The algorithm does not restrict the agent to a single goal state: the process ends when any one of the goal states is reached (OR logic). The algorithm does not support combined goals (AND logic).

Let's implement the basic components of the search space QLSpace. The class lists all the available states (line 2) and one or more final or goal states goalIds (line 3). Although you would expect the search space to contain a single final or goal state, it is not uncommon to run online training with more than one goal state.

class QLSpace[T](
   states: Array[QLState[T]], 
   goalIds: Array[Int]) {

    // Indexed map of states 
  val statesMap: immutable.Map[Int, QLState[T]] = 
    states.map(st => (st.id, st)).toMap
    // List set of one or more goals  
  val goalStates = new immutable.HashSet[Int]() ++ goalIds
 
    // Compute the maximum Q value for a given state and policy
  def maxQ(st: QLState[T], policy: QLPolicy[T]): Double = { 
    val best = states.filter( _ != st)
       .maxBy(_st => policy.EQ(st.id, _st.id))
    policy.EQ(st.id, best.id)
  }
 
    // Retrieve the list of destination states for the state st
  def nextStates(st: QLState[T]): List[QLState[T]] =
     st.actions.map(ac => statesMap.get(ac.to).get)
 
  def init(r: Random): QLState[T] = 
    states(r.nextInt(states.size-1))

    // Test whether a given state is one of the goal states
  def isGoal(st: QLState[T]): Boolean = goalStates.contains(st.id)
}

A hash map statesMap maintains a dictionary of all the possible states, with the state id as the unique key (lines 6, 7). The class QLSpace has four important methods:
  • init initializes the search with a random state for each training epoch (lines 22, 23)
  • nextStates returns the list of destination states associated with the state st (lines 19, 20)
  • maxQ returns the maximum Q-value for the state st given the current policy (lines 12-15). The method filters the state itself out of the search for the next best action, then computes the maximum reward or Q(state, action) value according to the given policy
  • isGoal tests whether a given state is one of the goal states (lines 25, 26)
The next step is to define a policy.


Learning Policy
A policy is defined by three components:
  • A reward, collected after transitioning from one state to another (line 2). The reward is provided by the user
  • A Q(state, action) value, value, associated with a transition state and an action (line 4)
  • A probability (with a default value of 1.0) that defines the obstacles or hindrance to migrating from one state to another (line 3)
The estimate combines the Q-value (incentive to move to the best next state) and the probability (hindrance to moving to any particular state) (line 7).

class QLData {
  var reward: Double = 1.0
  var probability: Double = 1.0
  var value: Double = 0.0
  
  @inline
  final def estimate: Double = value*probability
}

The policy of type QLPolicy is a container for the state transition attributes, rewards, Q-values and probabilities.

class QLPolicy[T](numStates: Int, input: Array[QLInput]) {
 
  val qlData = {
    val data = Array.tabulate(numStates)(
      _ => Array.fill(numStates)(new QLData)
    )
 
    input.foreach(in => {  
      data(in.from)(in.to).reward = in.reward
      data(in.from)(in.to).probability = in.prob
    })
    data
  }
  
  def setQ(from: Int, to: Int, value: Double): Unit =
     qlData(from)(to).value = value
 
  def Q(from: Int, to: Int): Double = qlData(from)(to).value

    // Reward assigned to the transition from -> to
  def R(from: Int, to: Int): Double = qlData(from)(to).reward

    // Estimate: Q-value weighted by the transition probability
  def EQ(from: Int, to: Int): Double = qlData(from)(to).estimate
}

The constructor for QLPolicy takes two arguments:
  • The number of states numStates (line 1)
  • A sequence of inputs of type QLInput to the policy
The constructor creates a numStates x numStates matrix of transitions of type QLData (lines 3 - 12) from the input. The accessors Q, R and EQ return, respectively, the Q-value, the reward and the estimate associated with a state transition; they are used by the search space and by the training loop below.
The type QLInput wraps the input data (index of the input state from, index of the output state to, reward and probability associated with the state transition) into a single convenient class.

case class QLInput(
   from: Int, 
   to: Int, 
   reward: Double = 1.0, 
   prob: Double = 1.0)
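
With these building blocks in place, a toy search space and policy can be assembled as follows (the three states, transitions and reward values below are made up for illustration; state 2 is the single goal):

val inputs = Array(
  QLInput(from = 0, to = 1, reward = 0.5),
  QLInput(from = 1, to = 2, reward = 1.0),
  QLInput(from = 0, to = 2, reward = 0.8)
)

val states = Array(
  new QLState[Int](0, List(QLAction[Int](0, 1), QLAction[Int](0, 2)), 0),
  new QLState[Int](1, List(QLAction[Int](1, 2)), 1),
  new QLState[Int](2, List.empty[QLAction[Int]], 2)
)

val qlSpace = new QLSpace[Int](states, Array(2))
val qlPolicy = new QLPolicy[Int](3, inputs)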



Model and Training
The first step is to define a model for reinforcement learning. A model is created during training and is composed of:
  • The best policy to transition from any initial state to a goal state
  • A coverage ratio, defined as the percentage of training cycles that reach the (or one of the) goal state(s)
class QLModel[T](val bestPolicy: QLPolicy[T], val coverage: Double)

The QLearning class takes 3 arguments
  • A set of configuration parameters config
  • The search/states space qlSpace
  • The initial policy associated with the states (reward and probabilities) qlPolicy
class QLearning[T](
   config: QLConfig, 
   qlSpace: QLSpace[T], 
   qlPolicy: QLPolicy[T]) {

    // Model generated by the Q-learning training
  val model: Option[QLModel[T]] = train
    
    // Generate a model through multi-epoch training
  def train: Option[QLModel[T]] = { /* ... */ }
  private def train(r: Random): Boolean = { /* ... */ }

   // Predict a state as a destination of this current 
   // state, given a model
  def predict: PartialFunction[QLState[T], QLState[T]] = { /* ... */ }

  // Select next state and action index
  def nextState(st: (QLState[T], Int)): (QLState[T], Int) = { /* ... */ } 
}
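
The configuration class QLConfig is not listed in this post. Based on the fields referenced in the code below (alpha, gamma, episodeLength, numEpisodes, minCoverage), a plausible minimal sketch could be:

// Plausible sketch only: the actual QLConfig is not shown in this post
case class QLConfig(
  alpha: Double,        // learning rate
  gamma: Double,        // discount rate
  episodeLength: Int,   // maximum number of transitions per episode
  numEpisodes: Int,     // number of training episodes (epochs)
  minCoverage: Double   // minimum ratio of successful episodes to retain the model
)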

The model of type Option[QLModel] (line 7) is created by the method train (line 10). Its value is None if training failed.

The training method train consists of executing config.numEpisodes cycles, or episodes, of sequences of state transitions (line 5). The random generator r is used in the initialization of the search space.

def train: Option[QLModel[T]] = {
  val r = new Random(System.currentTimeMillis)

  Try {
    val completions = Range(0, config.numEpisodes).filter(_ => train(r))

    val coverage = completions.size.toDouble/config.numEpisodes
    if(coverage > config.minCoverage) 
       new QLModel[T](qlPolicy, coverage)
    else 
       QLModel.empty[T]
  }.toOption
}

The training process exits with a model if the minimum coverage minCoverage (the ratio of episodes for which a goal state is reached) is met (line 8).

The method train(r: scala.util.Random) uses tail recursion to transition from a random initial state to one of the goal states. The tail recursion is implemented by the search method (line 4), which applies the recursive temporal difference formula (lines 14-18).
For each new state transition, the state for which the action generates the highest reward R given the policy qlPolicy is computed (line 10). The Q-value of the current policy is then updated with qlPolicy.setQ before the process is repeated for the next state through recursion (line 21).

def train(r: Random): Boolean = {
   
  @scala.annotation.tailrec
  def search(st: (QLState[T], Int)): (QLState[T], Int) = {
    val states = qlSpace.nextStates(st._1)
    if( states.isEmpty || st._2 >= config.episodeLength ) 
        (st._1, -1)
    
    else {
      val state = states.maxBy(s => qlPolicy.R(st._1.id, s.id))
      if( qlSpace.isGoal(state) )
          (state, st._2)
      else {
        val r = qlPolicy.R(st._1.id, state.id)   
        val q = qlPolicy.Q(st._1.id, state.id)
        // Q-Learning formula
        val deltaQ = r + config.gamma*qlSpace.maxQ(state, qlPolicy) -q
        val nq = q + config.alpha*deltaQ
        
        qlPolicy.setQ(st._1.id, state.id,  nq)
        search((state, st._2+1))
       }
     }
  } 
   
  r.setSeed(System.currentTimeMillis*Random.nextInt)

  val finalState = search((qlSpace.init(r), 0))
  if( finalState._2 != -1) 
    qlSpace.isGoal(finalState._1) 
  else 
    false
}

Note: There is no guarantee that one of the goal states is reached from a randomly chosen initial state. It is expected that some of the training epochs fail; this is the reason why monitoring the coverage is critical. Obviously, you may choose a deterministic approach to the initialization of each training epoch by picking any state besides the goal state(s) as the starting state.


Prediction
Once trained, the model is used to predict the next state with the highest value (or probability) given an existing state. The prediction is implemented as a partial function.

def predict: PartialFunction[QLState[T], QLState[T]] = {
  case state: QLState[T] if model.isDefined => 
    if (state.isGoal) state else nextState((state, 0))._1
}

The method nextState does the heavy lifting. It retrieves the list of states associated with the current state st through its actions set (line 2). The next most rewarding state qState is computed using the reward matrix R of the best policy of the QLearning model (lines 6 - 8).

def nextState(st: (QLState[T], Int)): (QLState[T], Int) =  {
  val states = qlSpace.nextStates(st._1)
  if( states.isEmpty || st._2 >= config.episodeLength) 
    st
  else {
    val qState = states.maxBy(
     s => model.map(_.bestPolicy.R(st._1.id, s.id))
           .getOrElse(-1.0)
    )

    nextState( (qState, st._2+1))
  }
}
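
Putting the pieces together, a hypothetical end-to-end run over the toy space and policy sketched earlier might look as follows (the configuration values are arbitrary and QLConfig is the sketch proposed above):

val config = QLConfig(
  alpha = 0.6, gamma = 0.8, episodeLength = 30, numEpisodes = 100, minCoverage = 0.1
)

val qLearning = new QLearning[Int](config, qlSpace, qlPolicy)

// predict is a partial function; lift returns None if no model was trained
val predicted: Option[QLState[Int]] = qLearning.predict.lift(states(0))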


Environment
Scala: 2.11.8
Java JDK 1.8

Conclusion
An article or blog post cannot realistically describe all the elements and strategies of reinforcement learning, from K-armed bandits to deep learning. However, this post should provide you with a roadmap for implementing a simple reinforcement learning algorithm in Scala.