The stochastic gradient descent (SGD) optimization algorithm is commonly used in minimizing the loss function in the training of machine learning models such as support vector machines, logistic regression or back-propagation neural networks. In its simplest incarnation, the gradient is computed using a single learning rate.

However, it is not uncommon for the features of a model to have a wide range of variance between observations. In this case an adaptive gradient algorithm, which assigns a learning rate to each feature, may be the solution. There are many different approaches to implement an algorithm that attributes a learning rate to each feature. This post describes the

**AdaGrad**algorithm and its implementation in**Apache Spark MLlib**.__Note__: This post assumes that reader has rudimentary knowledge of the Scala API of Apache Spark and basic understanding of machine learning.**Stochastic Gradient Descent**

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

The Apache Spark ecosystems includes a machine learning library,

The stochastic gradient descent optimizer is a randomized approximation of the (batched) gradient descent algorithm used to minimize a continuous differentiable objective function. In supervised machine learning, the objective function is a loss function (logistic, sum of least squares..).**MLlib**.\[L(w)=\frac{1}{n}\sum_{i=0}^{n}(y_{i}-f(x_{i}|w))^{2}\] The objective function

**L**is expressed as the summation of differentiable functions. In supervised learning, the loss related to a specific feature is defined as a continuous, differentiable, convex function. \[L(w)=\sum_{i=1}^{n}L_{i}(w)\] In supervised learning, the vector

**w**represent the vector of weights (or model parameters). At each iteration of the stochastic gradient descent, the weights are updated using the formula \[w_{t+1}=w_{t}-\eta \sum_{i=0}^{n}\frac{\partial L}{\partial w_{i, t}}\] The stochastic gradient descent (SGD) minimizes the loss function between the expected value and the predictive values generated by the model. At each iteration, SGD, selects a subset of the observations (known as a mini-batch) used in the training of the model. The iterative process is expected to converged toward the

*true global*minimum of the loss function.

Adaptive Gradient Descent

The main idea behind

\[w_{t+1}=w_{t} -\frac{1}{\sqrt{\sum_{t=1}^{T}\bigtriangledown _{ti}^{t} + \varepsilon }}\frac{\partial L}{\partial w_{ti}}\]

**AdaGrad**is the need to increase the learning rate for the sparse features (or model parameters) and decrease the learning rate for features that are denser. Therefore, AdaGrad improves the convergence of the minimization of the loss for model with sparse features, given that these sparse features retains information.\[w_{t+1}=w_{t} -\frac{1}{\sqrt{\sum_{t=1}^{T}\bigtriangledown _{ti}^{t} + \varepsilon }}\frac{\partial L}{\partial w_{ti}}\]

**SGD in Apache Spark**

The Apache spark MLlib library has two implementations of SGD

- Generic Gradient Descent and related classes in the
**mllib.optimization**package - SGD bundled with classifier or regression algorithms such as
**LogisticRegressionWithSGD**,**LassoWithSGD**,**SVMWithSGD**or**RidgeRegressionWithSGD**

We will be using the optimization package in order to customize the stochastic gradient descent. The objective is to leverage the

**mllib.optimization.GradientDescent**template class and implement the*adaptive gradient with per-feature learning rate*by creating a customize**Updater**.
The updater "

*updates the weights of the model*" (Logistic regression or SVM) with the product of the current learning rate with the partial derivative of the loss over this weight (as described in the previous section). Let's call**AdaGradUpdater**the updater that implement the update of the model weights using the adaptive gradient. The SGD is then instantiated as follow```
val adaSGD = new GradientDescent.
.setUpdater(new AdaGradUpdater)
.setStepSize(0.01)
. .....
```

The class **AdaGradUpdater**has to implement the generic compute method

```
Updater.compute(
oldWeights: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regCoefs: Double): (Vector, Double)
```

The method returns the tuple (vector of new weights, loss). Let's implement the AdaGrad algorithm

Implementation of AdaGrad

As mentioned earlier, the implementation of AdaGrad consists of overriding the method

**Updater.compute**
The computation of the learning rate requires us to record the past values of the square value of the gradient (previous steps) for this particular weight, in the array

**gradientHistory**(line 3). First we define the method**+=**to update the gradient history (lines 27-36). The first call to the method creates the gradient history (line 31).
The next step consists of converting the existing (old) weights into a

*Breeze*dense vector**brzWeights**(line 14). The array of the new learning rates is computed as the**inverseVelocity**coefficient (line 39).
The learning rates are zipped with the old weights (line 15) to update the weights

**newWeights**as a new dense vector(line 21). The linear algebra (matricial computation) on the Spark data node is actually performed by the LINPACK library under the cover through calls to**brzAxpy**(line 21) and**brzNorm**(line 22).1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | @DeveloperApi final class AdaGradL2Updater(dimension: Int) extends Updater { private[this] var gradientsHistory: Array[Double] = _ override def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double ): (Vector, Double) = { +=(gradient) val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector val sumSquareDerivative = inverseVelocity.zip(brzWeights.toArray) val newWeights: BV[Double] = new DenseVector[Double](sumSquareDerivative.view.map { case (coef, weight) => weight * (1.0 -regParam * coef) }) brzAxpy(-1.0, gradient.toBreeze, newWeights) val norm = brzNorm(brzWeights, 2.0) (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm) } private def +=(gradient: Vector): Unit = { val grad = gradient.toArray grad.view.zipWithIndex.foreach { case (g, index) => { if (gradientsHistory == null) gradientsHistory = Array.fill(grad.length)(0.0) val existingGradient = gradientsHistory(index) gradientsHistory.update(index, existingGradient + g*g) } } } def inverseVelocity = gradientsHistory.map(1.0/Math.sqrt(_)) } |

Environment

Scala: 2.11.8

Java JDK 1.8

Apache Spark 2.2.2

References

- Apache Spark API
- An Overview of Gradient Descent Optimization Algorithms - Sebastien Ruder (2016)
- Machine Learning: A probabilistic perspective - K. Murphy 2012 MIT press - 8.5 Online learning and stochastic optimization

