I’m working on a project where I need to multiplex many requests through a finite set of open sockets. For example, I have 200 messages, but I can have at most 10 sockets open. To accomplish this I’ve wrapped the sockets in Akka actors and am using Akka’s routing mechanism to “share” the 10 open sockets in round-robin order.
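To make the round-robin idea concrete, here is a minimal sketch in plain Scala with no Akka involved. `FakeSocket` and `RoundRobinDemo` are names made up for illustration; the only point is that message `i` lands on socket `i % 10`, so 200 messages spread out as 20 per socket.

```scala
// Hypothetical stand-in for an open socket; only the id matters here.
final case class FakeSocket(id: Int)

object RoundRobinDemo {
  val sockets: Vector[FakeSocket] = Vector.tabulate(10)(i => FakeSocket(i))
  val messages: Vector[String] = Vector.tabulate(200)(i => s"msg-$i")

  // Round-robin: message i is assigned to socket (i % number of sockets).
  val assignments: Map[Int, Vector[String]] =
    messages.zipWithIndex
      .groupBy { case (_, i) => sockets(i % sockets.size).id }
      .map { case (socketId, pairs) => socketId -> pairs.map(_._1) }
}
```

Akka’s round-robin routing logic does the equivalent selection for you, with the actors’ mailboxes doing the queueing.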

This works out great, since now the consumers (which are RabbitMQ listeners) just post messages to a facade on the resource, and Akka will route the request and do the appropriate work for me.

However, I wanted a clean way to add or remove resources. Say at runtime I am asked to add 10 more open connections, or we suddenly need to scale down to 5 connections. I’d like the router to be able to manage that for me.

It took a little poking around, but it’s not that complicated to do. The router manages a list of routees, and you can pick one to remove (or add new ones). To remove one, send it a poison pill and have the context unwatch it, so the master no longer receives a Terminated notification when it stops. Then tell the router to stop routing messages to it. A poison pill is queued like any other message, so the actor first processes whatever was already in its mailbox, then stops itself, and you can do cleanup in postStop. In my case this is where I’d close the open socket.
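As a rough illustration of that cleanup step, here is a minimal sketch of a worker that owns a connection and closes it when it stops. `SocketWorker` and its `connection` parameter are hypothetical names, and `java.io.Closeable` is only a stand-in for whatever socket wrapper you actually use.

```scala
import java.io.Closeable

import akka.actor.Actor

// Hypothetical worker: `connection` stands in for the real open socket.
class SocketWorker(connection: Closeable) extends Actor {
  def receive: Receive = {
    // Writing the message to the connection is omitted in this sketch.
    case _ => ()
  }

  // Called after the actor has stopped, including a stop caused by PoisonPill,
  // so the connection is released exactly when the routee goes away.
  override def postStop(): Unit = connection.close()
}
```

Because the close happens in `postStop`, it runs no matter how the actor is stopped, not only when it is removed from the router.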

A full Scala example is here:



import akka.actor._
import akka.routing._

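// Control messages for the master. They carry no data, so case objects fit better
// than empty case classes.
case object Add

case object Remove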

// Stands in for the actor that wraps one open socket.
class Worker(id: Int) extends Actor {
  println(s"Made worker $id")

  override def preStart(): Unit =
    println(s"Starting $id")

  // Runs once the actor has stopped; this is where the socket would be closed.
  override def postStop(): Unit =
    println(s"Stopping $id")

  def receive: Receive = {
    case _ => println(s"Message received on actor $id")
  }
}

class Master extends Actor {
  var count = 0

  // Creates a worker with the next sequential id.
  def makeWorker(): ActorRef = {
    val id = count
    count += 1
    context.actorOf(Props(new Worker(id)))
  }

  var router = {
    val startingRouteeNumber = 2
    val initialRoutees = Vector.fill(startingRouteeNumber) {
      val worker = makeWorker()
      context.watch(worker)
      ActorRefRoutee(worker)
    }
    Router(RoundRobinRoutingLogic(), initialRoutees)
  }

  def receive = {
    case Remove =>
      println("Removing route")
      // Take the first routee, if there is one; calling head on an empty pool would throw.
      router.routees.collectFirst { case ActorRefRoutee(ref) => ref }.foreach { head =>
        head ! PoisonPill
        context.unwatch(head)
        router = router.removeRoutee(head)
      }
      printRoutes()

    case Add =>
      println("Adding route")
      val worker = makeWorker()
      context.watch(worker)
      router = router.addRoutee(worker)
      printRoutes()

    // A watched worker stopped on its own: drop it from the router
    // instead of routing the notification as if it were work.
    case Terminated(ref) =>
      router = router.removeRoutee(ref)
      printRoutes()

    case w: AnyRef =>
      printRoutes()
      router.route(w, sender())
  }

  def printRoutes(): Unit = {
    val size = router.routees.size
    println(s"Total routes $size")
  }
}

object Main extends App {
  val system = ActorSystem.create("foo")
  val master = system.actorOf(Props[Master]())

  master ! "do work"
  master ! Remove
  master ! "do more work"
  master ! "do even more work"
  master ! Add
  master ! "do work again"
}
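
The Add and Remove messages change the pool one routee at a time. For a request like “scale down to 5 connections”, one option is a single message that carries the target size. What follows is only a sketch under my own assumptions: `Resize`, `ResizableMaster`, and `workerProps` are hypothetical names, death watch is left out to keep it short, and replying with the new size is just a convenience for the caller.

```scala
import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}

// Hypothetical message: grow or shrink the pool to exactly `target` routees.
final case class Resize(target: Int)

// Hypothetical master; `workerProps` describes how to build one routee.
class ResizableMaster(workerProps: Props) extends Actor {
  private var router = Router(RoundRobinRoutingLogic())

  // The ActorRefs currently behind the router.
  private def workers: Vector[ActorRef] =
    router.routees.collect { case ActorRefRoutee(ref) => ref }.toVector

  def receive: Receive = {
    case Resize(target) =>
      val wanted = math.max(target, 0)
      val current = workers
      if (wanted > current.size) {
        // Grow: start the missing workers and register them with the router.
        (current.size until wanted).foreach { _ =>
          router = router.addRoutee(context.actorOf(workerProps))
        }
      } else {
        // Shrink: stop routing to the surplus workers first, then let each
        // one drain its mailbox and stop via PoisonPill.
        current.drop(wanted).foreach { ref =>
          router = router.removeRoutee(ref)
          ref ! PoisonPill
        }
      }
      // Reply with the new pool size (an illustrative choice, not required).
      sender() ! router.routees.size

    case msg =>
      router.route(msg, sender())
  }
}
```

With something like this in place, scaling becomes `master ! Resize(20)` or `master ! Resize(5)`, and the master works out how many routees to start or stop.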