The task is quite common in remote messaging systems with failover functionality: for example, you can miss a few messages when the messaging system fails over, or receive stale ones because of network issues or bugs in the messaging system itself.
Latency checks are easy: in the end it is just a comparison of two timestamps.
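A minimal sketch of such a latency check, assuming each message carries its send time in epoch milliseconds and that sender and receiver clocks are reasonably synchronized. The class name `LatencyCheck` and the threshold parameter are hypothetical and only serve the illustration:

```java
final class LatencyCheck {
    private final long maxLatencyMillis;

    LatencyCheck(long maxLatencyMillis) {
        this.maxLatencyMillis = maxLatencyMillis;
    }

    /** Latency is just the difference between the receive and the send timestamp. */
    static long latencyMillis(long sentAtMillis, long receivedAtMillis) {
        return receivedAtMillis - sentAtMillis;
    }

    /** True when the message took longer than the allowed threshold. */
    boolean isTooSlow(long sentAtMillis, long receivedAtMillis) {
        return latencyMillis(sentAtMillis, receivedAtMillis) > maxLatencyMillis;
    }
}
```

No shared state is involved, so there is nothing to synchronize here.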
The problem is with ordering checks: we will surely have race conditions here, but using locks or synchronization will most likely kill performance and scalability.
So we need some kind of non-blocking algorithm, and CAS atomics will surely help us.
Let's specify the requirements. We need to:
- track missed messages (when we get a forward one)
- ignore stale messages (when we get an old one)
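Leaving concurrency aside for a moment, the two requirements boil down to comparing the received counter with the last one seen plus one. A small single-threaded sketch; the names `SequenceCheck`, `Kind`, `classify` and `missedCount` are made up for this illustration:

```java
final class SequenceCheck {
    enum Kind { IN_ORDER, STALE, FORWARD }

    /** Classifies a received counter against the last counter seen. */
    static Kind classify(long lastSeen, long received) {
        final long expected = lastSeen + 1;
        if (received == expected) {
            return Kind.IN_ORDER;
        }
        // anything below the expected value is old (or a duplicate), anything above skipped something
        return received < expected ? Kind.STALE : Kind.FORWARD;
    }

    /** Number of messages skipped by a forward message, 0 otherwise. */
    static long missedCount(long lastSeen, long received) {
        return Math.max(0, received - (lastSeen + 1));
    }
}
```

The hard part is doing the same thing against shared state without locks.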
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);

    private final AtomicLong messageCounter = new AtomicLong(0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
        if (expected == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expected > message.getCounter()) {
            // stale message
            /* wrong message, attempt to restore the sequence to prevent an error on next good message
             * TODO: fix ABA problem here
             */
            messageCounter.compareAndSet(message.getCounter(), expected - 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
                    message.getCounter(), expected));
            // some other notifying stuff...
        } else if (expected < message.getCounter()) {
            // missed messages
            log.error(String.format("got forward message %s while expected %s, missed: %s",
                    message.getCounter(), expected, message.getCounter() - expected));
            // some other notifying stuff...
        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }
}
So we have an abstract message with a counter field for ordering; we use CAS to track it and notify when there are missed or stale messages.
It looks good and works 99.99% of the time, but it can occasionally get the ordering wrong because of the ABA problem on line 17 (the compareAndSet call in the stale-message branch). Consider the message sequence 1, 2, 3, 4, 2: we have updated messageCounter to 4, then to 2, and thread A is about to execute line 17.
At that moment two more messages arrive: 5 (correct) and 2 (wrong again). Other threads update messageCounter to 5, with a missed-message notification (the first false notification), and then to 2. Only after that does thread A execute line 17; since it finds messageCounter = 2, it atomically sets it back to 4. So when the next correct message, 6, arrives, we report that message 5 was missed (the second false notification) although we have already processed it successfully.
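The interleaving above is hard to hit on purpose with real threads, but it can be replayed deterministically on a single thread. The sketch below does only that; `AbaReplay` is a hypothetical name, and the "threads" A, B and C exist only in the comments:

```java
import java.util.concurrent.atomic.AtomicLong;

final class AbaReplay {
    /** Replays the interleaving step by step; returns what message 6 is told to expect. */
    static long expectedWhenSixArrives() {
        final AtomicLong counter = new AtomicLong(4);   // messages 1..4 already processed

        // Thread A receives stale 2: swaps it in and remembers the value to restore.
        final long restoreA = counter.getAndSet(2);     // restoreA = 4, counter = 2
        // ...thread A is paused right before its compareAndSet...

        // Thread B receives 5: expected is 3, so 3 and 4 are reported as missed (false alarm #1).
        counter.getAndSet(5);                           // counter = 5
        // Thread C receives stale 2 again.
        final long restoreC = counter.getAndSet(2);     // restoreC = 5, counter = 2

        // Thread A resumes: the counter is 2 again, so its CAS succeeds and rolls back too far.
        counter.compareAndSet(2, restoreA);             // succeeds, counter = 4
        // Thread C's own restore fails because the counter is no longer 2.
        counter.compareAndSet(2, restoreC);             // fails, counter stays 4

        // Message 6 arrives: the previous value is 4, so 5 is expected (false alarm #2).
        return counter.getAndSet(6) + 1;
    }
}
```

Thread A cannot tell "still my 2" from "somebody else's 2", which is exactly the ABA problem.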
Let's fix it. The JDK has a neat AtomicStampedReference class designed against ABA issues, but unfortunately it has no getAndSet method :(
We have to implement it ourselves; it is not very complex, by analogy with AtomicLong:
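private StampedReferencePairPub getAndSetMessageCounter(final Long newValue) {
    final int[] stampHolder = new int[1];
    while (true) {
        // read reference and stamp together as one consistent snapshot
        final Long currentRef = messageCounter.get(stampHolder);
        final int currentStamp = stampHolder[0];
        // compareAndSet compares references with ==, not equals(),
        // so we must pass the very same Long object we have just read
        if (messageCounter.compareAndSet(currentRef, newValue, currentStamp, currentStamp + 1))
            return new StampedReferencePairPub(currentRef, currentStamp);
    }
}
...
public static class StampedReferencePairPub {
    public final Long ref;
    public final int stamp;

    StampedReferencePairPub(Long r, int i) {
        ref = r;
        stamp = i;
    }
}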
- yes, we have to re-declare a class equivalent to ReferenceIntegerPair, because that one is a private inner class of AtomicStampedReference.
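To see what the stamp buys us, here is a small single-threaded sketch (the class name `StampDemo` is hypothetical). The value goes 2 -> 5 -> 2, and a CAS based on the old snapshot is rejected even though the reference is the same again:

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class StampDemo {
    /** Returns true if a CAS based on an outdated snapshot still succeeds (it should not). */
    static boolean staleCasSucceeds() {
        final Long two = 2L, five = 5L;
        final AtomicStampedReference<Long> counter = new AtomicStampedReference<>(two, 0);

        final int[] stampHolder = new int[1];
        final Long seenRef = counter.get(stampHolder);   // snapshot: (2, stamp 0)
        final int seenStamp = stampHolder[0];

        // Meanwhile the value goes 2 -> 5 -> 2; every change bumps the stamp.
        counter.compareAndSet(two, five, 0, 1);
        counter.compareAndSet(five, two, 1, 2);

        // Same reference as in the snapshot, but the stamp is now 2, not 0.
        return counter.compareAndSet(seenRef, 4L, seenStamp, seenStamp + 1);
    }
}
```

Note that AtomicStampedReference compares references with ==, not equals(). The sketch therefore reuses the same Long objects; a freshly boxed Long with an equal value is not guaranteed to match.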
Plus some refactoring, and the final version is:
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);

    private final AtomicStampedReference<Long> messageCounter = new AtomicStampedReference<Long>((long) 0, 0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // box the counter once: AtomicStampedReference compares references with ==,
        // so the restore below must pass the very same Long object that was stored
        final Long received = Long.valueOf(message.getCounter());
        final StampedReferencePairPub current = getAndSetMessageCounter(received);
        final long expectedCounter = current.ref + 1;
        if (expectedCounter == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expectedCounter > message.getCounter()) {
            /* ignore stale message: attempt to restore the sequence to prevent an error on next good message */
            final int expectedStamp = current.stamp + 1;
            boolean restored = messageCounter.compareAndSet(received, current.ref, expectedStamp, expectedStamp + 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s! Sequence restored: %s",
                    message.getCounter(), expectedCounter, restored));
            // some other notifying stuff...
        } else if (expectedCounter < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, probably missed: %s",
                    message.getCounter(), expectedCounter, message.getCounter() - expectedCounter));
            // some other notifying stuff...
        }
        return false;
    }

    // getAndSetMessageCounter(...), StampedReferencePairPub and processBusinessStuff(...) as shown above
}
That's it! Now, in case of bad timing and high concurrency, we can get at most one false missed-message notification after any stale message.
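As a sanity check, the problematic interleaving described earlier (stale 2, then 5, then stale 2 again while the first thread is paused) can be replayed on a single thread against the stamped counter. This is a simplified sketch, not the processor itself; `StampedReplay`, `Prev` and `restore` are names invented for the illustration:

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class StampedReplay {
    private final AtomicStampedReference<Long> counter;

    StampedReplay(long start) {
        counter = new AtomicStampedReference<>(Long.valueOf(start), 0);
    }

    /** What a getAndSet replaced: the previous reference and its stamp. */
    static final class Prev {
        final Long ref;
        final int stamp;
        Prev(Long ref, int stamp) { this.ref = ref; this.stamp = stamp; }
    }

    Prev getAndSet(Long newValue) {
        final int[] holder = new int[1];
        while (true) {
            final Long ref = counter.get(holder);
            if (counter.compareAndSet(ref, newValue, holder[0], holder[0] + 1)) {
                return new Prev(ref, holder[0]);
            }
        }
    }

    /** Succeeds only if nobody has touched the counter since our own getAndSet. */
    boolean restore(Long ours, Prev prev) {
        return counter.compareAndSet(ours, prev.ref, prev.stamp + 1, prev.stamp + 2);
    }

    /** Replays the interleaving; returns what message 6 is told to expect. */
    static long expectedWhenSixArrives() {
        final StampedReplay p = new StampedReplay(4);
        final Long staleA = 2L, five = 5L, staleC = 2L;

        final Prev prevA = p.getAndSet(staleA);   // thread A: stale 2, then paused
        p.getAndSet(five);                        // thread B: 5 arrives, one false "missed" alarm
        final Prev prevC = p.getAndSet(staleC);   // thread C: stale 2 again

        p.restore(staleA, prevA);                 // fails: the stamp moved on since A's swap
        p.restore(staleC, prevC);                 // succeeds: counter is back to 5

        return p.getAndSet(6L).ref + 1;
    }
}
```

Here thread A's late restore is rejected by the stamp, thread C restores the counter to 5, and message 6 is accepted as in order. Only the false notification for message 5 remains.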
Quite a vivid example of how simple things get complicated in non-blocking code. Consider how complex a fully correct algorithm would be! =)
+Full sources with JUnit tests, +SO question