Michael Pollmeier

scala.util.Try is pretty awesome and you should read Daniel’s introductory post if you don’t know it yet. I just ported some code from exception handling to Try: the lines of code went down to a third and readability improved considerably.

If you need to handle all success and error cases you could chain Try … recover, but that gets a bit clunky if you’ve got more than two Trys, because every further recover adds another level of nesting. Another option is to chain orElse:

import scala.util.Try

def trySomething = Try { throw new Exception("didn't work"); -1 }
def trySomethingElse = Try { 42 }

trySomething orElse trySomethingElse map {
  case 42 => //wow, it worked
  case -1 => //no, this will never happen
} recover {
  case e: Throwable => // in case both failed
}

Warning: do not try this at work! ;)

We were hosting a code retreat at Movio yesterday, and one of the sessions had the constraint that none of your production methods may return anything. I checked with our facilitator Richard and he said that callbacks are allowed.
So Mark and I decided that each method that would normally return a value would instead call a callback function with that value. We pass that callback into the method.
So we built a callback lifter that we hand a function whose return value we’re interested in. It calls the function and returns the value that was passed to the callback. With this in place we can use any such function as if it returned a value ;)

Here it is. Quite concise actually…

def callbackLifter[T](func: ((T => Any)) => Unit): T = {
  var retVal = None: Option[T]
  func((x: T) => retVal = Some(x))
  retVal.get
}

And here’s how to use it:

def returnValueViaCallback(cb: (Int) => Any) { cb(3) }
callbackLifter(returnValueViaCallback) // 3

It also works with functions that take parameters

def withArgs(arg: Int, cb: (Int) => Any) { cb(arg) }
val partial = withArgs(99, _: (Int) => Any)
callbackLifter(partial) //99

We didn’t actually get very far in the kata, but this was big fun!
Let me repeat my warning: this was just a fun exercise to see what’s possible under funny constraints. Code retreats exist to drive some ideas to the extreme, which we did. ;)

In an adrenaline rush of good coding practices (I hope you can read the sarcasm) we decided that mutable state in the callbackLifter (see the var?) is really bad. Exception driven development to the rescue! ;)

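// carries the callback's value out of func via the call stack
case class CallBackException(value: Any) extends Exception

def exceptionDrivenCallbackLifter[T](func: ((T => Any)) => Unit): T =
  try {
    func((x: T) => throw CallBackException(x))
    null.asInstanceOf[T] //the compiler forces us to return a T, even though the line above always throws once func calls the callback
  } catch {
    case CallBackException(x) => x.asInstanceOf[T]
  }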

Here’s an addition to Akka TestKit that might come in handy. It does what it says: you can give it a couple of messages that you’re expecting within a given duration (defaults to 3s). Unlike expectMsgAllOf from the TestKit it ignores other messages that get sent to the testActor (expectMsgAllOf fails the test in that case).

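  import scala.collection.mutable
  import scala.concurrent.duration._ // Duration and 3.seconds (Akka 2.1+)
  def expectMsgAllOfIgnoreOthers[T](max: Duration, expected: T*) {
    val outstanding = mutable.Set(expected: _*)
    fishForMessage(max) {
      case msg: T if outstanding.contains(msg) => // T is erased, so the contains check does the real matching
        outstanding.remove(msg)
        outstanding.isEmpty // true ends the fishing: every expected message has arrived
      case _ => false // ignore any other message
    }
  }

  def expectMsgAllOfIgnoreOthers[T](expected: T*) {
    expectMsgAllOfIgnoreOthers(3.seconds, expected: _*)
  }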

At Movio we’re using Akka in more and more places – we love the Actor model for concurrency. This week we ran into a problem that I think more projects will face: we’ve got an Actor that produces work very quickly (it just pulls records from the database, which is really fast) and sends that work as individual messages to some (routed) Actors that do some long-running processing on them. Our problem was that the producer creates work much faster than all the worker Actors together can get through. Those messages would all be waiting in the workers’ mailboxes, eventually eating up all of the heap space. I discussed this on the Akka user group and quickly got some good responses.

The solution we came up with is a simpler version of Derek Wyatt’s Balancing Workload Across Nodes with Akka that’s based on acknowledgements. We defined a master Actor that has access to some work – we called it an Epic, which is simply an Iterable[Work]. Worker Actors can run anywhere in your cluster and register with the master at any time. When the master gets a new Epic it informs all workers that there’s work available. The workers then pull work from the master piece by piece until there’s none left. In the end the master simply stops responding with work and the workers stop asking.

Your concrete implementation of this just needs to take care of the worker (what does it actually do with a piece of work?), the Epic (how does work get created?) and the supervisor strategy of the Actors (in our case we made the workers restart and the master resume, as we don’t want it to lose the Epic it’s currently working on). Our Epic just pulls a batch of work from the database and queues it in a local queue. Once that queue is empty it refills it, until there’s no more work left.

This fits nicely with the Actor model – nothing is blocking, you can distribute your workers and you don’t need to worry about mailbox sizes. Here are the important parts of the implementation; our GitHub repo contains the full implementation and the test suite. Simply clone it and run `sbt test`.

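import akka.actor.{Actor, ActorRef, Terminated}
import org.slf4j.LoggerFactory
import scala.collection.mutable

object WorkPullingPattern {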
  sealed trait Message
  trait Epic[T] extends Iterable[T] //used by master to create work (in a streaming way)
  case object GimmeWork extends Message
  case object CurrentlyBusy extends Message
  case object WorkAvailable extends Message
  case class RegisterWorker(worker: ActorRef) extends Message
  case class Work[T](work: T) extends Message
}

import WorkPullingPattern._

class Master[T] extends Actor {
  val log = LoggerFactory.getLogger(getClass)
  val workers = mutable.Set.empty[ActorRef]
  var currentEpic: Option[Epic[T]] = None
  // fetched once per epic: calling epic.iterator on every GimmeWork
  // would restart a plain Iterable from its first element
  var currentWork: Iterator[T] = Iterator.empty

  def receive = {
    case epic: Epic[T] =>
      if (currentEpic.isDefined)
        sender ! CurrentlyBusy
      else if (workers.isEmpty)
        log.error("Got work but there are no workers registered.")
      else {
        currentEpic = Some(epic)
        currentWork = epic.iterator
        workers foreach { _ ! WorkAvailable }
      }

    case RegisterWorker(worker) =>
      log.info(s"worker $worker registered")
      context.watch(worker)
      workers += worker

    case Terminated(worker) =>
      log.info(s"worker $worker died - taking off the set of workers")
      workers.remove(worker)

    case GimmeWork => currentEpic match {
      case None =>
        log.info("workers asked for work but we've no more work to do")
      case Some(epic) =>
        if (currentWork.hasNext)
          sender ! Work(currentWork.next())
        else {
          log.info(s"done with current epic $epic")
          currentEpic = None
        }
    }
  }
}

abstract class Worker[T](val master: ActorRef) extends Actor {

  override def preStart {
    master ! RegisterWorker(self)
    master ! GimmeWork // keep working on actor restart
  }

  def receive = {
    case WorkAvailable =>
      master ! GimmeWork
    case Work(work: T) =>
      doWork(work)
      master ! GimmeWork // only ask for more once this piece is done
  }

  def doWork(work: T)
}
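To see why pulling keeps the mailboxes small, here's a single-threaded Python model of the protocol above. It's a sketch, not Akka: mailboxes are plain deques, the master is a function answering one GimmeWork with at most one piece, and the names `simulate_work_pulling` and `push_all` are hypothetical. `push_all` models the original problem, where the producer sends everything immediately.

```python
from collections import deque

_NO_WORK = object()  # sentinel: the epic is exhausted


def simulate_work_pulling(epic, num_workers):
    """Single-threaded model of the work-pulling protocol (not Akka).

    Returns the pieces each worker processed and the largest number of
    Work messages ever waiting in any single worker's mailbox.
    """
    work = iter(epic)  # taken once, so a lazy generator is consumed piece by piece
    mailboxes = [deque() for _ in range(num_workers)]
    processed = [[] for _ in range(num_workers)]
    max_mailbox = 0

    def gimme_work(worker):
        # Master: answer a single request with at most one piece of work
        piece = next(work, _NO_WORK)
        if piece is not _NO_WORK:
            mailboxes[worker].append(piece)

    for w in range(num_workers):
        gimme_work(w)  # WorkAvailable: every registered worker asks once

    while any(mailboxes):
        max_mailbox = max(max_mailbox, max(len(m) for m in mailboxes))
        for w in range(num_workers):
            if mailboxes[w]:
                processed[w].append(mailboxes[w].popleft())  # doWork
                gimme_work(w)  # ask for more only after finishing
    return processed, max_mailbox


def push_all(epic, num_workers):
    """The original problem: the producer pushes every piece right away."""
    mailboxes = [deque() for _ in range(num_workers)]
    for i, piece in enumerate(epic):
        mailboxes[i % num_workers].append(piece)
    return max(len(m) for m in mailboxes)
```

With 1,000 pieces and 4 workers, `push_all(range(1000), 4)` leaves 250 messages in one mailbox, while `simulate_work_pulling` never has more than one waiting per worker, however large (or lazy) the epic is.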

For future reference here are some alternative ideas – they all have caveats for the problem we faced, but might be worth considering in other situations:

  1. Have thousands of worker actors: doesn’t work for us because they depend on a database, which is our actual bottleneck.
  2. Use a bounded mailbox for the worker actors. Sending to a full mailbox then blocks the producing Actor. Sounds like what we needed, however it doesn’t work with remote Actors: instead of blocking on a full mailbox, the message goes to dead letters.
  3. Use Derek Wyatt’s PressureQueue. It’s basically a custom mailbox for the worker Actors that delays the submission of new messages based on the mailbox size. I’m not convinced that it fits the Actor model, partly because it blocks the producer Actor, which means that it can’t react to other messages any more. Also it doesn’t seem to be widely used and the last commit was 10 months ago.
  4. The producer could pull only the IDs from the database, hoping that those fit into memory – i.e. we’d have millions of IDs as messages floating around. The workers then fetch the complete record later on and slowly get the job done. This works as long as all the IDs fit into memory – beyond that point your JVM explodes ;)
  5. Use the TimerBasedThrottler, which makes our producer create only X amount of work per time unit. The problem here is how to choose X and the time unit. It’s only ever going to be a rough guess, so I’m either missing out on performance (if my workers could go faster) or potentially running out of memory (if my workers can’t catch up, e.g. because of other load on the system).

If you ever need to create an interactive console/REPL that lets users play with your code in a simple terminal, you might (still) find that there’s hardly any documentation. I fixed the console for Gremlin-Scala (a graph DSL) and thought that other people might want a very basic version of a custom shell to get a quick start.
It doesn’t do much more than import java.lang.Math, so that you can type things like `abs(-4.2)` directly in the shell without having to import java.lang.Math._ first. The important part is that your import statements and any initialisation belong in `addThunk` – those are executed after the shell is fully initialised – otherwise you get all sorts of AST errors that don’t really help much.

This is obviously only a starting point, you can further customise it to automatically do stuff with the executed command like we do in Gremlin-Scala’s Console.

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.ILoop

object TestConsole extends App {
  val settings = new Settings
  settings.usejavacp.value = true
  settings.deprecation.value = true

  new SampleILoop().process(settings)
}

class SampleILoop extends ILoop {
  override def prompt = "==> "

  addThunk {
    intp.beQuietDuring {
      intp.addImports("java.lang.Math._")
    }
  }

  override def printWelcome() {
    echo("\n" +
      "         \\,,,/\n" +
      "         (o o)\n" +
      "-----oOOo-(_)-oOOo-----")
  }

}

Scala 2.10 introduced a new reflection API (see here for an in-depth overview) which is much closer to the actual type system than the old Manifests were – in fact the compiler uses TypeTags itself. And it’s not hard to use, as we’ll see. One reason why Scala provides reflection in addition to Java’s reflection API is that the JVM erases generic types. As a motivation, let’s see how generic types get erased at runtime:

  case class A[T]()
  println(A[String]().isInstanceOf[A[String]]) // true
  println(A[String]().isInstanceOf[A[Int]])
  // true as well - doesn't sound right, does it? That's because the type of T gets erased at runtime...

Scala 2.10 introduces TypeTag which brings us the compile-time information about the generic type T to the runtime:

  import scala.reflect.runtime.universe._
  case class B[T: TypeTag]() {
    val tpe = typeOf[T]
  }
  // =:= checks type equivalence; the reflection docs advise against comparing Types with ==
  println(B[String]().tpe =:= typeOf[String]) // true
  println(B[String]().tpe =:= typeOf[Int])    // false

  //and this also works for nested parameterised types:
  println(B[List[String]]().tpe =:= typeOf[List[String]]) //true
  println(B[List[String]]().tpe =:= typeOf[List[Int]])    //false

Another use case for the reflection API we just had at Movio is to get all members of an object that are of a given type. We define a customer-specific configuration in objects that drives our whole system – from the Swagger API documentation all the way down to how we persist the data in Cassandra and do our filters. The code below grabs all members of an object that are subtypes of Field, gets their generic type T and checks if they mix in a given trait `Required`. Scala’s runtime mirrors allow us to look up the symbols and types for a given instance at runtime – pretty neat!

  import scala.reflect.runtime.universe._
  case class Field[T: TypeTag]() {
    val tpe = typeOf[T]
  }
  trait Required //can be mixed into Field to mark it as required

  object SomeClass extends BaseClass {
    val stringField = Field[String]()
    val requiredIntField = new Field[Int] with Required
  }

  abstract class BaseClass {
    val typeMirror = runtimeMirror(this.getClass.getClassLoader)
    val instanceMirror = typeMirror.reflect(this)
    val members = instanceMirror.symbol.typeSignature.members
    def fieldMirror(symbol: Symbol) = instanceMirror.reflectField(symbol.asTerm)

    def fields = members.filter(_.typeSignature <:< typeOf[Field[_]])
    // filters all members that conform to type Field[_], i.e. also subclasses of Field
  }

  SomeClass.fields.foreach { symbol =>
    val name = symbol.name.toString.trim
    val required = symbol.typeSignature <:< typeOf[Required]
    val tpe = SomeClass.fieldMirror(symbol).get match {
      case field: Field[_] => field.tpe
    }
    println(s"field: $name; type=$tpe; required: $required")
    /** prints:
     *  field: requiredIntField; type=Int; required: true
     *  field: stringField; type=String; required: false
     */
  }

Credit goes to my colleague Felix Geller who discovered most of this first!

May 29, 2012

While it’s generally a good idea to use signed libraries, an enforced signing process in the project definition may get in your way when you’re fixing some open source project. As you don’t have the GPG passphrase, you won’t be able to install/deploy the artifact into your local/enterprise repository:

--- maven-gpg-plugin:1.4:sign (sign-artifacts) @ project-abc ---
GPG Passphrase: *
gpg: no default secret key: secret key not available
gpg: signing failed: secret key not available

To get around that you could comment out the maven-gpg-plugin in your pom. But if it’s defined in some parent pom you don’t have that option. In that case you can simply disable it in your pom’s project -> build -> plugins section:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-gpg-plugin</artifactId>
    <configuration>
        <skip>true</skip>
    </configuration>
</plugin>

I recently ran into an interesting problem for my side project: how can I efficiently select the top k elements from a very large list in Java / Groovy?
There are many recommendations about how to do that, but I didn’t find a comparison of the suggested implementations. This is a crucial problem for my application, so I tried out a number of them and here are the results. I chose Groovy for its syntactic sugar with collections and closures, but you can do the same in plain Java. The code is on GitHub; you can run it with a simple `mvn compile groovy:execute` if you want to play around yourself.

Task: select the top 5 elements from a list of 10 million

Plain sort: 6,500ms

Sorting the whole list and picking the top elements is okay for small lists, but it doesn’t scale to larger lists, as sorting takes O(n log n) time even though we only need k elements. For a list with 10 million numbers, it’s completely out of the competition.

    topElements = unorderedList.sort()[numberCount-1..numberCount-k]

(QuickSelect: 2,500ms)

Also called Hoare’s selection algorithm, it was mentioned in a few articles, so I wanted to give it a try. However, I only found implementations for educational purposes like this one. It’s probably not fair to compare this demo implementation with the highly optimized ones that follow, so I put the result in brackets as I don’t want to discredit the algorithm.
[source too long to quote here, see article on brilliantsheep.com]
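The demo code isn't reproduced here, but the idea behind quickselect fits in a short Python sketch (a minimal illustration, not the implementation that was benchmarked; `quickselect_top_k` and `_partition` are hypothetical names). It partitions around a random pivot and then continues only in the part that contains position n - k, so the average running time is O(n) instead of the O(n log n) of a full sort.

```python
import random


def _partition(items, lo, hi):
    """Lomuto partition of items[lo..hi] around a random pivot.

    Afterwards everything left of the returned index is < pivot and
    everything right of it is >= pivot.
    """
    p = random.randint(lo, hi)
    items[p], items[hi] = items[hi], items[p]
    pivot = items[hi]
    store = lo
    for i in range(lo, hi):
        if items[i] < pivot:
            items[store], items[i] = items[i], items[store]
            store += 1
    items[store], items[hi] = items[hi], items[store]
    return store


def quickselect_top_k(values, k):
    """Return the k largest values, in no particular order."""
    items = list(values)  # work on a copy, the input stays untouched
    n = len(items)
    if k <= 0:
        return []
    if k >= n:
        return items
    target = n - k  # ascending index of the k-th largest value
    lo, hi = 0, n - 1
    while True:
        p = _partition(items, lo, hi)
        if p == target:
            break
        elif p < target:
            lo = p + 1  # the k-th largest lies right of the pivot
        else:
            hi = p - 1  # ... or left of it
    return items[target:]
```

Note that this simple Lomuto variant degrades towards quadratic time on inputs with many equal values; a three-way partition avoids that.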

(Heap Select: 2,200ms)

Again, this is just a demonstration of how to implement a heap select, based on Steve Hanov’s great article. I’ve ported his Python implementation to Groovy, which was easy enough. I’m sure there is a more efficient way to do a heap select – the old rule holds true: don’t try and implement it yourself if someone brighter has already done it for you. Anyway, this was my naive attempt:

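    def heapSelect(List list, k) {
        // PriorityQueue is a min-heap: peek() is the smallest of the current top k
        def heap = new PriorityQueue(k)
        list.each{ item ->
            if (heap.size() < k || item > heap.peek()) {
                if (heap.size() == k)
                    heap.poll() // drop the smallest in O(log k); remove(Object) would be O(k)
                heap.offer(item)
            }
        }
        return heap as List
    }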
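For comparison, here's the same bounded min-heap idea as a Python sketch (not the benchmarked Groovy code; `heap_select` is a hypothetical name). Python's `heapq` module only provides a min-heap, which is exactly what this needs: the root is the smallest of the current top-k candidates, so each new value is compared against it once and the whole pass costs O(n log k).

```python
import heapq


def heap_select(values, k):
    """Return the k largest values, largest first, using a min-heap of size k."""
    if k <= 0:
        return []
    heap = []
    for v in values:
        if len(heap) < k:
            heapq.heappush(heap, v)
        elif v > heap[0]:
            # pop the smallest candidate and push v in one O(log k) step
            heapq.heapreplace(heap, v)
    return sorted(heap, reverse=True)
```

In real code the standard library already covers this: `heapq.nlargest(k, values)` returns the same list, which is the "don't implement it yourself" rule in practice.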

PriorityQueue: 300ms

Ships with the JRE, so there’s no need for external libraries. It lets you add a list of elements to a priority heap and then poll the top element from the heap one by one. Simple and fast!

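    // PriorityQueue's head is its smallest element, so reverse the ordering to poll the largest first
    heap = new PriorityQueue(unorderedList.size(), Collections.reverseOrder())
    heap.addAll(unorderedList)
    topElements = (1..k).collect{heap.poll()}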

Guava Ordering: 170ms

Google Guava (formerly Google Collections) comes with an Ordering class that is even faster than the PriorityQueue for our task. Under the hood it seems to use QuickSort to sort only parts of a collection, however I haven’t dug too deep into their implementation.
One obstacle to using this could be that your data has to be in a structure that implements Iterable, e.g. an ArrayList. If you have a plain int[] and need to convert it first, you might be better off going for a PriorityQueue in the first place. On the other hand, maybe an ArrayIterator can do this very efficiently... I haven’t tried it out though.

    com.google.common.collect.Ordering.natural()
        .greatestOf(arrayList, k)

By the way…

I often saw the suggestion to add the elements into a TreeMap which orders them for you. While this works fine it will never be the most efficient solution because it sorts all data in the map, whereas we’re only interested in the highest k items.

Please bear in mind that I put all this together at home on my own, so if you find any mistakes please leave a comment or drop me a mail and I’m happy to update this entry – and give you all the credit, of course ;)
Again: the code is on github – all it takes is mvn compile groovy:execute to run it on your machine.

Sometimes you have to write a method that’s not supposed to raise any exceptions. One such case was when I integrated dcramer’s django-paypal and implemented a listener for incoming payment notifications (IPN) from Paypal. If my listener raises an exception, the payment notification will not be confirmed with Paypal and is therefore sent again by them. This is clearly not what I want, e.g. when our system can’t find the associated invoice, so I’m catching all exceptions and logging error messages, which trigger emails to our support team.

    def paypal_ipn_parse(sender, **kwargs):
        try:
            #perform checks and mark invoice as paid etc
            ...
        except Exception as e:
            logger.error('caught exception while parsing paypal ipn: %s' % e)

There are a couple of ‘exceptional situations’ that are being checked, e.g. ‘can the invoice be found’, ‘is the payment in the right amount and currency’ etc. But if the listener can’t raise an exception, how can we unit test this behaviour? The way I went was to mock the logger instance of the module under test using Python Mock and assert that an expected string has been logged at error level. To have the flexibility to change the error message without breaking the test, I wrote a little helper called ‘SubstringMatcher’ which only checks for a given substring in the logged message:

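from django.test import TestCase
from mock import Mock  # on Python 3: from unittest.mock import Mock
from paypal.standard.ipn.models import PayPalIPN
from payment.utils import logger as utils_logger
from payment.utils import paypal_ipn_parse  # assumed: the listener lives next to the logger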

class TestPaypalIpnHandling(TestCase):
    def setUp(self):
        utils_logger.error = Mock()

    def test_should_report_error_on_wrong_currency(self):
        ipn = PayPalIPN(invoice=self.test_invoice_uuid, mc_currency='non_existing_currency')
        paypal_ipn_parse(ipn)
        utils_logger.error.assert_called_with(
            SubstringMatcher(containing='wrong currency'))

class SubstringMatcher(object):
    """Compares equal to any string containing `containing`, ignoring case."""
    def __init__(self, containing):
        self.containing = containing.lower()
    def __eq__(self, other):
        return self.containing in other.lower()
    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)
    def __repr__(self):
        return 'a string containing "%s"' % self.containing

Now our unit test ensures that the mocked method logger.error does log an error message containing the substring ‘wrong currency’ (case-insensitively). Otherwise the unit test error output will state:
AssertionError: Expected: ((a string containing "wrong currency",), {})
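Here's a self-contained sketch of the same testing idea using only the standard library's `unittest.mock` (Python 3). `handle_ipn`, its `NZD` default and the logger name are hypothetical stand-ins for the django-paypal listener. Instead of a custom matcher it reads the logged message from `call_args`, and `mock.patch.object` puts the real `logger.error` back after each test, which assigning a `Mock` in `setUp` does not.

```python
import logging
import unittest
from unittest import mock

# Hypothetical stand-in for the module under test (payment.utils above).
logger = logging.getLogger("payment.utils")


def handle_ipn(currency, expected_currency="NZD"):
    """Hypothetical listener: never raises, reports problems via logger.error."""
    try:
        if currency != expected_currency:
            raise ValueError("wrong currency: %s" % currency)
        # ... mark the invoice as paid etc.
    except Exception as e:
        logger.error("caught exception while parsing paypal ipn: %s" % e)


class HandleIpnTest(unittest.TestCase):
    def test_should_report_error_on_wrong_currency(self):
        # patch.object restores the real logger.error when the block exits
        with mock.patch.object(logger, "error") as error:
            handle_ipn("non_existing_currency")
        self.assertEqual(error.call_count, 1)
        message = error.call_args[0][0]  # first positional argument of the call
        self.assertIn("wrong currency", message.lower())

    def test_should_not_report_error_on_expected_currency(self):
        with mock.patch.object(logger, "error") as error:
            handle_ipn("NZD")
        error.assert_not_called()
```

Run it with `python -m unittest`. The trade-off: the matcher gives a nicer failure message, while reading `call_args` needs no helper class.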

Nov 25, 2011

On my current Django project we just moved away from system-wide installed dependencies to virtualenv. The transition is pretty straightforward and there are a couple of good how-tos like this one. Out of the many good reasons to use virtualenv rather than installing the dependencies system-wide, I find these the most important:

  • projects with different dependencies are completely isolated from each other, no bad surprises if you install a new dependency for some other project
  • no bad surprises if your OS package manager decides to update a dependency
  • dependencies can be defined within the project in a requirements file
  • ability to quickly switch between different versions of dependencies

If you use the great Eclipse plugin PyDev as your IDE, here are some simple steps to configure your project to use the virtual environment – assuming you’ve set it up already:

  • In your virtual environment there’s a python wrapper that we want to use in Eclipse. Go to Window -> Preferences -> Interpreter Python and create a new interpreter. Select the python wrapper from the virtual environment ($WORKON_HOME/YOUR_PROJECT/bin/python). The libraries should be selected automatically – leave them as they are.
  • In the project properties change the interpreter to use the newly created one (Project -> Properties -> PyDev Python Interpreter)
  • Alternatively you can manage the environment individually for each run configuration in the ‘Run -> Run configurations -> Interpreter’ tab

This worked for me with Eclipse Helios SR2 and PyDev 1.6.5. To test it you could uninstall one of your dependencies from your system and install it only into that virtual environment. Some credit for the steps described goes to Luke Plant, however I tried to simplify it a bit.
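Another way to check which interpreter a PyDev run configuration actually uses is to run a tiny script like the one below (a sketch; `in_virtualenv` is a hypothetical helper name). Python 3.3+ `venv` environments expose the base installation as `sys.base_prefix`, while older virtualenv releases set `sys.real_prefix` instead, so comparing either one with `sys.prefix` tells you whether you're inside an environment.

```python
import sys


def in_virtualenv():
    """True if the running interpreter belongs to a virtual environment."""
    # older virtualenv sets sys.real_prefix; venv (Python 3.3+) sets sys.base_prefix;
    # outside any environment both fall back to sys.prefix itself
    base = getattr(sys, "real_prefix", None) or getattr(sys, "base_prefix", sys.prefix)
    return base != sys.prefix


print(sys.executable, in_virtualenv())
```

If the printed path isn't the python wrapper under `$WORKON_HOME/YOUR_PROJECT/bin`, the run configuration is still using the system interpreter.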