I recently ran into an interesting problem to solve for my side project: how can I efficiently select the top k elements from a very large list in Java / Groovy?
There are many recommendations about how to do that, but I didn’t find a comparison of the suggested implementations. This is a crucial problem for my application, so I tried out a number of them and here are the results. I chose Groovy for its syntactic sugar with collections and closures, but you can do the same in plain Java. The code is on GitHub; you can run it with a simple mvn compile groovy:execute if you want to play around yourself.

Task: select the top 5 elements from a list of 10 million

Plain sort: 6,500ms

Sorting the whole list and picking the top elements is okay for small lists, but it doesn’t scale for larger lists, as the time for sorting a list grows with O(n log n). For a list with 10 million numbers, it’s completely out of the competition.

    topElements = unorderedList.sort()[numberCount-1..numberCount-k]

(QuickSelect: 2,500ms)

QuickSelect, also called Hoare’s selection algorithm, was mentioned in a few articles, so I wanted to give it a try. However, I only found implementations for educational purposes like this one. It’s probably not fair to compare this demo implementation with the highly optimized ones that follow, so I put the result in brackets, as I don’t want to discredit the algorithm.
[source too long to quote here, see article on brilliantsheep.com]
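The QuickSelect source isn't reproduced here, so a short Java sketch of the idea may help. It assumes a plain int[] input and uses a randomized Lomuto partition, which is only one common variant. QuickSelectTopK and topK are hypothetical names, and this is not the implementation that was timed above. Each partition step puts one pivot into its final sorted position. The search then continues only in the side that contains position n-k, so the expected work is linear rather than O(n log n).

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: top-k selection with QuickSelect (Hoare's selection algorithm). */
final class QuickSelectTopK {

    /** Returns the k largest values of data (unordered); data itself is not modified. */
    static int[] topK(int[] data, int k) {
        if (k <= 0) return new int[0];
        if (k >= data.length) return data.clone();
        int[] a = data.clone();
        // Goal: the value that belongs at index n-k in sorted order ends up there,
        // with everything left of it <= and everything right of it >= that value.
        int target = a.length - k;
        int lo = 0, hi = a.length - 1;
        while (lo < hi) {
            int p = partition(a, lo, hi);
            if (p == target) break;
            if (p < target) lo = p + 1;   // target position lies in the right part
            else hi = p - 1;              // target position lies in the left part
        }
        return Arrays.copyOfRange(a, target, a.length);
    }

    /** Lomuto partition around a random pivot; returns the pivot's final index. */
    private static int partition(int[] a, int lo, int hi) {
        swap(a, ThreadLocalRandom.current().nextInt(lo, hi + 1), hi);
        int pivot = a[hi];
        int store = lo;
        for (int i = lo; i < hi; i++) {
            if (a[i] < pivot) swap(a, i, store++);
        }
        swap(a, store, hi);
        return store;
    }

    private static void swap(int[] a, int i, int j) {
        int tmp = a[i];
        a[i] = a[j];
        a[j] = tmp;
    }
}
```

Usage: QuickSelectTopK.topK(values, 5) returns the five largest values in no particular order. With many duplicate values, this simple variant slows down towards quadratic time. That is one reason tuned implementations use three-way partitioning.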

(Heap Select: 2,200ms)

Again, this is just a demonstration of how to implement a heap select, based on Steve Hanov’s great article. I’ve ported his Python implementation to Groovy, which was easy enough. I’m sure there is a more efficient way to do a heap select. The old rule holds true: don’t try to implement it yourself if someone brighter has already done it for you. Anyway, this was my naive attempt:

    def heapSelect(List list, k) {
        def heap = new PriorityQueue(k)
        list.each{ item ->
            if (heap.size() < k || item > heap.peek()) {
                if (heap.size() == k)
                    heap.remove(heap.peek())
                heap.offer(item)
            }
        }
        return heap as List
    }

PriorityQueue: 300ms

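Ships with the JRE, so there’s no need for external libraries. It lets you add a list of elements to a priority heap and then poll the top element from the heap one by one. Simple and fast! Keep in mind that PriorityQueue is a min-heap by default, so you need a reversed comparator to get the largest elements first.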

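    // PriorityQueue is a min-heap by default, so reverse the ordering to poll the largest elements first
    heap = new PriorityQueue(unorderedList.size(), Collections.reverseOrder())
    heap.addAll(unorderedList)
    topElements = (1..k).collect{heap.poll()}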

Guava Ordering: 170ms

Google Guava (formerly Google Collections) comes with an Ordering class that is even faster than the PriorityQueue for our task. Under the hood it seems to use QuickSort to sort only parts of a collection, but I haven’t dug too deep into their implementation.
One obstacle to using this could be that your data has to be in a structure that implements Iterable, e.g. an ArrayList. If you have a plain int[] and need to convert it first, you might be better off going for a PriorityQueue in the first place. On the other hand, maybe an ArrayIterator can do this very efficiently; I haven’t tried it out, though.

    topElements = com.google.common.collect.Ordering.natural()
        .greatestOf(arrayList, k)
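For the int[] case, one alternative to converting the array is a single pass with a bounded min-heap of size k. This is the same idea as the heap select above, and only the k retained values get boxed. The sketch below assumes the data is already an int[]. ArrayTopK and greatestOf are hypothetical names (greatestOf only mirrors Guava's method name), and the code has not been benchmarked against the numbers above.

```java
import java.util.PriorityQueue;

/** Hypothetical helper: top-k of a primitive int[] without converting it to a List first. */
final class ArrayTopK {

    /** Returns the k largest values of data in descending order. */
    static int[] greatestOf(int[] data, int k) {
        if (k <= 0) return new int[0];
        // Min-heap holding the k largest values seen so far; its head is the smallest of them.
        PriorityQueue<Integer> heap = new PriorityQueue<>(k);
        for (int value : data) {
            if (heap.size() < k) {
                heap.offer(value);
            } else if (value > heap.peek()) {
                heap.poll();        // drop the current smallest of the top k
                heap.offer(value);
            }
        }
        // Polling a min-heap yields ascending order, so fill the result from the back.
        int[] result = new int[heap.size()];
        for (int i = result.length - 1; i >= 0; i--) {
            result[i] = heap.poll();
        }
        return result;
    }
}
```

Each element costs at most O(log k), so the whole pass is O(n log k). For k = 5 that is close to a plain linear scan.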

By the way…

I often saw the suggestion to add the elements to a TreeMap, which orders them for you. While this works, it will never be the most efficient solution, because it sorts all the data in the map, whereas we’re only interested in the highest k items.

Please bear in mind that I put all this together at home on my own, so if you find any mistakes please leave a comment or drop me a mail and I’m happy to update this entry – and give you all the credit, of course ;)
Again: the code is on GitHub, and all it takes is mvn compile groovy:execute to run it on your machine.

Sometimes you have to write a method that’s not supposed to raise any exceptions. One such case was when I integrated dcramer’s django-paypal and implemented a listener for incoming payment notifications from PayPal. If my listener raises an exception, the payment notification will not be confirmed with PayPal and is therefore retriggered by them. This is clearly not what I want to happen, e.g. when our system can’t find the associated invoice. So I’m catching all exceptions and logging error messages, which trigger emails to our support team.

    def paypal_ipn_parse(sender, **kwargs):
        try:
            # perform checks and mark invoice as paid etc
            ...
        except Exception as e:
            # never let an exception escape, otherwise PayPal retriggers the notification
            logger.error('caught exception while parsing paypal ipn: %s' % e)

There are a couple of ‘exceptional situations’ that are being checked, e.g. ‘can the invoice be found’, ‘is the payment in the right amount and currency’ etc. But if the listener can’t raise an exception, how can we unit test this behaviour? I went with mocking the logger instance from the module under test using Python Mock and asserting that an expected string has been logged at error level. To keep the flexibility to change the error message without breaking the test, I wrote a little helper called ‘SubstringMatcher’, which only checks for a given substring in the logged message:

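    from unittest.mock import patch  # on Python 2: from mock import patch
    from django.test import TestCase
    from paypal.standard.ipn.models import PayPalIPN
    from payment.utils import logger as utils_logger

    class TestPaypalIpnHandling(TestCase):
        def setUp(self):
            # replace logger.error of the module under test with a mock, restored after each test
            patcher = patch.object(utils_logger, 'error')
            patcher.start()
            self.addCleanup(patcher.stop)

        def test_should_report_error_on_wrong_currency(self):
            ipn = PayPalIPN(invoice=self.test_invoice_uuid, mc_currency='non_existing_currency')
            paypal_ipn_parse(ipn)
            utils_logger.error.assert_called_with(
                SubstringMatcher(containing='wrong currency'))

    class SubstringMatcher(object):
        """Compares equal to any string containing the given substring (case-insensitive)."""
        def __init__(self, containing):
            self.containing = containing.lower()
        def __eq__(self, other):
            try:
                return self.containing in other.lower()
            except AttributeError:  # not a string, so no match
                return False
        def __ne__(self, other):
            return not self.__eq__(other)
        def __repr__(self):
            return 'a string containing "%s"' % self.containing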

Now our unit test ensures that the mocked method logger.error logs an error message containing the substring "wrong currency". Otherwise, the unit test output will state:

    AssertionError: Expected: ((a string containing "wrong currency",), {})

Nov 25, 2011

On my current Django project we just moved away from system-wide installed dependencies to virtualenv. The transition is pretty straightforward, and there are a couple of good how-tos like this one. Of the many good reasons to use virtualenv rather than installing the dependencies system-wide, I found these to be the most important:

  • projects with different dependencies are completely isolated from each other, no bad surprises if you install a new dependency for some other project
  • no bad surprises if your OS package manager decides to update a dependency
  • dependencies can be defined within the project in a requirements file
  • ability to quickly switch between different versions of dependencies

If you use the great Eclipse plugin PyDev as your IDE, here are some simple steps to configure your project to use the virtual environment, assuming you’ve set it up already:

  • In your virtual environment there’s a python wrapper that we want to use in Eclipse. Go to Window -> Preferences -> Interpreter Python and create a new interpreter. Select the python wrapper from the virtual environment ($WORKON_HOME/YOUR_PROJECT/bin/python). The libraries should be selected automatically – leave them as they are.
  • In the project properties change the interpreter to use the newly created one (Project -> Properties -> PyDev Python Interpreter)
  • Alternatively you can manage the environment individually for each run configuration in the ‘Run -> Run configurations -> Interpreter’ tab

This worked for me with Eclipse Helios SR2 and PyDev 1.6.5. To test it you could uninstall one of your dependencies from your system and install it only into that virtual environment. Some credit for the steps described goes to Luke Plant, however I tried to simplify it a bit.

In my day job we recently finished a performance testing session and gained some interesting insights that I’d like to share here. To set the scope for this article, I’m using Grig Gheorghiu’s definition of load vs performance vs stress testing. Basically, performance testing is done to make sure you achieve the performance figures that the business wants, e.g. a throughput of 10k per minute for a certain entity. In the process you will find out what your biggest bottlenecks are and hopefully eliminate them, so that you finally meet or exceed your requirements.

Setup

Our application picks up events from a database table, fetches some data from other tables and sends out a SOAP web service request to a third party. We use the Java Executor framework to get a threadpool for these event workers, and we played with the threadpool size to find out how our application scales up. To get the timings of the individual components (and thereby identify bottlenecks) we used JETM. You can plug it into your application and see invocations, average execution times etc. in a nice web console. To focus the test on our own application, we set up a little mock web service that emulates the third-party service.
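For reference, a fixed-size pool of event workers can be set up with the Executor framework roughly as below. This turns the pool size into a single parameter to vary between test runs. EventWorkerPool and processEvents are hypothetical names. The task body is a placeholder for picking up an event, loading its data and sending the web service request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical harness: process a number of events on a fixed-size thread pool. */
final class EventWorkerPool {

    /** Submits eventCount tasks to a pool of poolSize threads and returns how many completed. */
    static int processEvents(int eventCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < eventCount; i++) {
            pool.submit(() -> {
                // placeholder: pick up an event, load its data, send the web service request
                processed.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed.get();
    }
}
```

Timing such a run with System.nanoTime() for pool sizes 1, 2, 4, 8 and 16 gives a throughput per pool size. With the real event handling in place of the placeholder, that is the kind of curve discussed below.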

Technology stack

  • JVM 1.6.0
  • Oracle 10g
  • JPA with Hibernate as persistence provider
  • JBoss 4.2.3-GA
  • jbossws-metro-3.1.0.GA
    • JAX-WS RI 2.1.4
    • JAXB RI 2.1.9
    • WSIT 1.3.1

First test iterations: fun with JaxWS

In the first run it quickly became obvious that we had a major bottleneck that made it impossible for us to achieve the required throughput. Using VisualVM it didn’t take long to identify the initialisation of the JaxWS service port as the culprit. That class basically just holds some meta information about the service in order to call it correctly (e.g. security policies). Turns out it’s a really bad idea to create a new instance for every request, as JaxWS will go out and fetch the WSDL from the remote service every single time.
The obvious solution is to cache the instance and share it among the event worker threads. However, old versions of JaxWS were not thread-safe, and it is still unclear when and where this was fixed. To be on the safe side, we ran some massive loads and checked that all generated messages were schema compliant.
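The caching itself can be sketched with the initialization-on-demand holder idiom. The JVM then creates the instance once, lazily and thread-safely, on first access, and all worker threads share it. ServicePort is a hypothetical stand-in for the generated JaxWS port, whose construction is what triggered the WSDL fetch. This removes only the repeated construction. Whether the real port may be shared is exactly the thread-safety question above. If it may not, keeping one port per worker thread in a ThreadLocal is a common alternative.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a generated JaxWS port whose construction is expensive. */
final class ServicePort {
    static final AtomicInteger CREATED = new AtomicInteger();

    ServicePort() {
        // In the real client, this is where the WSDL would be fetched and parsed.
        CREATED.incrementAndGet();
    }

    String send(String payload) {
        return "ack:" + payload;
    }
}

/** Hands out one shared ServicePort instance to all worker threads. */
final class ServicePortCache {
    private ServicePortCache() {}

    /** The JVM initializes Holder (and thus INSTANCE) once, on first access, thread-safely. */
    private static final class Holder {
        static final ServicePort INSTANCE = new ServicePort();
    }

    static ServicePort get() {
        return Holder.INSTANCE;
    }
}
```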

Cleaning up our own backyard: JPA query optimisation

With this major bottleneck out of the way, we were able to find some database performance problems in our own application code. This is the sort of bottleneck we expected, especially as we use JPA, which abstracts the database away and thereby makes it easy to overlook badly performing database queries. Yes, you should have a look at the actual queries in the log and check that they are what you actually want to run. Common problems include too many joins, which fetch data unnecessarily, and too few joins, which cause more queries than necessary.
But analysing the queries from the log only gets you so far. Database statistics are your friend if you want to find out how the database behaves under high load and to identify the slowest queries and those with the most CPU time overall. They help you focus your optimisation efforts on the important parts: the 2% of your code that make all the difference.
This way we found a couple of slow performers that constituted the bottlenecks. Once identified and with the help of the database statistics and some explain plans it was straightforward to resolve them. These are the most noteworthy changes we made:

  • make sure they use an index
  • configure cascade operations for more efficient updates and merges

Why synchronised parts matter

With these improvements in place we played with the threadpool size again and now captured the following execution times:

[Charts: throughput (events/minute) and average client execution time by threadpool size]

As you can see, the client doesn’t scale any higher when using more than 8 threads. If you throw more threads at it, the average invocation time increases, so the throughput stays flat. Also note that the WS-Security overhead does have a huge impact on the total turnaround time (as we expected), but at least it doesn’t make the client scale even worse ;)

In an ideal world (without synchronisation), scaling up an application is easy: just increase the number of concurrent workers and give them enough resources. If you double the number of workers, the average execution time remains pretty stable, so the throughput roughly doubles. This is one reason why frameworks like Node.js are becoming so popular these days: they scale up very well because they don’t have any synchronised parts.
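Amdahl's law is the standard model for this effect; it is not something we measured. It shows what synchronised parts cost. If a fraction $p$ of the work per event can run in parallel and the remaining $1-p$ is serialised, the speedup with $N$ workers is at most

$$
S(N) = \frac{1}{(1 - p) + \frac{p}{N}} \le \frac{1}{1 - p}
$$

Take a purely illustrative $p = 0.9$. Then 8 workers give at most $1/(0.1 + 0.9/8) \approx 4.7$ times the single-worker throughput, and 16 workers at most $1/(0.1 + 0.9/16) = 6.4$ times. No number of workers can get past $10$ times.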

Scaling up obviously only works as long as enough resources are still available, so we monitored the servers with VisualVM. The graphs show that CPU usage stays around 60%, no matter whether 8 or 16 workers are active. The additional threads can’t make more use of the available resources because they are evidently waiting to enter some synchronised blocks. With more workers the graph is only more volatile, which results in slightly worse performance because the garbage collector runs more often.

[Graphs: server CPU usage in VisualVM for threadpool sizes 1, 2, 4, 8 and 16]

Making sure we blame the right one

While the figures seem to indicate that the JaxWS RI client doesn’t scale up well, the poorly performing part could just as well be the service. This is especially likely as four servers with multiple concurrent threads are hammering at only one machine that hosts the service. So we set up a second machine with the same mock service and got the same results.
It appears that the service side of JaxWS RI is pretty well optimised; at least some effort has gone in there. But as our results show, the client part needs attention. Maybe the problem is in some library that’s used under the hood, and maybe it has been improved in a later version, but I haven’t found anything in that direction.

Can we increase the throughput anyway?

To scale up, we would have to resolve our bottlenecks, for example by using a different web services framework that scales up better (if one exists). But changing one main component in the architecture could have a lot of implications. Luckily there is a second option:
Scale out (scale horizontally), that is, add more servers for the same job. But the architecture must be able to cope with that scenario.
In our case all servers fetch events from the same database table. So we have to ensure that no two servers ever work on the same event, essentially by using the database table like a queue. To achieve that, every worker instance tries to lock a number of events and processes them only if the lock was acquired. If not, it tries to lock the next batch of events.
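The claiming protocol can be illustrated in memory. In this sketch, each batch of events has a lock flag that a worker tries to set atomically with compareAndSet. Only the worker that succeeds processes that batch, and the others move on to the next one. In the real setup the flag would be a database row lock plus a status column that marks events as processed. In Oracle, for instance, SELECT ... FOR UPDATE NOWAIT fails immediately instead of waiting if the rows are already locked. EventQueue and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical in-memory stand-in for an events table that several workers consume like a queue. */
final class EventQueue {
    private final int batchSize;
    private final AtomicBoolean[] batchLocks;       // one "lock" per batch of events
    private final AtomicIntegerArray processCounts; // how often each event was processed

    EventQueue(int eventCount, int batchSize) {
        this.batchSize = batchSize;
        int batches = (eventCount + batchSize - 1) / batchSize;
        this.batchLocks = new AtomicBoolean[batches];
        for (int b = 0; b < batches; b++) {
            batchLocks[b] = new AtomicBoolean(false);
        }
        this.processCounts = new AtomicIntegerArray(eventCount);
    }

    /** Worker loop: try to lock each batch and process it only if the lock was acquired. */
    void runWorker() {
        for (int b = 0; b < batchLocks.length; b++) {
            if (!batchLocks[b].compareAndSet(false, true)) {
                continue; // another worker already holds this batch, try the next one
            }
            int end = Math.min((b + 1) * batchSize, processCounts.length());
            for (int e = b * batchSize; e < end; e++) {
                processCounts.incrementAndGet(e); // "process" event e
            }
            // The lock is never released, so it doubles as the "processed" marker here.
        }
    }

    /** Starts the given number of workers on separate threads and waits for all of them. */
    void runConcurrently(int workers) {
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(this::runWorker);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return;
            }
        }
    }

    int timesProcessed(int event) {
        return processCounts.get(event);
    }

    int eventCount() {
        return processCounts.length();
    }
}
```

After new EventQueue(1000, 25).runConcurrently(4), every event has been processed exactly once, no matter how the workers interleave.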

Conclusion

Our performance testing session was a great exercise. It helped us resolve bottlenecks, which increased the throughput of our application until we finally exceeded the requirements we were given. While that’s great, it is a bit disappointing that we can’t scale up any higher because we happen to use JaxWS RI. On a more positive note, there are two strategies to increase the throughput if performance requirements grow in the future: scale out, or move away from JaxWS RI.
In my eyes these results are also another hint that, despite the long-lasting hype around SOAP web services, implementations are still not as mature as general-purpose HTTP/REST clients. Those have been around much longer and are more widely used and tested than SOAP frameworks. Therefore, chances are higher that you will find one that scales up well enough for your needs.