## Wednesday, February 7, 2018

Target audience: Intermediate

The stochastic gradient descent (SGD) optimization algorithm is commonly used in minimizing the loss function in the training of machine learning models such as support vector machines, logistic regression or back-propagation neural networks. In its simplest incarnation, the gradient is computed using a single learning rate.

However, it is not uncommon for the features of a model to have a wide range of variance between observations. In this case an adaptive gradient algorithm, which assigns a learning rate to each feature, may be the solution. There are many different approaches to implement an algorithm that attributes a learning rate to each feature. This post describes the AdaGrad algorithm and its implementation in Apache Spark MLlib.

Note: This post assumes that reader has rudimentary knowledge of the Scala API of Apache Spark and basic understanding of machine learning.

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

The Apache Spark ecosystems includes a machine learning library, MLlib.
The stochastic gradient descent optimizer is a randomized approximation of the (batched) gradient descent algorithm used to minimize a continuous differentiable objective function. In supervised machine learning, the objective function is a loss function (logistic, sum of least squares..).
$L(w)=\frac{1}{n}\sum_{i=0}^{n}(y_{i}-f(x_{i}|w))^{2}$ The objective function L is expressed as the summation of differentiable functions. In supervised learning, the loss related to a specific feature is defined as a continuous, differentiable, convex function. $L(w)=\sum_{i=1}^{n}L_{i}(w)$ In supervised learning, the vector w represent the vector of weights (or model parameters). At each iteration of the stochastic gradient descent, the weights are updated using the formula $w_{t+1}=w_{t}-\eta \sum_{i=0}^{n}\frac{\partial L}{\partial w_{i, t}}$ The stochastic gradient descent (SGD) minimizes the loss function between the expected value and the predictive values generated by the model. At each iteration, SGD, selects a subset of the observations (known as a mini-batch) used in the training of the model. The iterative process is expected to converged toward the true global minimum of the loss function.

The main idea behind AdaGrad is the need to increase the learning rate for the sparse features (or model parameters) and decrease the learning rate for features that are denser. Therefore, AdaGrad improves the convergence of the minimization of the loss for model with sparse features, given that these sparse features retains information.
$w_{t+1}=w_{t} -\frac{1}{\sqrt{\sum_{t=1}^{T}\bigtriangledown _{ti}^{t} + \varepsilon }}\frac{\partial L}{\partial w_{ti}}$

SGD in Apache Spark
The Apache spark MLlib library has two implementations of SGD
• Generic Gradient Descent and related classes in the mllib.optimization package
• SGD bundled with classifier or regression algorithms such as LogisticRegressionWithSGD, LassoWithSGD, SVMWithSGD or RidgeRegressionWithSGD

We will be using the optimization package in order to customize the stochastic gradient descent. The objective is to leverage the mllib.optimization.GradientDescent template class and implement the adaptive gradient with per-feature learning rate by creating a customize Updater.

The updater "updates the weights of the model" (Logistic regression or SVM) with the product of the current learning rate with the partial derivative of the loss over this weight (as described in the previous section). Let's call AdaGradUpdater the updater that implement the update of the model weights using the adaptive gradient. The SGD is then instantiated as follow
   val adaSGD = new GradientDescent.
.setStepSize(0.01)
. .....

  Updater.compute(
oldWeights: Vector,
stepSize: Double,
iter: Int,
regCoefs: Double): (Vector, Double)
The method returns the tuple (vector of new weights, loss). Let's implement the AdaGrad algorithm

As mentioned earlier, the implementation of AdaGrad consists of overriding the method Updater.compute

The computation of the learning rate requires us to record the past values of the square value of the gradient (previous steps) for this particular weight, in the array gradientHistory (line 3). First we define the method += to update the gradient history (lines 27-36). The first call to the method creates the gradient history (line 31).
The next step consists of converting the existing (old) weights into a Breeze dense vector brzWeights (line 14). The array of the new learning rates is computed as the inverseVelocity coefficient (line 39).
The learning rates are zipped with the old weights (line 15) to update the weights newWeights as a new dense vector(line 21). The linear algebra (matricial computation) on the Spark data node is actually performed by the LINPACK library under the cover through calls to brzAxpy (line 21) and brzNorm (line 22).

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @DeveloperApi final class AdaGradL2Updater(dimension: Int) extends Updater { private[this] var gradientsHistory: Array[Double] = _ override def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double ): (Vector, Double) = { +=(gradient) val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector val sumSquareDerivative = inverseVelocity.zip(brzWeights.toArray) val newWeights: BV[Double] = new DenseVector[Double](sumSquareDerivative.view.map { case (coef, weight) => weight * (1.0 -regParam * coef) }) brzAxpy(-1.0, gradient.toBreeze, newWeights) val norm = brzNorm(brzWeights, 2.0) (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm) } private def +=(gradient: Vector): Unit = { val grad = gradient.toArray grad.view.zipWithIndex.foreach { case (g, index) => { if (gradientsHistory == null) gradientsHistory = Array.fill(grad.length)(0.0) val existingGradient = gradientsHistory(index) gradientsHistory.update(index, existingGradient + g*g) } } } def inverseVelocity = gradientsHistory.map(1.0/Math.sqrt(_)) } 

Environment
Scala: 2.11.8
Java JDK 1.8
Apache Spark 2.2.2

References