Qingtian Wang edited this page May 14, 2022 · 9 revisions

conseq4j

Conseq4J is a Java concurrent API that sequences related tasks while running unrelated tasks concurrently; "conseq" is short for concurrent sequencer.

While conseq4j is a generic Java concurrent API, a typical use case is an asynchronous message consumer that can benefit from concurrent processing.

First off, you can do Setup 1 in a message consumer. The messaging provider (an EMS queue, a Kafka topic partition, etc.) usually ensures that messages are delivered to the provider-managed onMessage method in the same order as they are received, and does not deliver the next message until the previous call to the method has returned. Logically, then, all messages are consumed in a single-threaded fashion, in the same order as they are delivered by the messaging provider.

Setup 1: globally sequential

public class MessageConsumer {

    /**
     * Suppose run-time invocation of this method is managed by the messaging provider
     */
    public void onMessage(Message shoppingEvent) {
        process(shoppingEvent); // delegate to business method
    }

    /**
     * Business processing method
     */
    private void process(Message shoppingEvent) {
        ...
    }
    ...
  • That is all well and good, but processing all messages in globally sequential order is a bit slow, isn't it? It is overly conservative, to say the least, especially for multiprocessor systems and/or IO-bound processes.

To speed up processing, you really want Setup 2 if you can - just "shot-gun" the work across a bunch of concurrent threads - except sometimes you can't: not when the order of message consumption matters:

Imagine that while shopping online for a T-shirt, the shopper changed the size of the shirt between Medium and Large, back and forth some ten times, and eventually settled on... OK, Medium! The ten size-change events were delivered to the messaging provider in the same order the shopper placed them. At the time of delivery, though, your consumer application had been brought down for maintenance, so the ten events were held and piled up inside the messaging provider... Then your consumer application came back online, and all ten events were delivered to you in the correct order, albeit within a very short period of time.

Setup 2: globally concurrent

public class MessageConsumer {

    private ExecutorService shotgunConcurrencer = Executors.newFixedThreadPool(10);
    
    /**
     * Suppose run-time invocation of this method is managed by the messaging provider
     */
    public void onMessage(Message shoppingEvent) {
        shotgunConcurrencer.execute(() -> process(shoppingEvent)); // Look ma, I got 10 concurrent threads working on this. That's gotta be faster, right?
    }    
    ...

As it turned out, with Setup 2, the shopper actually received a T-shirt of size Large, instead of the Medium that s/he so painstakingly settled upon (and got real mad, called you a bunch of names, and knocked over your beer!). You wonder how such a thing could ever have happened... Oh, got it:

  • The shot-gun threads processed the events out of order!

OK, what then - go back to Setup 1? You can indeed do that, at the expense of performance. Or you may be able to achieve decent concurrency (and save your beer!) by using a "conseq" as in Setup 3:

Setup 3: globally concurrent, locally sequential

Style 1 - Summon a sequential executor by its sequence key, and use the executor the same way as a JDK/Guava ExecutorService/ListeningExecutorService

The API:
public interface ConcurrentSequencer {
    ListeningExecutorService getSequentialExecutor(Object sequenceKey);
}
The usage example:
public class MessageConsumer {

    private ConcurrentSequencer conseq = Conseq.newBuilder().globalConcurrency(10).build();
    
    /**
     * Suppose run-time invocation of this method is managed by the messaging provider
     */
    public void onMessage(Message shoppingEvent) {
    
        // conseq4j API to summon a sequential executor: events with the same shopping cart ID
        // are processed sequentially by the same executor; events with different sequence keys
        // are processed concurrently by different executors whenever possible.
        conseq.getSequentialExecutor(shoppingEvent.getShoppingCartId()).execute(() -> process(shoppingEvent)); 
    }
    
    /**
     * Business method, most likely translating message to domain objects for further processing by using other collaborators 
     */
    private void process(Message shoppingEvent) {
        ...
    }
    ...

That is, to use a conseq (concurrent sequencer) in the asynchronous consumer, such that:

  • Related events are processed sequentially; unrelated events are processed concurrently (see details below).

The executor produced by the API is logically single-threaded. Within its sequential execution context, it offers the same syntactic richness and semantic robustness as the Guava ListeningExecutorService (a subtype of the JDK ExecutorService). Repeated calls with the same (equal) sequence key return the same (created/cached) executor instance. Thus, starting from the single-threaded consumer, as long as you summon the conseq's executors by the right sequence keys, you can rest assured that related events with the same sequence key are never executed out of order, while unrelated events enjoy concurrent execution by up to the maximum number of executors.
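To make the cached-executor behavior concrete, here is a minimal toy sketch - my own illustration, not the actual conseq4j implementation: equal sequence keys map to the same bucket, and each bucket lazily caches one single-thread executor, so tasks sharing a key always run in submission order on the same executor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy illustration only - not the conseq4j implementation.
class ToyConseq {
    private final Map<Integer, ExecutorService> executorCache = new ConcurrentHashMap<>();
    private final int globalConcurrency;

    ToyConseq(int globalConcurrency) {
        this.globalConcurrency = globalConcurrency;
    }

    /** Equal sequence keys always return the same single-thread executor. */
    ExecutorService getSequentialExecutor(Object sequenceKey) {
        int bucket = Math.floorMod(sequenceKey.hashCode(), globalConcurrency);
        return executorCache.computeIfAbsent(bucket, b -> Executors.newSingleThreadExecutor());
    }
}
```

With this sketch, calling getSequentialExecutor with equal keys returns the same executor instance every time, while keys that hash to different buckets get different executors.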

At run-time, the global concurrency of a conseq is decided not only by the preset maximum number of concurrent executors - largely so, naturally - but also by how evenly the tasks are distributed among those executors: the more evenly, the better. The task distribution is driven by:

  1. How evenly spread-out the sequence keys' hash code values are (e.g., if all tasks carry the same sequence key, then only one/same executor will be running the tasks no matter how many executors are configured to be potentially available.)
  2. How evenly the bucket hashing algorithm can spread different sequence keys into different hash buckets

The conseq4j API simply applies Math.floorMod to the sequence key's JDK hash code, over the target global concurrency (the maximum number of concurrent threads/executors), to compute the corresponding hash bucket. Each bucket is associated with a single-thread executor.
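For illustration, here is that bucket computation in plain Java (the helper name is hypothetical): the point is that Math.floorMod, unlike the % operator, always yields a non-negative bucket index, even when hashCode() is negative.

```java
// Hypothetical helper illustrating the bucket computation described above.
class Buckets {
    /** Maps a sequence key to one of {@code globalConcurrency} hash buckets. */
    static int bucketOf(Object sequenceKey, int globalConcurrency) {
        // Math.floorMod, unlike %, never returns a negative value,
        // even for negative hash codes.
        return Math.floorMod(sequenceKey.hashCode(), globalConcurrency);
    }
}
```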

By default, a conseq has unbounded (Integer.MAX_VALUE) capacities. The capacities refer to

  1. the conseq's maximum count of concurrent executors
  2. each executor's task queue size (See JDK Javadoc on capacity of a bounded BlockingQueue)

As always, even with unbounded capacities as in Option 0, related tasks with the same sequence key are still processed sequentially by the same executor, while unrelated tasks can be processed concurrently by a potentially unbounded number of executors:

Option 0: all default, unbounded capacities
ConcurrentSequencer conseq = Conseq.newBuilder().build();

The conseq in Option 1 has a max of 10 concurrent executors, each executor has an unbounded task queue size:

Option 1: partially bounded capacity on max concurrent executors
ConcurrentSequencer conseq = Conseq.newBuilder().globalConcurrency(10).build();

The conseq in Option 2 has an unbounded max number of concurrent executors, each executor has a task queue size of 20:

Option 2: partially bounded capacity on task queue size
ConcurrentSequencer conseq = Conseq.newBuilder().executorTaskQueueSize(20).build();

The conseq in Option 3 has a max of 10 concurrent executors, each executor has a task queue size of 20. Note that, in this case, the total task queue size of the entire conseq is 200 (i.e., 20 x 10):

Option 3: fully bounded capacities on both max concurrent executors and task queue size
ConcurrentSequencer conseq = Conseq.newBuilder().globalConcurrency(10).executorTaskQueueSize(20).build();
Considerations on capacities

In a Cloud environment, you might want to leave at least one of the conseq's capacities at its default/unbounded value, especially each executor's task queue size. When an executor's capacity is exceeded, the default JDK policy is to reject further tasks by throwing an exception. If you fully bound a conseq's capacities as in Option 3, you may prevent the running node/JVM from crashing, but tasks beyond the preset capacities will be rejected, which is also undesirable. By leaving some capacity unbounded as in Options 0/1/2, the idea is to let the Cloud's autoscaling mechanism scale the system out and prevent both undesired outcomes - task rejection and node crash. In other words, the conseq's capacities should be large enough that autoscaling kicks in before either of those outcomes does.
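To see that default rejection policy in action, here is a plain-JDK sketch (not the conseq4j API): a ThreadPoolExecutor with a single worker and a bounded queue of size 1 throws a RejectedExecutionException once both the worker and the queue are occupied.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates the JDK's default AbortPolicy on a bounded executor.
class BoundedQueueDemo {
    static boolean overflowTaskRejected() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch blocker = new CountDownLatch(1);
        executor.execute(() -> {
            try { blocker.await(); } catch (InterruptedException ignored) { }
        }); // occupies the only worker thread
        executor.execute(() -> { }); // fills the task queue of size 1
        boolean rejected = false;
        try {
            executor.execute(() -> { }); // exceeds the preset capacity
        } catch (RejectedExecutionException e) {
            rejected = true; // default policy: reject by throwing
        }
        blocker.countDown();
        executor.shutdown();
        return rejected;
    }
}
```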

Style 2 - Directly use conseq4j API as a service, as with a JDK ExecutorService but parameterized with a sequence key

This style further decouples the runnable tasks from their executors by avoiding the secondary bucket hashing: the sequence key's hash code is used directly to locate the corresponding (pooled) sequential executor. That eliminates secondary hash collisions, so unrelated tasks will not unfairly block each other from executing. As a trade-off, there is more synchronization checking. This style may suit workloads that are more sensitive to individual response time.

The API:
public interface ConcurrentSequencerService {

    void execute(Object sequenceKey, Runnable runnable);

    <T> Future<T> submit(Object sequenceKey, Callable<T> task);

    <T> Future<T> submit(Object sequenceKey, Runnable task, T result);

    Future<?> submit(Object sequenceKey, Runnable task);

    <T> List<Future<T>> invokeAll(Object sequenceKey, Collection<? extends Callable<T>> tasks) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Object sequenceKey, Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    <T> T invokeAny(Object sequenceKey, Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Object sequenceKey, Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

}
The usage example:
public class MessageConsumer {

    private ConcurrentSequencerService conseqService = ConseqService.newBuilder().globalConcurrency(10).build();
    
    /**
     * Suppose run-time invocation of this method is managed by the messaging provider
     */
    public void onMessage(Message shoppingEvent) {
        try {
        
            // conseq4j API as a service: concurrently process the tasks while preserving their order/sequence
            List<Future<MySelectionResult>> sequencedResults = conseqService.invokeAll(shoppingEvent.getShoppingCartId(), toSequencedSelectionCallables(shoppingEvent));
             
            // Single-threaded send, same order/sequence as processed
            publishAll(sequencedResults);
        } catch(InterruptedException e) {
            ...
        }
    }
    
    /**
     * Convert to Callable tasks, in the proper order/sequence, which are then submitted to the conseq4j API.
     */
    private List<Callable<MySelectionResult>> toSequencedSelectionCallables(Message shoppingEvent) {
        ...
    }
    
    /**
     * Use a message sender to publish future results. Yes, we are single-threading here to preserve the processing order.
     */
    private void publishAll(List<Future<MySelectionResult>> sequencedResults) {
        sequencedResults.forEach(futureResult -> {
            try {
                messageSender.send(futureResult.get()); // Future.get() throws checked exceptions
            } catch (InterruptedException | ExecutionException e) {
                ...
            }
        });
    }
    ...
