Thursday, March 29, 2012

Ordered high-throughput message monitor

Suppose we have a high-throughput message listener, say one processing more than 1000 messages per second, and the task is to add some checks on each message, such as latency or ordering checks.

The task is quite common in remote messaging systems with failover: for example, you can miss a few messages when the messaging system fails over, or receive stale ones because of network issues or bugs in the messaging system itself.

Latency checks are easy: in the end it is just a comparison of two timestamps.
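To make the latency check concrete, here is a minimal sketch. It assumes the message carries a send timestamp in milliseconds and that sender and receiver clocks are reasonably synchronized; the class LatencyCheck, its methods and the threshold parameter are hypothetical names used only for this illustration.

```java
final class LatencyCheck {

    /** Latency in milliseconds between the send and receive timestamps. */
    static long latencyMillis(long sentAtMillis, long receivedAtMillis) {
        return receivedAtMillis - sentAtMillis;
    }

    /** True when the message took longer than the allowed maximum. */
    static boolean isTooSlow(long sentAtMillis, long receivedAtMillis, long maxLatencyMillis) {
        return latencyMillis(sentAtMillis, receivedAtMillis) > maxLatencyMillis;
    }

    public static void main(String[] args) {
        long sentAt = 1000L;      // made-up send timestamp
        long receivedAt = 1250L;  // made-up receive timestamp
        System.out.println("latency ms: " + latencyMillis(sentAt, receivedAt));
        System.out.println("too slow: " + isTooSlow(sentAt, receivedAt, 100L));
    }
}
```

No shared state is involved here, which is why this check needs no synchronization at all.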

The problem is with the ordering checks: we will surely have race conditions here, but using locks/synchronization will most likely kill performance and scalability.

So we need some kind of non-blocking algorithm, and CAS atomics will surely help us.
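For readers less familiar with compare-and-set, here is a minimal sketch of the usual CAS retry loop on an AtomicLong. The class CasMax and its method raiseTo are hypothetical names made up for this illustration and are not part of the monitor; the sketch only keeps the highest counter seen so far.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CasMax {
    private final AtomicLong highest = new AtomicLong(0);

    /** Raises the stored value to candidate if it is larger; returns true if it was updated. */
    boolean raiseTo(long candidate) {
        while (true) {
            long current = highest.get();
            if (candidate <= current) {
                return false; // nothing to do, the stored value is already at least as large
            }
            if (highest.compareAndSet(current, candidate)) {
                return true; // nobody changed the value between our read and our write
            }
            // another thread won the race: re-read and retry, no lock is ever taken
        }
    }

    long get() {
        return highest.get();
    }

    public static void main(String[] args) {
        CasMax max = new CasMax();
        max.raiseTo(5);
        max.raiseTo(3);
        System.out.println(max.get());
    }
}
```

The loop never blocks: a thread either succeeds or retries against the fresh value.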

Let's specify the requirements. We need to:
- track missed messages (when we get a message ahead of the expected one)
- ignore stale messages (when we get an old one)
No more words, let's just code it:
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
    
    private final AtomicLong messageCounter = new AtomicLong(0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        final long expected = messageCounter.getAndSet(message.getCounter()) + 1;

        if (expected == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expected > message.getCounter()) { // stale message
            /* wrong message, attempt to restore the sequence to prevent an error on next good message
             * TODO: fix ABA problem here
             */
            messageCounter.compareAndSet(message.getCounter(), expected - 1);
                    
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
                    message.getCounter(), expected));

            // some other notifying stuff...

        } else if (expected < message.getCounter()) { // missed messages
            log.error(String.format("got forward message %s while expected %s, missed: %s",
                    message.getCounter(), expected, message.getCounter() - expected));

            // some other notifying stuff...

        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }
}

So we have an abstract message with a counter field for ordering; we use CAS to track it and notify when there are missed or stale messages.
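The message and handler types are not shown in this post. As a sketch only, they could look like the following; the field, the constructor and the exact shape of the interface are assumptions inferred from how the processor above uses them.

```java
final class MessageTypesDemo {
    public static void main(String[] args) {
        // trivial handler, just to show the callback shape
        YetAnotherMessageHandler handler = message -> message.getCounter() > 0;
        System.out.println(handler.onSomeHighThroughputMessage(new YetAnotherMessage(1L)));
    }
}

/** Hypothetical message type: only the counter used for ordering is modelled. */
final class YetAnotherMessage {
    private final long counter;

    YetAnotherMessage(long counter) {
        this.counter = counter;
    }

    long getCounter() {
        return counter;
    }
}

/** Hypothetical listener callback; returns true when the message was processed. */
interface YetAnotherMessageHandler {
    boolean onSomeHighThroughputMessage(YetAnotherMessage message);
}
```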

Looks good and works 99.99% of the time, but it can sometimes mess up the ordering because of the ABA problem in line 17 (the compareAndSet call in the stale-message branch). Consider the message sequence 1, 2, 3, 4, 2: we have updated messageCounter to 4, then to 2, and thread A is about to execute line 17.
At that moment two more messages arrive: 5 (correct) and 2 (wrong again). Other threads update messageCounter to 5, raising a missed-message notification (the first false notification), and then to 2. Only after that does thread A execute line 17, and since it finds messageCounter = 2, it atomically sets it back to 4. Thus when the next correct message 6 arrives, we report that message 5 was missed (the second false notification), although message 5 did arrive.
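The interleaving is easier to follow when replayed step by step. The sketch below is not a concurrency test: it runs the same AtomicLong operations in a single thread, in exactly the order described above. The class AbaReplay and the "thread" labels in the comments are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class AbaReplay {

    static String replay() {
        AtomicLong counter = new AtomicLong(4); // messages 1..4 were already handled
        List<Long> falseMisses = new ArrayList<Long>();

        // thread A: stale message 2, pauses right before line 17
        long expectedA = counter.getAndSet(2) + 1; // 5, counter is now 2

        // thread B: correct message 5, but the counter says 3 is expected
        long expectedB = counter.getAndSet(5) + 1; // 3, counter is now 5
        if (expectedB < 5) falseMisses.add(5L);

        // thread C: stale message 2 again
        long expectedC = counter.getAndSet(2) + 1; // 6, counter is now 2

        // thread A resumes line 17: it sees 2 (written by C, not by A) and rolls back to 4
        boolean restoredA = counter.compareAndSet(2, expectedA - 1);
        // thread C runs line 17: the counter is 4 by now, so its restore fails
        boolean restoredC = counter.compareAndSet(2, expectedC - 1);

        // correct message 6: the counter says 5 is expected
        long expectedD = counter.getAndSet(6) + 1;
        if (expectedD < 6) falseMisses.add(6L);

        return "A=" + restoredA + " C=" + restoredC + " falseMisses=" + falseMisses;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Thread A's compare-and-set cannot tell its own 2 from the 2 written later by another thread, which is the ABA problem in a nutshell.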

Let's fix it. The JDK has a cool AtomicStampedReference class against ABA issues, but unfortunately it has no getAndSet method :(
We have to implement it ourselves; it's not very complex, by analogy with AtomicLong:
private StampedReferencePairPub getAndSetMessageCounter(final Long newValue) {
        final int[] stampHolder = new int[1];
        while (true) {
            // read the reference and its stamp as one consistent pair
            final Long currentRef = messageCounter.get(stampHolder);
            final int currentStamp = stampHolder[0];
            if (messageCounter.compareAndSet(currentRef, newValue, currentStamp, currentStamp + 1))
                return new StampedReferencePairPub(currentRef, currentStamp);
        }
    }

...

public static class StampedReferencePairPub {
        public final Long ref; // the very instance that was stored, not a re-boxed copy
        public final int stamp;
        
        StampedReferencePairPub(Long r, int i) {
            ref = r; stamp = i;
        }
    }

- yes, we have to re-declare the same class as ReferenceIntegerPair, because it is a private nested class of AtomicStampedReference. Note that AtomicStampedReference compares references by identity (==), not by equals(), so the helper keeps the boxed Long instances instead of unboxing them to long: a freshly boxed Long of the same value is not guaranteed to be the same object, and compareAndSet would then fail.

+ some refactoring, and the final version is:
import java.util.concurrent.atomic.AtomicStampedReference;

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static final Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
    
    private final AtomicStampedReference<Long> messageCounter = new AtomicStampedReference<Long>(0L, 0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // box once and reuse this instance: the restoring CAS below must expect the very object we stored
        final Long counter = Long.valueOf(message.getCounter());
        final StampedReferencePairPub current = getAndSetMessageCounter(counter);
        final long expectedCounter = current.ref + 1;

        if (expectedCounter == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expectedCounter > message.getCounter()) {
            /* ignore stale message: attempt to restore the sequence to prevent an error on next good message;
             * the stamp check makes this fail if any other thread has touched the counter in between
             */
            final int expectedStamp = current.stamp + 1;
            boolean restored = messageCounter.compareAndSet(counter, current.ref, expectedStamp, expectedStamp + 1);
                    
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s! Sequence restored: %s",
                    message.getCounter(), expectedCounter, restored));

            // some other notifying stuff...

        } else if (expectedCounter < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, probably missed: %s",
                    message.getCounter(), expectedCounter, message.getCounter() - expectedCounter));

            // some other notifying stuff...

        }
        return false;
    }

    // getAndSetMessageCounter, StampedReferencePairPub and processBusinessStuff as shown above
}

That's it! Now we can get at most one false missed-message notification after any stale message, in case of bad timing and high concurrency.
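To see what the stamp buys us, the interleaving from the ABA example (4 handled, then stale 2, good 5, stale 2 again, good 6) can be replayed in a single thread against an AtomicStampedReference. This is only a sketch: the class StampedReplay and its simplified getAndSet helper, which returns the previous value and stamp in a long array, are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicStampedReference;

final class StampedReplay {

    /** Swaps in newValue and bumps the stamp; returns {previous value, previous stamp}. */
    static long[] getAndSet(AtomicStampedReference<Long> counter, Long newValue) {
        int[] stampHolder = new int[1];
        while (true) {
            Long ref = counter.get(stampHolder);
            if (counter.compareAndSet(ref, newValue, stampHolder[0], stampHolder[0] + 1)) {
                return new long[] {ref, stampHolder[0]};
            }
        }
    }

    static String replay() {
        AtomicStampedReference<Long> counter = new AtomicStampedReference<Long>(4L, 0);
        List<Long> falseMisses = new ArrayList<Long>();

        Long staleA = 2L;                             // thread A: stale message 2
        long[] prevA = getAndSet(counter, staleA);    // counter = (2, stamp 1)

        long[] prevB = getAndSet(counter, 5L);        // thread B: good message 5, counter = (5, stamp 2)
        if (prevB[0] + 1 < 5) falseMisses.add(5L);

        Long staleC = 2L;                             // thread C: stale message 2 again
        long[] prevC = getAndSet(counter, staleC);    // counter = (2, stamp 3)

        // thread A resumes: the value is 2 again, but the stamp is 3 instead of 1, so the restore fails
        int stampA = (int) prevA[1] + 1;
        boolean restoredA = counter.compareAndSet(staleA, prevA[0], stampA, stampA + 1);

        // thread C restores its own update: value and stamp both match, counter = (5, stamp 4)
        int stampC = (int) prevC[1] + 1;
        boolean restoredC = counter.compareAndSet(staleC, prevC[0], stampC, stampC + 1);

        long[] prevD = getAndSet(counter, 6L);        // good message 6, expected counter is 6
        if (prevD[0] + 1 < 6) falseMisses.add(6L);

        return "A=" + restoredA + " C=" + restoredC + " falseMisses=" + falseMisses;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this replay the stale rollback by thread A is rejected, so only the notification for message 5 remains false.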

Quite a vivid example of how simple things get complicated in non-blocking code; consider how complex a fully correct algorithm would be! =)

+ Full sources with JUnit tests, + SO question
