Wednesday, November 21, 2018

Integration of 3rd party service with Spark

Apache Spark is a commonly used framework for distributing large-scale computational tasks. Some of these tasks may involve accessing external (3rd party) remote services such as NLP processing, deep learning model training or image classification. Moreover, these services, accessible through a REST API for instance, may not be easily reconfigurable.
A remote service is usually deployed as a cluster of nodes (or instances) to improve scalability and guarantee high availability. The question becomes: what is the most efficient approach to generate and process requests to a cluster of services?

This post evaluates the performance of a remote service deployed on multiple nodes and invoked from an Apache Spark application. We compare two approaches to connecting Spark workers to the 3rd party service nodes:

  • a load balancer
  • a hash-based mapping that assigns service nodes to each partition

Load balancer
Load balancers are commonly used to route requests to web services according to a policy such as CPU load, average processing time or downtime. They first gained acceptance in the late 1990s with the explosion of web servers.
A load balancer is a simple, self-contained and easily deployable solution that does not require any architecture or coding changes.


In a typical Apache Spark deployment, the Spark context splits data sets into partitions. Each partition pre-processes its data to generate the requests to the service, then initiates the connection to the service cluster through the load balancer.



The data processing steps are:
  1. The master splits the input data over a given set of partitions
  2. Worker nodes pre-process and cleanse the data if necessary
  3. Requests are dynamically generated
  4. Each partition establishes and manages the connection to the load balancer
  5. Finally, worker nodes process the response and payload
Load balancers provide an array of features such as throttling, persistent sessions or stateful packet inspection that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with the core concept of big data: data partitioning.
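
The five steps above can be sketched in plain Scala, without Spark, to make the data flow explicit. This is only an illustration under stated assumptions: the object LoadBalancerSketch, the load balancer URL, the split and cleanse helpers and the send callback are all hypothetical names that do not come from the original application. The point it shows is that every partition sends its requests to the same address.

```scala
object LoadBalancerSketch {
  // Hypothetical load balancer address (an assumption, not from the post)
  val loadBalancerUrl = "http://load-balancer.example:80/endpoint"

  // Step 1: split the input into at most numPartitions chunks
  // (a stand-in for the partitioning Spark performs)
  def split[T](data: Seq[T], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (data.isEmpty) Seq.empty
    else {
      val size = math.ceil(data.size.toDouble / numPartitions).toInt
      data.grouped(size).toList
    }
  }

  // Step 2: pre-process and cleanse a document (placeholder logic)
  def cleanse(doc: String): String = doc.trim.toLowerCase

  // Steps 3 to 5: each partition generates its requests and sends all of
  // them to the single load balancer URL; `send` is injected by the caller
  // and stands for the HTTP call plus response processing
  def run(docs: Seq[String], numPartitions: Int)(
      send: (String, String) => String): Seq[String] =
    split(docs, numPartitions).flatMap { partition =>
      partition.map(doc => send(loadBalancerUrl, cleanse(doc)))
    }
}
```

Passing a stub such as (url, body) => body.length.toString as the send callback is enough to exercise the flow without any network access.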

Let's consider an alternative approach: assigning the service cluster nodes directly to each partition.


Partitioning service nodes
The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to a partition. Spark supports several mechanisms to distribute data across partitions:
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses for the service nodes, as illustrated in the following block diagram.
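
One possible way to hash service node addresses onto partitions is sketched below in plain Scala. It is not the code used in this study: the object NodeAssignmentSketch and its assign and nodesFor helpers are hypothetical, and the fallback to the full node list for a partition that receives no node is an assumption made for the sake of the example.

```scala
object NodeAssignmentSketch {
  // Maps each distinct service node IP to exactly one partition index by
  // hashing the address. floorMod keeps the index in [0, numPartitions)
  // even when hashCode is negative.
  def assign(ips: Seq[String], numPartitions: Int): Map[Int, Seq[String]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    ips.distinct.groupBy(ip => java.lang.Math.floorMod(ip.hashCode, numPartitions))
  }

  // Service nodes a given partition should connect to. Hashing may leave a
  // partition without any node; in that case this sketch falls back to the
  // full list (an assumption, other policies are possible).
  def nodesFor(partitionId: Int, ips: Seq[String], numPartitions: Int): Seq[String] =
    assign(ips, numPartitions).getOrElse(partitionId, ips)
}
```

Because String.hashCode is deterministic on the JVM, every worker computes the same assignment from the same list of addresses, so no coordination between partitions is needed.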



The data pipeline is somewhat similar to the one used with the load balancer:
  1. The master splits the input data over a given set of partitions
  2. IP addresses of all service nodes are hashed and assigned to each partition
  3. Worker nodes pre-process and cleanse the data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establishes and manages the connection to a subset of service nodes
  6. Finally, worker nodes process the response and payload
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element and used as a seed for the random selection of the service node. The last step consists of building the request, establishing the connection to the target service node and processing the response.

 import scala.util.Random

 // Input, Output and HttpConnector are application-specific types defined elsewhere
 def hashedEndPoints(input: Input, timeout: Int, serverAddresses: Seq[String]): Output = {
    // Seed a random generator with the hash code of the input and the current time
    val hashedCode = input.hashCode.toLong + System.currentTimeMillis
    val seed = math.abs(hashedCode)
    val random = new Random(seed)

    // Randomly select one of the service nodes
    val serverHash = random.nextInt(serverAddresses.length)
    val targetIp = serverAddresses(serverHash)

    // Build the request and connect to the selected service node
    val url = s"http://${targetIp}:80/endpoint"
    val httpConnector = HttpConnector(url, timeout)
     // Execute request and return a response of type Output
   }
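
The node selection step of the function above can be isolated so that it runs without the HTTP connector. The sketch below is an illustration only: NodeSelectionSketch and selectNode are hypothetical names, and passing the timestamp as a parameter (instead of reading the clock inside the function) is a choice made here so that the selection can be reproduced.

```scala
object NodeSelectionSketch {
  import scala.util.Random

  // Picks one service node using a random generator seeded with the hash
  // code of the input plus a timestamp, mirroring the approach described
  // in the post. java.util.Random accepts any Long as a seed, so no
  // absolute value is required here.
  def selectNode(inputHash: Int, nowMillis: Long, serverAddresses: Seq[String]): String = {
    require(serverAddresses.nonEmpty, "no service node available")
    val random = new Random(inputHash.toLong + nowMillis)
    serverAddresses(random.nextInt(serverAddresses.length))
  }
}
```

Calling selectNode(input.hashCode, System.currentTimeMillis, ips) matches the behavior described above, while a fixed timestamp makes the choice repeatable for a given input.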

The function hashedEndPoints, executed within each partition, is invoked from the master.


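import org.apache.spark.sql.{Dataset, SparkSession}

def process(
 inputs: Dataset[Input],
 timeout: Int,
 serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
 // Brings the encoder needed by Dataset.map into scope (assumes Output is a case class)
 import sparkSession.implicits._

 // Each element is routed to a service node from within its partition
 inputs.map(
  input =>
     if (serverAddresses.nonEmpty)
         hashedEndPoints(input, timeout, serverAddresses)
     else
         throw new IllegalStateException("error ...")
   )
}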

Besides ensuring consistency and avoiding the addition of an external component (the load balancer) to the architecture, the direct assignment of service nodes to the Spark partitions has some performance benefits.

Performance evaluation
Let's measure the latency of processing a corpus of text through an NLP algorithm deployed over a variable number of nodes or instances. The following chart plots the average time the NLP service takes to process a document for different numbers of NLP processing nodes.



Although the performance improvement is not striking for a small number of service nodes, the direct assignment of the NLP processing nodes to Spark partitions reduces latency by as much as 20% for a larger number of nodes (or instances).
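
For readers who want to reproduce this kind of comparison, the two quantities discussed here (average duration per document and relative latency reduction) can be computed with a few lines of Scala. This is a minimal sketch and not the benchmark harness used for the chart: LatencySketch, averageMillis and relativeReduction are hypothetical names, and the call argument stands for whatever function sends one document to the NLP service.

```scala
object LatencySketch {
  // Average wall-clock time, in milliseconds, spent per document when
  // `call` is applied sequentially to every document
  def averageMillis[A](docs: Seq[A])(call: A => Unit): Double = {
    require(docs.nonEmpty, "at least one document is required")
    val start = System.nanoTime()
    docs.foreach(call)
    (System.nanoTime() - start) / 1e6 / docs.size
  }

  // Relative latency reduction of a candidate setup against a baseline,
  // e.g. direct node assignment against the load balancer
  def relativeReduction(baselineMillis: Double, candidateMillis: Double): Double = {
    require(baselineMillis > 0, "baseline must be positive")
    (baselineMillis - candidateMillis) / baselineMillis
  }
}
```

With made-up values that are not measurements from this study, relativeReduction(100.0, 80.0) corresponds to a 20% reduction.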


Environment
Scala 2.12.1
JDK 1.8
Apache Spark 2.3.2
AWS EMR 5.19

