Monday, March 4, 2013

Scala's Share-nothing Actors

Introduction
Even with the introduction of the executor service and the higher-level abstractions of java.util.concurrent in Java 1.5, programmers have found it quite difficult to build reliable multi-threaded applications that share data and locks. It is quite common for less experienced developers either to over-synchronize data access and create deadlocks, or to allow race conditions that leave the application in an inconsistent state.

Scala's actors implement a share-nothing, message-passing model. At its core, an actor is a 'thread' with a mailbox to receive and respond to messages. Actors are sub-classes of scala.actors.Actor. The two main methods used to implement an actor are:
  • act: implements the co-routine that corresponds to the execution of the thread, similar to Thread.run() in Java.

  • react: processes the messages sent by other actors and queued in the mailbox. A call to react never returns: it registers the message handler and releases the underlying thread instead of blocking it while the mailbox is empty, so any code that must run after a message is received belongs inside the handler. There are two ways to stop processing messages: call exit, or re-enter act with an exit condition that lets it finish without calling react again.
Note: The implementation described in this post relies on Scala 2.9 and is not guaranteed to compile and execute as expected in future versions of the language.
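To make the 'thread with a mailbox' picture concrete, here is a minimal sketch that does not use scala.actors at all: a plain thread draining a private java.util.concurrent.LinkedBlockingQueue. The names MailboxWorker and MailboxDemo and the "STOP" message are hypothetical, chosen for illustration only. This is not how scala.actors is implemented, just an approximation of the model under those assumptions.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for an actor: one thread draining one private mailbox.
class MailboxWorker(handle: String => Unit) {
  private val mailbox = new LinkedBlockingQueue[String]()

  private val thread = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        // take() blocks this thread until a message is available
        mailbox.take() match {
          case "STOP" => running = false   // hypothetical shutdown message
          case msg    => handle(msg)
        }
      }
    }
  })

  def start(): Unit = thread.start()
  def !(msg: String): Unit = mailbox.put(msg)   // asynchronous send
  def join(): Unit = thread.join()
}

object MailboxDemo {
  def run(): List[String] = {
    var seen = List.empty[String]      // written only by the worker thread
    val worker = new MailboxWorker(msg => seen = msg :: seen)
    worker.start()
    worker ! "a"
    worker ! "b"
    worker ! "STOP"
    worker.join()                      // after join, the worker's writes are visible
    seen.reverse
  }
}
```

The sender never touches the worker's state directly; it only enqueues messages, which is the share-nothing property the actor model relies on. Unlike react, this sketch blocks a thread while the mailbox is empty.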

Example
The example below describes a master actor (the managing task) that creates and manages slave actors (the worker tasks). To avoid a race condition without adding a lock, the reference to the master actor, parent, is delivered to each slave actor (line 11) through the message-passing mechanism react (lines 10 - 14).
The SlaveActor class implements a task that runs a specific computation for numIters iterations (line 12). The computation, process, is an attribute of the slave (line 6). Because react never returns, the computation is invoked inside the message handler and not after the react block; the handler does not call react again, so the actor terminates once the handler completes.
Finally, the slave actor sends a message to its parent to signal that its task is complete (line 13).

 1  import scala.actors.Actor
 2
 3  class SlaveActor(
 4    numIters: Int = 25000,
 5    message: String,
 6    process: Int => Double) extends Actor {
 7
 8    def act() {
 9      // Wait for the activation message that carries the parent
10      react {
11        case (msg: String, parent: Actor) =>
12          process(numIters)
13          parent ! "DONE"
14      }
15    }
16  }

The master task, or actor, is responsible for launching and then controlling the slave actors. The master actor starts all the slaves (line 7) and sends each of them a non-blocking message, Activate (line 8). Once a slave actor completes, it notifies the master through the message DONE.
Upon receiving the message DONE (line 15), the master actor decrements the reference count of the slave actors still active (line 16). The master ultimately exits when the last slave completes and the reference counter reaches 0 (lines 17 - 18).

 1  class MasterActor(
 2    slaveActors: List[SlaveActor]) extends Actor {
 3
 4    def act() {
 5      // Start each slave and hand it a reference to this master
 6      for (slaveActor <- slaveActors) {
 7        slaveActor.start
 8        slaveActor ! ("Activate", this)
 9      }
10
11      // One DONE message is expected per slave
12      var refCounter = slaveActors.size
13      loop {
14        react {
15          case "DONE" => {
16            refCounter -= 1
17            if (refCounter == 0)
18              exit
19          }
20          case _ => { println("Incorrect message") }
21        }
22      }
23    }
24  }
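The completion protocol used by the master (count one DONE message per worker, stop at zero) can be tried outside scala.actors. The sketch below is an illustration under stated assumptions: it replaces the actor mailbox with a java.util.concurrent.LinkedBlockingQueue and the slaves with plain threads, and the names DoneCounting and run are hypothetical.

```scala
import java.util.concurrent.LinkedBlockingQueue

object DoneCounting {
  // Starts nWorkers threads; each posts "DONE" to the shared mailbox when it finishes.
  // Returns the number of "DONE" messages consumed before the counter reached zero.
  def run(nWorkers: Int): Int = {
    val mailbox = new LinkedBlockingQueue[String]()

    val workers = (0 until nWorkers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = mailbox.put("DONE")   // the real work would happen before this
      })
    }
    workers.foreach(_.start())

    var refCounter = nWorkers   // one DONE expected per worker
    var received = 0
    while (refCounter > 0) {
      mailbox.take() match {
        case "DONE" =>
          refCounter -= 1
          received += 1
        case other =>
          println("Incorrect message: " + other)
      }
    }
    received
  }
}
```

Initializing the counter to the number of workers, and not one less, is what guarantees that the loop waits for the last worker before returning.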

The main routine, ActorsTest.main, creates the slaves, which are launched by the master actor acting as the managing task. The slaves execute a function, waveSum, that is defined locally and passed to each slave as a function value. This approach is an alternative to the more traditional functional futures.

object ActorsTest extends App {
  val nOfSlaves = 10
  val numIters = 1250000
  val eps = 0.0001

  // Arbitrary method to simulate load on the CPU cores
  def waveSum(numIters: Int): Double =
    (0 until numIters)./:(0.0)(
      (s, i) => s + math.exp(math.sin(i * eps))
    )

  // Create the slave tasks, then launch them through the master
  val slaves = (0 until nOfSlaves)./:(List[SlaveActor]())(
    (xs, i) => new SlaveActor(numIters, i.toString, waveSum) :: xs
  )
  new MasterActor(slaves).start
}
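For comparison, here is a rough sketch of the futures-based alternative mentioned above. It assumes Scala 2.10 or later, where scala.concurrent.Future is available (the post itself targets 2.9). The names FuturesSketch and runAll and the 60-second timeout are hypothetical choices made for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FuturesSketch {
  val eps = 0.0001

  // Same arbitrary CPU load as waveSum in the actor example
  def waveSum(numIters: Int): Double =
    (0 until numIters).foldLeft(0.0)((s, i) => s + math.exp(math.sin(i * eps)))

  // Runs nTasks copies of waveSum concurrently and waits for all the results
  def runAll(nTasks: Int, numIters: Int): Seq[Double] = {
    val tasks = (0 until nTasks).map(_ => Future(waveSum(numIters)))
    Await.result(Future.sequence(tasks), 60.seconds)
  }
}
```

With futures, the results come back as values, so no explicit DONE message or reference counter is needed; Future.sequence plays the role of the master waiting for every worker.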


