Sunday, June 5, 2016

Extending Apache Spark/MLlib with AdaGrad

The stochastic gradient descent (SGD) is a critical element in the training of machine learning models such as support vector machines, logistic regression or back-propagation neural networks. In its simplest incarnation, the weights are updated using a single, global learning rate.
However, it is not uncommon for the features of a model to have a wide range of variance between observations. In this case an adaptive gradient algorithm, which assigns a learning rate to each feature, may be the solution. There are many different approaches to implementing a per-feature learning rate: this post describes the AdaGrad algorithm and its implementation in Apache Spark 2.0.

Stochastic Gradient Descent
The stochastic gradient descent is a randomized approximation of the (batch) gradient descent algorithm, used to minimize a continuous, differentiable objective function. In supervised machine learning, the objective function is a loss function (logistic, sum of squares, ...), for instance the mean squared error
\[L(w)=\frac{1}{n}\sum_{i=1}^{n}(y_{i}-f(x_{i}|w))^{2}\]
The objective function L is expressed as the summation of differentiable functions. In the case of the loss function in supervised learning, each differentiable function is the loss associated with a single observation.
\[L(w)=\sum_{i=1}^{n}L_{i}(w)\]
In supervised learning, the vector w represents the vector of weights (or model parameters). At each iteration of the gradient descent, the weights are updated using the formula
\[w_{t+1}=w_{t}-\eta \nabla L(w_{t})=w_{t}-\eta \sum_{i=1}^{n}\nabla L_{i}(w_{t})\]
The stochastic gradient descent (SGD) minimizes the loss function between the expected values and the values predicted by the model. At each iteration, SGD selects a subset of the observations (known as a mini-batch) used in the training of the model and approximates the gradient from that subset only. For a convex loss, the iterative process is expected to converge toward the global minimum of the loss function.
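To make the update rule concrete, here is a minimal sketch of a single SGD step for the mean squared error loss above, written in plain Scala. It only illustrates the formula, it is not the MLlib implementation; the names sgdStep, miniBatch and eta are purely illustrative.

// One SGD step for the mean squared error loss, as plain Scala.
// 'weights' are the current model parameters, 'miniBatch' a random sample of
// (features, label) observations and 'eta' the single, global learning rate.
def sgdStep(
    weights: Array[Double],
    miniBatch: Seq[(Array[Double], Double)],
    eta: Double): Array[Double] = {

  def predict(x: Array[Double]): Double =
    x.zip(weights).map { case (xi, wi) => xi * wi }.sum

  // Gradient accumulated over the mini-batch:
  // dL/dw(j) = -2/n * sum_i (y_i - f(x_i|w)) * x_ij
  val n = miniBatch.size.toDouble
  val grad = Array.fill(weights.length)(0.0)
  miniBatch.foreach { case (x, y) =>
    val err = y - predict(x)
    x.zipWithIndex.foreach { case (xj, j) => grad(j) += -2.0 * err * xj / n }
  }

  // w(t+1) = w(t) - eta * gradient
  weights.zip(grad).map { case (w, g) => w - eta * g }
}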

Adaptive Gradient Descent
The main idea behind AdaGrad is to increase the learning rate for sparse features (or model parameters) and decrease the learning rate for features that are denser. Therefore, AdaGrad improves the convergence of the minimization of the loss for models with sparse features, provided that these sparse features carry information.
\[w_{t+1,i}=w_{t,i}-\frac{\eta}{\sqrt{\sum_{\tau=1}^{t}g_{\tau,i}^{2}+\varepsilon}}\;g_{t,i}\qquad\text{with}\qquad g_{t,i}=\frac{\partial L}{\partial w_{t,i}}\]
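Before turning to Spark, here is a minimal, self-contained sketch of this per-feature learning rate in plain Scala: each weight keeps its own accumulator of squared gradients, and its effective learning rate shrinks as the accumulator grows. The names adaGradStep and gradHistory are illustrative only.

// Minimal AdaGrad step on plain arrays: weight i is updated with its own
// learning rate eta / sqrt(accumulated squared gradients of weight i + epsilon).
def adaGradStep(
    weights: Array[Double],
    gradient: Array[Double],
    gradHistory: Array[Double],  // running sum of squared gradients, per weight
    eta: Double,
    epsilon: Double = 1e-8): Array[Double] =
  weights.indices.map { i =>
    gradHistory(i) += gradient(i) * gradient(i)
    weights(i) - eta / math.sqrt(gradHistory(i) + epsilon) * gradient(i)
  }.toArray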

SGD in Apache Spark
The Apache Spark MLlib library offers two implementations of SGD:
  • A generic GradientDescent optimizer and related classes in the mllib.optimization package
  • SGD bundled with classifier or regression algorithms such as LogisticRegressionWithSGD, LassoWithSGD, SVMWithSGD or RidgeRegressionWithSGD
We will be using the optimization package in order to customize the stochastic gradient descent. The objective is to leverage the mllib.optimization.GradientDescent template class and implement the adaptive gradient with a per-feature learning rate by creating a custom Updater.
The updater "updates the weights of the model" (logistic regression or SVM) with the product of the current learning rate and the partial derivative of the loss with respect to this weight (as described in the previous section). Let's call AdaGradL2Updater the updater that implements the update of the model weights using the adaptive gradient, with L2 regularization. The SGD optimizer is then instantiated as follows:
   val adaSGD = new GradientDescent(new LogisticGradient, new AdaGradL2Updater(numFeatures))
                    .setStepSize(0.01)
                    // ... other configuration parameters
The class AdaGradL2Updater has to implement the generic compute method
  Updater.compute(
     weightsOld: Vector,
     gradient: Vector,
     stepSize: Double,
     iter: Int,
     regParam: Double): (Vector, Double)
The method returns a tuple (vector of updated weights, regularization component of the loss).
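To illustrate this contract, here is a bare-bones updater that performs a plain SGD step with a single, decaying learning rate and no regularization, in the spirit of MLlib's SimpleUpdater. The class name PlainSGDUpdater is illustrative, not part of MLlib.

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.Updater

// Plain SGD step: a single, global learning rate decaying with the iteration count
class PlainSGDUpdater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    val eta = stepSize / math.sqrt(iter)   // decaying learning rate
    val updated = weightsOld.toArray.zip(gradient.toArray)
      .map { case (w, g) => w - eta * g }
    (Vectors.dense(updated), 0.0)          // no regularization term
  }
}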
Let's now implement the AdaGrad algorithm.

Implementation of AdaGrad

As mentioned earlier, the implementation of AdaGrad consists of overriding the method Updater.compute.
The computation of the per-feature learning rate requires the history of the squared values of the gradient accumulated over the previous iterations. This history is stored in the array gradientsHistory, one entry per weight, and is updated by the method += at each invocation of compute. The method inverseVelocity converts this history into the AdaGrad scaling factor 1/sqrt(history + ε), which, multiplied by stepSize, yields the per-feature learning rates.
The next step consists of shrinking the existing (old) weights by the L2 regularization term and storing them in the Breeze dense vector brzWeights. The gradient, scaled by the per-feature learning rates, is then subtracted from the weights through a call to brzAxpy, and brzNorm computes the L2 norm used in the regularization component of the loss. The linear algebra is performed by the Breeze library, which delegates to native BLAS routines (through netlib-java) when they are available.

import breeze.linalg.{axpy => brzAxpy, norm => brzNorm, DenseVector => BDV}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.Updater

@DeveloperApi
final class AdaGradL2Updater(dimension: Int) extends Updater {
  // Running sum of the squared gradients, one entry per weight
  private[this] val gradientsHistory = Array.fill(dimension)(0.0)

  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {

    // Accumulate the squared gradients computed on the current mini-batch
    this += gradient

    // Per-feature learning rates: stepSize / sqrt(sum of squared gradients + eps)
    val learningRates = inverseVelocity.map(stepSize * _)

    // Apply the L2 regularization shrinkage, feature by feature
    val brzWeights = new BDV[Double](
      learningRates.zip(weightsOld.toArray).map {
        case (eta, weight) => weight * (1.0 - eta * regParam)
      }
    )

    // Gradient step: w(i) -= eta(i) * dL/dw(i)
    val scaledGradient = new BDV[Double](
      learningRates.zip(gradient.toArray).map { case (eta, g) => eta * g }
    )
    brzAxpy(-1.0, scaledGradient, brzWeights)

    // The second element of the tuple is the L2 regularization term of the loss
    val norm = brzNorm(brzWeights, 2.0)
    (Vectors.dense(brzWeights.toArray), 0.5 * regParam * norm * norm)
  }

  // Update the history of squared gradients for each weight
  private def +=(gradient: Vector): Unit =
    gradient.toArray.zipWithIndex.foreach {
      case (g, index) => gradientsHistory(index) += g * g
    }

  // 1.0/sqrt(accumulated squared gradients + eps), the AdaGrad scaling factor
  private def inverseVelocity: Array[Double] =
    gradientsHistory.map(g => 1.0 / math.sqrt(g + 1e-8))
}
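
For completeness, here is a sketch of how this optimizer could be driven once the training set is available as an RDD of (label, features) pairs. The file path, the hyper-parameter values and the helper train are placeholders; GradientDescent, LogisticGradient, MLUtils.loadLibSVMFile and optimize are the MLlib entry points. Like the earlier snippet, it assumes GradientDescent (whose constructor has restricted visibility) is accessible from the calling code.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.{GradientDescent, LogisticGradient}
import org.apache.spark.mllib.util.MLUtils

// Sketch: train a logistic model with the AdaGrad updater defined above
def train(sc: SparkContext, path: String, numFeatures: Int): Vector = {
  // GradientDescent.optimize expects an RDD of (label, features) pairs
  val data = MLUtils.loadLibSVMFile(sc, path).map(lp => (lp.label, lp.features))

  val adaSGD = new GradientDescent(new LogisticGradient, new AdaGradL2Updater(numFeatures))
    .setStepSize(0.01)
    .setNumIterations(200)
    .setRegParam(0.1)
    .setMiniBatchFraction(0.1)

  // Returns the weights computed at the last iteration
  adaSGD.optimize(data, Vectors.dense(Array.fill(numFeatures)(0.0)))
}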
