In the previous post I discussed the problem of connecting to AWS instances concurrently, and how slow it was to do this sequentially. We need to connect to hundreds of MySQL instances, fetch some records and store them in files. The records contain different URLs that need to be accessed. Here I will discuss only the first part: fetching the records from MySQL and storing them in files.
Scala gives you the actor model (through the Akka library) to implement concurrency. You create multiple actors and make them interact with each other by passing messages. Each actor instance can be thought of as a lightweight thread with its own local data and behavior, although Akka actually schedules actors on a shared thread pool rather than giving each one a dedicated thread. You invoke an actor by sending it a message. Messages are queued in the actor's mailbox and processed one at a time. So you can create hundreds of actor instances, each doing a chunk of work. To connect to hundreds of database instances, you define one actor that connects to an instance and executes the query, and another that stores the records in separate files.
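To make "queued and processed one at a time" concrete, here is a toy sketch in plain Scala, without Akka. The class `CountingActor` is hypothetical: its mailbox is a `LinkedBlockingQueue` drained by a single worker thread, so many senders can fire messages at it while its private counter is only ever touched by that one thread. This only illustrates the mailbox semantics; real Akka actors do not each own a thread and add supervision, routing and much more.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy "actor" (NOT Akka): one worker thread drains a FIFO mailbox, so the
// private mutable counter is only ever read and written by that thread.
class CountingActor {
  private sealed trait Msg
  private case class Add(n: Int) extends Msg
  private case object Stop extends Msg

  private val mailbox = new LinkedBlockingQueue[Msg]()
  private var total = 0 // touched only by the worker thread until join()

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {      // blocks until a message arrives
          case Add(n) => total += n // no lock needed: single consumer
          case Stop   => running = false
        }
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // "Sending" a message only enqueues it; the sender never touches `total`.
  def !(n: Int): Unit = mailbox.put(Add(n))

  // Enqueue Stop behind all earlier messages, wait for the worker, return the sum.
  def stopAndGet(): Int = {
    mailbox.put(Stop)
    worker.join() // join() makes the worker's writes visible to this thread
    total
  }
}

object CountingDemo {
  def main(args: Array[String]): Unit = {
    val counter = new CountingActor
    // Four threads send 1000 messages each, with no locking on the caller side.
    val senders = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 1000).foreach(_ => counter ! 1)
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    println(counter.stopAndGet()) // prints 4000
  }
}
```

The design choice to note is that correctness comes from a single consumer, not from locks: as long as the state never escapes the worker, there is nothing to synchronize.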
I used the Akka library to implement this, with a RoundRobinRouter that creates a pool of actor instances (60 per router in the code below) and hands incoming messages to them in turn. In fact I used two storage actors because of an intrinsic complexity that is beyond the scope of this post.
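The round-robin policy itself is simple: message number i goes to routee i mod n. The sketch below, with a hypothetical `RoundRobin` class and made-up routee names, shows only that rotation. It is not Akka's RoundRobinRouter, which also creates, supervises and manages the routee actors.

```scala
import java.util.concurrent.atomic.AtomicLong

// Round-robin selection only: message i goes to routee (i mod n).
// Akka's RoundRobinRouter also creates and supervises its routees; this does not.
final class RoundRobin[A](routees: IndexedSeq[A]) {
  require(routees.nonEmpty, "need at least one routee")
  private val counter = new AtomicLong(0L)

  // Thread-safe: each call atomically claims the next index.
  def pick(): A = routees((counter.getAndIncrement() % routees.size).toInt)
}

object RoundRobinDemo {
  def main(args: Array[String]): Unit = {
    val rr = new RoundRobin(Vector("dbActor-0", "dbActor-1", "dbActor-2"))
    val assigned = (1 to 7).map(_ => rr.pick())
    println(assigned.mkString(", "))
    // prints dbActor-0, dbActor-1, dbActor-2, dbActor-0, dbActor-1, dbActor-2, dbActor-0
  }
}
```

Round robin ignores how busy each routee is, which is fine when every message (one database endpoint) costs roughly the same amount of work.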
Sample code that connects to MySQL is shown below.
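import java.sql.{Connection, DriverManager, Statement}
import akka.actor.{Actor, ActorRef}

class DBActor(storageActor1: ActorRef, storageActor2: ActorRef) extends Actor {

  def accessTable1(stmt: Statement): (String, String) = {
    // EXECUTE QUERY against the first table and return its records (placeholder)
    ???
  }

  def accessTable2(stmt: Statement): (String, String) = {
    // EXECUTE QUERY against the second table and return its records (placeholder)
    ???
  }

  def receive = {
    case credentials: DBCredentials =>
      var con: Connection = null
      try {
        val url = "jdbc:mysql://" + credentials.endpoint + "/aws"
        // create connection (assumes DBCredentials also carries user and password fields)
        con = DriverManager.getConnection(url, credentials.user, credentials.password)
        val stmt = con.createStatement()
        val data1 = accessTable1(stmt)
        val data2 = accessTable2(stmt)
        // hand the records to the storage actors; this actor never touches the files
        storageActor1 ! data1
        storageActor2 ! data2
      } catch {
        case ex: Exception =>
          // report the failure for this endpoint to the first storage actor
          storageActor1 ! new DBErrorLevel1(credentials.fileName, ex.getMessage)
      } finally {
        // always release the connection, even when a query fails
        if (con != null) con.close()
      }
  }
}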
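The post does not show the message classes that DBActor matches on. A plausible sketch is below, modelling `DBCredentials` and `DBErrorLevel1` as immutable case classes. Only `fileName` and `endpoint` appear in the original code; the fields `user` and `password` and the helper object `DBMessages` are assumptions. Immutable messages suit the actor model because the same object can be passed from one actor to another without any synchronization.

```scala
// Hypothetical message types: the post never shows them. Only `fileName`
// and `endpoint` appear in the original code; `user` and `password` are assumed.
final case class DBCredentials(fileName: String, endpoint: String, user: String, password: String)

// Sent to a storage actor when connecting or querying fails for one endpoint.
final case class DBErrorLevel1(fileName: String, message: String)

object DBMessages {
  // Same URL shape DBActor builds: host (optionally host:port) plus the "aws" schema.
  def jdbcUrl(c: DBCredentials): String = s"jdbc:mysql://${c.endpoint}/aws"

  // Turn a failure into an immutable message instead of letting it propagate.
  def toError(c: DBCredentials, ex: Throwable): DBErrorLevel1 =
    DBErrorLevel1(c.fileName, ex.getMessage)
}
```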
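An important advantage of the actor model is that you don't have to fight the synchronization problems of the threading API: as long as actors share no mutable state and communicate only through immutable messages, each actor's data is handled one message at a time, so no explicit locks are needed. The DBActor connects to the MySQL instance and hands the records to two other actors, which store them in separate files. You can start the system using the RoundRobinRouter like this.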
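import java.io.File
import scala.io.Source
import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinRouter

object MyApplication extends App {
  val inputFolder = "./src/main/resources/Files"
  val system = ActorSystem("MyApplication")

  // Each input file holds one database's connection details, one value per line
  def getInfo(file: File): DBCredentials = {
    val source = Source.fromFile(file)
    val lines = try source.getLines().toList finally source.close()
    new DBCredentials(file.getName, lines(0), lines(1), lines(2))
  }

  val storageActor1 = system.actorOf(Props[StorageActor1].withRouter(RoundRobinRouter(60)), name = "StorageActors1")
  val storageActor2 = system.actorOf(Props[StorageActor2].withRouter(RoundRobinRouter(60)), name = "StorageActors2")
  val router = system.actorOf(
    Props(new DBActor(storageActor1, storageActor2)).withRouter(RoundRobinRouter(60)),
    name = "MyApplicationRouter")

  // DBActor expects DBCredentials, so convert each file before sending it;
  // the router spreads the messages over its 60 DBActor instances
  for (file <- new File(inputFolder).listFiles) {
    router ! getInfo(file)
  }
}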
So I create 60 instances of each actor type, one RoundRobinRouter per type, and start the system. We then iterate through the files that contain the database endpoint information and pass each endpoint's credentials as a message to the router, which forwards it to the next DBActor instance in turn. I used SBT to build this application.
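getInfo indexes `lines(0)` to `lines(2)` directly, so a credentials file with fewer than three lines throws an IndexOutOfBoundsException inside the loop that sends the messages. A more defensive variant is sketched here with a hypothetical `CredentialFiles` helper: it closes the file and returns an `Either`, so malformed files can be reported and skipped. It assumes the same three-line order getInfo uses (endpoint first, then what are presumably the user name and password), and additionally assumes blank lines and surrounding whitespace can be ignored.

```scala
import java.io.File
import scala.io.Source

object CredentialFiles {
  // Assumed layout (same order getInfo uses): endpoint, then presumably the
  // user name and password. Blank lines and surrounding spaces are ignored.
  def read(file: File): Either[String, (String, String, String)] = {
    val source = Source.fromFile(file)
    val lines =
      try source.getLines().map(_.trim).filter(_.nonEmpty).toList
      finally source.close() // release the file handle even if reading fails
    lines match {
      case endpoint :: user :: password :: _ => Right((endpoint, user, password))
      case _ => Left(s"${file.getName}: expected 3 non-empty lines, found ${lines.size}")
    }
  }

  // Reads every regular file in the folder, sorted by name; listFiles returns
  // null when the folder is missing, hence the Option guard.
  def readAll(folder: File): List[(String, Either[String, (String, String, String)])] =
    Option(folder.listFiles).map(_.toList).getOrElse(Nil)
      .filter(_.isFile)
      .sortBy(_.getName)
      .map(f => f.getName -> read(f))
}
```

In the application loop you would then build and send a DBCredentials message only for the `Right` results and log the `Left` ones.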
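// build.sbt
name := "MyApplication"

version := "1.0"

scalaVersion := "2.10.3"

mainClass in Compile := Some("MyApplication")

// Akka is published to Maven Central, so no extra resolver is needed.
// %% appends the Scala binary version (_2.10), and all Akka modules
// must come from the same release (the original mixed 2.2-M1 and 2.0).
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3"

libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.2.3"

// MySQL JDBC driver used by DriverManager (assumed; not in the original build)
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.27"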