Tuesday, January 19, 2021

Lazy instantiation of dataset from Amazon S3

Target audience: Intermediate
Estimated reading time: 15'

How many times have you thought it would be great to be able to instantiate an object or a dataset stored on Amazon S3 on demand? It is easier than you think with the Scala and Apache Spark functional toolbox.

The lazy instantiation of datasets from S3 described in this post was originally developed with Scala 2.12 and Apache Spark 2.4. The code has no dependency on a specific version of the language or framework, and compiles and runs on Apache Spark 3.0.

Just be lazy

A common requirement in machine learning is to load the configuration parameters associated with a model at run-time. The model may have been trained with data segregated by customer or category. When the model is deployed for prediction, it is critical to select and load the right set of parameters according to the characteristics of the request.

For instance, a topic extraction model may have been trained on a scientific corpus, medical articles or computer science papers.

A simple approach is to pre-load all variants of a model when the underlying application is deployed in production. However, consuming memory and CPU cycles for a model that may not be needed, at least not right away, is a waste of resources. In this post, we assume that the model parameters are stored on Amazon S3.

Lazy instantiation of objects allows us to reduce unnecessary memory consumption by invoking a constructor at most once, and only when needed. This capability becomes critical for data with a large footprint, such as Apache Spark datasets.
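As an illustration of the principle, Scala's lazy val defers a constructor call until the first access and caches the result for every subsequent access. A minimal sketch (the Model class and the counter are purely illustrative):

```scala
// Counter used only to observe when the constructor actually runs
var constructed = 0

final class Model(val name: String) {
  constructed += 1 // side effect that marks the instantiation
}

// 'lazy val': the constructor is invoked on first access, then cached
lazy val model: Model = new Model("topic-extraction")

val before = constructed // still 0: nothing has been built yet
val first = model.name   // first access triggers the construction
val after = constructed  // now 1
val again = model.name   // reuses the cached instance
val still = constructed  // still 1: the constructor ran exactly once
```

The same deferred-construction behavior is what the lazy hash table below provides, with S3 as the backing store instead of a constructor.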

A simple, efficient repository

Let's consider credentials used to access multiple devices, each consisting of an id, password and hint, that have been previously uploaded to S3.

case class Credentials(device: String, id: String, password: String, hint: String)

A hash table is the simplest incarnation of a dynamic repository of models, so we implement a lazy hash table by sub-classing the mutable HashMap.
The first time a model is requested, it is loaded into memory from S3, then returned to the client code. To this purpose, the constructor of the lazy hash table takes two arguments:
  • Dynamic loading mechanism from S3 - loader is responsible for loading the data from S3
  • Key generator - toKey converts a string key to the type of key of the hash map
import scala.collection.mutable.HashMap

final class LazyHashMap[T, U](
    loader: String => Option[U], 
    toKey: String => T
) extends HashMap[T, U] {

   // Look up the item in memory; load it from S3 on a cache miss
   def get(item: String): Option[U] = synchronized {
      val key = toKey(item)
      if (super.contains(key))  // Is it already in memory?
         super.get(key)
      else
         loader(item).map(      // otherwise load the item from S3
            l => {
               super.put(key, l)
               l
            }
         )
   }

   // Prevent clients from updating this immutable map
   @throws(classOf[UnsupportedOperationException])
   override def put(key: T, value: U): Option[U] = 
      throw new UnsupportedOperationException("lazy map is immutable")
}

The synchronized keyword defines a critical section that protects the look-up and insertion from concurrent access and dirty reads.
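To see the behavior without involving Spark or S3, here is a self-contained sketch of the same idea: the loader, which stands in for the S3 fetch, simply upper-cases the key and counts its invocations (all names are illustrative):

```scala
import scala.collection.mutable

var loadCount = 0

// Stand-in for the S3 loader: invoked only on a cache miss
val loader: String => Option[String] =
  (s: String) => { loadCount += 1; Some(s.toUpperCase) }

final class LazyCache[U](loader: String => Option[U])
    extends mutable.HashMap[String, U] {

  // Return the cached value, or load it and cache it on a miss
  def getOrLoad(item: String): Option[U] = synchronized {
    super.get(item).orElse(
      loader(item).map { v => super.put(item, v); v }
    )
  }
}

val cache = new LazyCache[String](loader)
val firstAccess = cache.getOrLoad("alpha")  // triggers the loader
val secondAccess = cache.getOrLoad("alpha") // served from memory
```

After the two calls, loadCount is 1: the loader ran only for the first access, which is exactly the behavior we want for expensive S3 reads.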

Here is an example of the two arguments for the constructor of the lazy hash table for the type Credentials. The key identifies the dataset the model has been trained on.

val load: String => Option[Dataset[Credentials]] = 
     (dataSource: String) => loadData(dataSource)
val key = (s: String) => s

val lazyHashMap = new LazyHashMap[String, Dataset[Credentials]](load, key)

The last item of business is the implementation of the function loadData that loads and instantiates the dataset.

Data loader

Let's write a loader for a Spark dataset of type T stored on AWS S3 in a given bucket, bucketName, and folder, s3InputPath.

def s3ToDataset[T](
   s3InputPath: String
)(implicit encoder: Encoder[T]): Dataset[T] = {
  import sparkSession.implicits._

   // Needed for access keys and to enforce the schema
  val loadDS = Seq[T]().toDS
  val accessConfig = loadDS.sparkSession.sparkContext.hadoopConfiguration

   // Credentials to read from S3
  accessConfig.set("fs.s3a.access.key", myAccessKey)
  accessConfig.set("fs.s3a.secret.key", mySecretKey)

  try {
     // Enforce the schema extracted from the empty dataset
     val inputSchema = loadDS.schema
     sparkSession.read
        .format("json")
        .schema(inputSchema)
        .load(path = s"s3a://$bucketName/${s3InputPath}")
        .as[T]
  } catch {
    case e: FileNotFoundException => log.error(e.getMessage); loadDS
    case e: SparkException => log.error(e.getMessage); loadDS
    case e: IOException => log.error(e.getMessage); loadDS
  }
}

It is assumed that the Apache Spark session has already been created and that the encoder for the type T (e.g. Kryo) is implicitly defined, usually along with the Spark session.
The first step is to instantiate a 'dummy' empty dataset of type T. This instance, loadDS, is used to
  • Access the Hadoop configuration to specify the credentials for S3
  • Enforce the schema when reading the data (in JSON format) from S3. Alternatively, the schema could have been inferred.
Note: data in an S3 bucket is accessed through the s3a:// protocol, which adds an object layer on top of the default, block-centric S3 protocol. It is significantly faster.
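For reference, the path passed to the Spark reader above is composed from the bucket and folder names; a small sketch with hypothetical values:

```scala
// Hypothetical bucket and folder, for illustration only
val bucketName = "my-models-bucket"
val s3InputPath = "topic-extraction/medical"

// The s3a:// scheme selects the Hadoop S3A object-store connector
val uri = s"s3a://$bucketName/$s3InputPath"
```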

Finally, let's implement the load function, loadData.

  // Create a simple Spark session
implicit val sparkSession = SparkSession.builder
   .appName("lazy-dataset")   // illustrative application name
   .master("local[*]")
   .getOrCreate()

def loadData(s3Path: String): Option[Dataset[Credentials]] = {
  import sparkSession.implicits._ // needed for encoding

  // Wrap the fallible S3 read into an Option for the lazy hash map
  Try(s3ToDataset[Credentials](s3Path)).toOption
}

