6. Event Processing

The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Listeners. This is the responsibility of the Event Bus. Axon Framework provides an Event Bus and some base classes to help you implement Event Listeners.

6.1. Event Bus

The EventBus is the mechanism that dispatches events to the subscribed event listeners. Axon Framework provides two implementations of the event bus: SimpleEventBus and ClusteringEventBus. Both implementations manage subscribed EventListeners and forward all incoming events to all subscribed listeners. This means that Event Listeners must be explicitly registered with the Event Bus in order for them to receive events. The registration process is thread safe. Listeners may register and unregister for events at any time.

6.1.1. Simple Event Bus

The SimpleEventBus is, as the name suggests, a very basic implementation of the EventBus interface. It simply dispatches each incoming Event to each of the subscribed EventListeners sequentially. If an EventListener throws an Exception, dispatching stops and the exception is propagated to the component publishing the Event.

The SimpleEventBus is suitable for most cases where dispatching is only done locally, in a single JVM. Once your application requires Events to be published across multiple JVMs, you could consider using the ClusteringEventBus instead.
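The dispatching behavior described above can be illustrated with a minimal sketch. It does not use Axon's classes: SketchListener and SketchSimpleEventBus are hypothetical stand-ins, written only to show sequential dispatch in the publishing thread and an exception stopping the dispatch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for an event listener; not Axon's EventListener interface.
interface SketchListener {
    void handle(Object event);
}

// Illustrates the dispatching behavior described above; not Axon's SimpleEventBus.
class SketchSimpleEventBus {
    // Copy-on-write keeps subscribe/unsubscribe safe while another thread publishes.
    private final List<SketchListener> listeners = new CopyOnWriteArrayList<>();

    void subscribe(SketchListener listener) {
        listeners.add(listener);
    }

    void unsubscribe(SketchListener listener) {
        listeners.remove(listener);
    }

    void publish(Object event) {
        // Listeners run one after another in the publishing thread. An exception is
        // not caught here, so it ends the loop and reaches the caller of publish().
        for (SketchListener listener : listeners) {
            listener.handle(event);
        }
    }
}
```

In this sketch, a listener subscribed after a failing listener never sees the event, which is the consequence of "dispatching stops" that callers need to keep in mind.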

6.1.2. Clustering Event Bus

The ClusteringEventBus allows application developers to bundle EventListeners into Clusters based on their properties and non-functional requirements. The ClusteringEventBus is also better able to deal with Events being dispatched among different machines.

The ClusteringEventBus contains two mechanisms: the ClusterSelector, which selects a Cluster instance for each of the registered EventListeners, and the EventBusTerminal, which is responsible for dispatching Events to each of the relevant clusters.

[Note]Background: Axon Terminal

In the nervous system, an Axon (one of the components of a Neuron) transports electrical signals. These Neurons are interconnected in very complex arrangements. The Axon Terminal is responsible for transmitting these signals from one Neuron to another.

More information: www.wikipedia.org/wiki/Axon_terminal.

ClusterSelector

The primary responsibility of the ClusterSelector is to, as the name suggests, select a cluster for each Event Listener that subscribes to the Event Bus. By default, all Event Listeners are placed in a single Cluster instance, which dispatches events to its members sequentially and in the calling thread (similar to how the SimpleEventBus works). By providing a custom implementation, you can arrange the Event Listeners into different Cluster instances to suit the requirements of your architecture.

At this moment, there is a single implementation of the Cluster interface: SimpleCluster. This implementation calls each EventListener sequentially in the calling thread. By adding information in the Meta Data of a cluster, the selector can provide hints to the Terminal about the expected behavior.

EventBusTerminal

The EventBusTerminal forms the bridge between the events being dispatched and the Clusters inside the Event Bus. The terminal is aware of any remoting technologies used, such as JMS, AMQP, etc. The default implementation dispatches published events to each of the (local) clusters using the publishing thread. This means that with the default terminal, and the default ClusterSelector, the behavior of the ClusteringEventBus is exactly the same as that of the SimpleEventBus.
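How the selector, the clusters and a local terminal could cooperate is sketched below. All types are hypothetical stand-ins (SketchCluster, SketchClusterSelector, SketchTerminal and so on), not Axon's interfaces, and the sketch ignores thread safety and remoting.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins; none of these are Axon's own types.
interface ClusterSketchListener {
    void handle(Object event);
}

// A cluster that calls its members one by one in the calling thread.
class SketchCluster {
    final String name;
    private final List<ClusterSketchListener> members = new ArrayList<>();

    SketchCluster(String name) {
        this.name = name;
    }

    void subscribe(ClusterSketchListener listener) {
        members.add(listener);
    }

    void publish(Object event) {
        for (ClusterSketchListener member : members) {
            member.handle(event);
        }
    }
}

// Decides which cluster a newly subscribed listener belongs to.
interface SketchClusterSelector {
    SketchCluster selectCluster(ClusterSketchListener listener);
}

// Bridges published events and clusters; a remote variant would send messages instead.
interface SketchTerminal {
    void onClusterCreated(SketchCluster cluster);

    void publish(Object event);
}

// Local terminal: forwards each event to every known cluster in the publishing thread.
class LocalSketchTerminal implements SketchTerminal {
    private final Set<SketchCluster> clusters = new LinkedHashSet<>();

    public void onClusterCreated(SketchCluster cluster) {
        clusters.add(cluster);
    }

    public void publish(Object event) {
        for (SketchCluster cluster : clusters) {
            cluster.publish(event);
        }
    }
}

class SketchClusteringEventBus {
    private final SketchClusterSelector selector;
    private final SketchTerminal terminal;
    private final Set<SketchCluster> knownClusters = new LinkedHashSet<>();

    SketchClusteringEventBus(SketchClusterSelector selector, SketchTerminal terminal) {
        this.selector = selector;
        this.terminal = terminal;
    }

    void subscribe(ClusterSketchListener listener) {
        SketchCluster cluster = selector.selectCluster(listener);
        // Tell the terminal about a cluster the first time a listener lands in it.
        if (knownClusters.add(cluster)) {
            terminal.onClusterCreated(cluster);
        }
        cluster.subscribe(listener);
    }

    void publish(Object event) {
        terminal.publish(event);
    }
}
```

With a selector that always returns the same cluster and the local terminal, the sketch behaves like the sequential bus described in Section 6.1.1.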

In a typical AMQP-based configuration, the EventBusTerminal would send published events to an Exchange. For each cluster, a Queue would be connected to that exchange. The EventBusTerminal will create a consumer for each cluster, which reads from its related Queue and forwards each message to that cluster. In a distributed environment, Event Listeners of which at most one instance should receive an Event should be placed in a separate cluster, which competes with the other instances for messages on a single Queue.

6.2. Event Listeners

Event listeners are the components that act on incoming events. These events may be of any of the event types mentioned in Section 4.1, “Events”. In the Axon Framework, all event listeners must implement the EventListener interface.

6.2.1. Basic configuration

Event listeners need to be registered with an event bus (see Section 6.1, “Event Bus”) to be notified of events. Axon provides a base implementation that takes care of this, and other things, for you.

AnnotationEventListenerAdapter

The AnnotationEventListenerAdapter can wrap any object into an event listener. The adapter will invoke the most appropriate event handler method available. These event handler methods must be annotated with the @EventHandler annotation and are resolved according to the same rules that apply to annotated aggregate roots (see the section called “AbstractAnnotatedAggregateRoot”).

The constructor of the AnnotationEventListenerAdapter takes two parameters: the annotated bean, and the EventBus, to which the listener should subscribe. You can subscribe and unsubscribe the event listener using the subscribe() and unsubscribe() methods on the adapter.
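The idea of wrapping an arbitrary object and invoking "the most appropriate" annotated method can be sketched with plain reflection. This is not Axon's adapter: the @SketchEventHandler annotation and the SketchAnnotationAdapter class are hypothetical, subscription to an event bus is left out, and the resolution rule used here (the handler with the most specific matching parameter type wins) is a simplifying assumption rather than the exact rule set of the framework.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation; Axon's own annotation is @EventHandler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchEventHandler {
}

// Wraps any object and routes an event to one of its annotated methods.
class SketchAnnotationAdapter {
    private final Object target;

    SketchAnnotationAdapter(Object target) {
        this.target = target;
    }

    void handle(Object event) {
        Method best = null;
        for (Method candidate : target.getClass().getDeclaredMethods()) {
            if (!candidate.isAnnotationPresent(SketchEventHandler.class)
                    || candidate.getParameterCount() != 1) {
                continue;
            }
            Class<?> type = candidate.getParameterTypes()[0];
            if (!type.isInstance(event)) {
                continue;
            }
            // Assumption: a handler declaring a subtype of the current best match wins.
            if (best == null || best.getParameterTypes()[0].isAssignableFrom(type)) {
                best = candidate;
            }
        }
        if (best == null) {
            return; // no handler for this event type: the event is ignored
        }
        try {
            best.setAccessible(true);
            best.invoke(target, event);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}

// Example bean with two handlers; the more specific one is chosen for text events.
class SketchProjection {
    final List<String> log = new ArrayList<>();

    @SketchEventHandler
    void onAnything(Object event) {
        log.add("object");
    }

    @SketchEventHandler
    void onText(CharSequence event) {
        log.add("text");
    }
}
```

Passing a String to the adapter reaches onText, while an Integer falls back to onAnything.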

[Tip]Tip

If you use Spring, you can wrap all annotated event listeners with an adapter by adding <axon:annotation-config/> to your application context. Axon will automatically find and wrap annotated event listeners inside an AnnotationEventListenerAdapter and register them with an event bus.

6.2.2. Asynchronous event processing

By default, event listeners process events in the thread that dispatches them. This means that the thread that executes the command will have to wait until all event handling has finished. For some types of event listeners this is not the optimal form of processing. Asynchronous event processing improves the scalability of the application, at the cost of added complexity to deal with "eventual consistency". With the Axon Framework, you can easily convert any event handler into an asynchronous event handler by wrapping it in an AsynchronousEventHandlerWrapper or, when using annotations, adding the type-level @AsynchronousEventListener annotation.

The AsynchronousEventHandlerWrapper needs some extra configuration to make an event handler asynchronous. The first thing that the wrapper needs is an Executor, for example a ThreadPoolExecutor. The second is the SequencingPolicy, a definition of which events may be processed in parallel, and which sequentially. The last one is optional: the TransactionManager, which enables you to run event processing within a transaction. The next paragraphs will provide more details about the configuration options.

The Executor is responsible for executing the event processing. The actual implementation most likely depends on the environment that the application runs in and the SLA of the event handler. An example is the ThreadPoolExecutor, which maintains a pool of threads for the event handlers to use to process events. The AsynchronousEventHandlerWrapper will manage the processing of incoming events in the provided executor. If an instance of a ScheduledThreadPoolExecutor is provided, the AsynchronousEventHandlerWrapper will automatically leverage its ability to schedule processing in the cases of delayed retries. See Section 6.2.3, “Managing transactions in asynchronous event handling” for more information about transactions.

The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If two events have the same sequence identifier, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.

Axon provides a number of common policies you can use:

  • The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that requires them to be processed in a particular order.

  • The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished. For annotated event handlers, this is the default policy.

  • SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.

Besides these provided policies, you can define your own. All policies must implement the EventSequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.
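The effect of sequence identifiers can be made concrete with a small sketch. The types below are hypothetical (SketchDomainEvent, SketchSequencingPolicy, SketchPolicies) and only mimic the three policies described above. The lanes method groups events so that events within one lane must be handled in order, while separate lanes may be handled concurrently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event carrying the identifier of the aggregate that raised it.
class SketchDomainEvent {
    final String aggregateId;
    final String name;

    SketchDomainEvent(String aggregateId, String name) {
        this.aggregateId = aggregateId;
        this.name = name;
    }
}

// Hypothetical counterpart of the policy interface described in the text.
interface SketchSequencingPolicy {
    Object getSequenceIdentifierFor(SketchDomainEvent event);
}

class SketchPolicies {
    private static final Object SINGLE_SEQUENCE = new Object();

    // null: the event may run in parallel with any other event.
    static SketchSequencingPolicy fullConcurrency() {
        return event -> null;
    }

    // One shared identifier: every event waits for the previous one.
    static SketchSequencingPolicy sequential() {
        return event -> SINGLE_SEQUENCE;
    }

    // Events of the same aggregate share an identifier and stay in order.
    static SketchSequencingPolicy sequentialPerAggregate() {
        return event -> event.aggregateId;
    }

    // Groups event names into lanes; each lane is ordered, lanes are independent.
    static List<List<String>> lanes(SketchSequencingPolicy policy, List<SketchDomainEvent> events) {
        Map<Object, List<String>> bySequence = new LinkedHashMap<>();
        List<List<String>> lanes = new ArrayList<>();
        for (SketchDomainEvent event : events) {
            Object id = policy.getSequenceIdentifierFor(event);
            List<String> lane = (id == null) ? null : bySequence.get(id);
            if (lane == null) {
                lane = new ArrayList<>();
                lanes.add(lane);
                if (id != null) {
                    bySequence.put(id, lane);
                }
            }
            lane.add(event.name);
        }
        return lanes;
    }
}
```

For three events A1, B1 and A2, where A1 and A2 come from the same aggregate, the per-aggregate policy in this sketch yields two lanes, one holding A1 and A2 in order and one holding B1.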

A TransactionManager can be assigned to an AsynchronousEventHandlerWrapper to add transactional processing of events. To optimize processing, events can be processed in small batches inside a transaction. The transaction manager has the ability to influence the size of these batches and can decide to either commit, skip or retry event processing based on the result of a batch. See Section 6.2.3, “Managing transactions in asynchronous event handling” for more information.

Annotation support for concurrent processing

If you use the AnnotationEventListenerAdapter, or <axon:annotation-config/>, an Executor must be configured to allow asynchronous processing of events.

You can configure the event sequencing policy on the @AsynchronousEventListener annotation. Set its sequencingPolicyClass attribute to the type of policy you would like to use. Note that you can only choose policy classes that provide a public no-arg constructor.

@AsynchronousEventListener(sequencingPolicyClass = MyCustomPolicy.class)
public class MyEventListener {

    @EventHandler
    public void onSomeImportantEvent(MyEvent event) {
        // eventProcessing logic
    }
}

public class MyCustomPolicy implements EventSequencingPolicy {
    public Object getSequenceIdentifierFor(Event event) {
        if (event instanceof MyEvent) {
            // let's assume that we do processing based on the someProperty field.
            return ((MyEvent) event).someProperty();
        }
        return null;
    }
}                    

With annotation support, the event handler bean must also act as a transaction manager in order to support transactions. There is annotation support for transaction management, too (see Section 6.2.3, “Managing transactions in asynchronous event handling”).

6.2.3. Managing transactions in asynchronous event handling

In some cases, your event handlers have to store data in systems that use transactions. Starting and committing a transaction for each single event has a big performance impact. In Axon, events are processed in batches. The batch size depends on the number of events that need to be processed and the settings provided by the event handler. By default, the batch size is set to the number of events available in the processing queue at the time a batch starts.

[Note]Note

Typically, when using synchronous event handling, the transaction boundary is managed at the Command Bus level. Asynchronous event handlers, on the other hand, run in another thread and are often unable to act within the same transaction. The transaction managers used by event handlers should not be confused with the transaction interceptors, which are used with the Command Bus. See Section 3.5.1, “Transaction management” for more information about transactions in the command bus.

In most cases, event handling is done using a thread pool executor, or scheduler. The scheduler will schedule batches of event processing as soon as events become available. When a batch is completed, the scheduler will reschedule processing of the next batch, as long as more events are available. The smaller a batch, the more "fair" the distribution of event handler processing is, but also the more scheduling overhead you create.

When an event listener is wrapped with the AsynchronousEventHandlerWrapper, you can configure a TransactionManager to handle transactions for the event listener. The transaction manager can, based on the information in the TransactionStatus object, decide to start, commit or roll back a transaction to an external system.

The beforeTransaction(TransactionStatus) method is invoked just before Axon will start handling an event batch. You can use the TransactionStatus object to configure the batch before it is started. For example, you can change the maximum number of events that may run in the batch.

The afterTransaction(TransactionStatus) method is invoked after the batch has been processed, but before the scheduler has scheduled the next batch. Based on the value of isSuccessful(), you can decide to commit or roll back the underlying transaction.

Configuring transactional batches

There are a number of settings you can use on the TransactionStatus object.

You can configure a yielding policy, which gives the scheduler an indication of what to do when a batch has finished but more events are available for processing. Use DO_NOT_YIELD if you want the scheduler to continue processing immediately as long as new events are available for processing. The YIELD_AFTER_TRANSACTION policy will tell the scheduler to reschedule the next batch for processing when a thread is available. The former will make sure events are processed earlier, while the latter provides a fairer execution of events, as yielding gives waiting threads a chance to start processing. The choice of yielding policy should be driven by the SLA of the event listener.

You can set the maximum number of events to handle within a transaction using setMaxTransactionSize(int). The default value is the number of events ready for processing at the moment the transaction started.
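The batch cycle described in this section (beforeTransaction, event handling up to the maximum transaction size, afterTransaction) can be sketched as follows. The types are hypothetical stand-ins named SketchTransactionStatus, SketchTransactionManager and SketchBatchProcessor. The sketch runs in a single thread and, as an assumption, drops a failing event and ends the batch; yielding and retries are not modeled.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical status object; only mirrors the settings named in the text.
class SketchTransactionStatus {
    private int maxTransactionSize;
    private int eventsProcessed;
    private RuntimeException exception;

    SketchTransactionStatus(int eventsAvailable) {
        // Default from the text: the number of events ready when the batch starts.
        this.maxTransactionSize = eventsAvailable;
    }

    void setMaxTransactionSize(int size) {
        // Guard added for this sketch so that a batch always makes progress.
        this.maxTransactionSize = Math.max(1, size);
    }

    int getMaxTransactionSize() {
        return maxTransactionSize;
    }

    int getEventsProcessed() {
        return eventsProcessed;
    }

    boolean isSuccessful() {
        return exception == null;
    }

    RuntimeException getException() {
        return exception;
    }

    void recordProcessed() {
        eventsProcessed++;
    }

    void recordFailure(RuntimeException failure) {
        exception = failure;
    }
}

interface SketchTransactionManager {
    void beforeTransaction(SketchTransactionStatus status);

    void afterTransaction(SketchTransactionStatus status);
}

class SketchBatchProcessor {
    // Drains the queue batch by batch, wrapping each batch in the two callbacks.
    static void drain(Queue<String> events, SketchTransactionManager manager, Consumer<String> handler) {
        while (!events.isEmpty()) {
            SketchTransactionStatus status = new SketchTransactionStatus(events.size());
            manager.beforeTransaction(status); // may lower the batch size
            try {
                while (!events.isEmpty()
                        && status.getEventsProcessed() < status.getMaxTransactionSize()) {
                    handler.accept(events.poll());
                    status.recordProcessed();
                }
            } catch (RuntimeException failure) {
                // The failing event was already taken from the queue, so it is skipped.
                status.recordFailure(failure);
            }
            manager.afterTransaction(status); // commit or roll back here
        }
    }
}
```

With five queued events and a manager that sets the maximum transaction size to 2, this sketch produces batches of 2, 2 and 1 events.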

Error handling

When an event handler throws an exception, for example because a data source is not available, the transaction is marked as failed. In that case, isSuccessful() on the TransactionStatus object will return false and getException() will return the exception that the scheduler caught. It is the responsibility of the event listener to roll back or commit any active underlying transactions, based on the information provided by these methods.

The event handler can set a policy using setRetryPolicy(RetryPolicy) to tell the scheduler what to do in such a case. There are three policies, each for a specific scenario:

  • RETRY_TRANSACTION tells the event handler scheduler that the entire transaction should be retried. It will reschedule all the events in the current transaction for processing. This policy is suitable when the event listener writes to a transactional data source that rolls back the entire transaction on failure.

  • RETRY_LAST_EVENT is the policy that tells the scheduler to only retry the last event in the transaction. This is suitable if the underlying data source does not support transactions or if the transaction was committed without the last event.

  • SKIP_FAILED_EVENT will tell the scheduler to ignore the exception and continue processing with the next event. The event listener can still try to commit the underlying transaction to persist any changes made while processing other events in this transaction. This is the default policy.

Note that SKIP_FAILED_EVENT is the default policy. For event handlers that rely on an underlying system to perform their actions, this might not be a suitable policy. Exceptions resulting from errors in these underlying systems (such as databases or email clients) would cause events to be ignored while the underlying system is unavailable. In error situations, the event listener should inspect the exception (using the getException() method) and decide whether it makes sense to retry processing of this event. If that is the case, it should set the RETRY_LAST_EVENT or RETRY_TRANSACTION policy, depending on the transactional behavior of the underlying system.

When the chosen policy forces a retry of event processing, the processing is delayed by the number of milliseconds defined in the retryInterval property. The default interval is 5 seconds.
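The advice above, inspecting the exception and overriding the default policy only when a retry makes sense, could look roughly like the sketch below. SketchRetryPolicy, SketchFailureStatus and SketchErrorHandling are hypothetical stand-ins. Treating an IOException as a sign of a temporarily unavailable resource is an assumption made for the example.

```java
import java.io.IOException;

// Hypothetical mirror of the three policies named in the text.
enum SketchRetryPolicy {
    RETRY_TRANSACTION, RETRY_LAST_EVENT, SKIP_FAILED_EVENT
}

// Hypothetical status object exposing only what this example needs.
class SketchFailureStatus {
    private final Throwable exception;
    // SKIP_FAILED_EVENT is the default, as stated in the text.
    private SketchRetryPolicy retryPolicy = SketchRetryPolicy.SKIP_FAILED_EVENT;

    SketchFailureStatus(Throwable exception) {
        this.exception = exception;
    }

    boolean isSuccessful() {
        return exception == null;
    }

    Throwable getException() {
        return exception;
    }

    void setRetryPolicy(SketchRetryPolicy policy) {
        this.retryPolicy = policy;
    }

    SketchRetryPolicy getRetryPolicy() {
        return retryPolicy;
    }
}

class SketchErrorHandling {
    // Called after a batch; decides whether the failed work should be retried.
    static void afterTransaction(SketchFailureStatus status) {
        if (status.isSuccessful()) {
            return; // commit the underlying transaction here
        }
        // Roll back the underlying transaction here, then pick a policy.
        if (status.getException() instanceof IOException) {
            // Assumption: I/O failures are temporary, so the whole batch is retried.
            status.setRetryPolicy(SketchRetryPolicy.RETRY_TRANSACTION);
        }
        // Any other exception keeps the default and the failed event is skipped.
    }
}
```

Whether RETRY_TRANSACTION or RETRY_LAST_EVENT is the right choice in the retry branch depends on whether the underlying system rolled back the whole batch.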

Manipulating transactions during event processing

You can change transaction semantics even during event processing. This can be done in one of two ways, depending on the type of event handler you use.

If you use the @EventHandler annotation to mark event handler methods, you may use a second parameter of type TransactionStatus. If such a parameter is available on the annotated method, the current TransactionStatus object is passed as a parameter.

Alternatively, you can use the static TransactionStatus.current() accessor to gain access to the status of the current transaction. Note that this method returns null if there is no active transaction.

With the current transaction status, you can use the requestImmediateYield() and requestImmediateCommit() methods to end the transaction after processing of the event. The former will also tell the scheduler to reschedule the remainder of the events for another batch. The latter will use the yield policy to see what needs to be done. Since the default yielding policy is YIELD_AFTER_TRANSACTION, the behavior of both methods is identical when using these defaults.
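A static accessor that returns the status of the transaction running in the current thread, or null when there is none, is commonly built on a ThreadLocal. The sketch below shows that pattern with a hypothetical SketchCurrentStatus class. It is an illustration of the idea, not a description of how Axon implements TransactionStatus.current().

```java
// Hypothetical status class bound to the thread that processes the batch.
class SketchCurrentStatus {
    private static final ThreadLocal<SketchCurrentStatus> CURRENT = new ThreadLocal<>();

    private boolean immediateCommitRequested;

    // Returns the status of the transaction active in this thread, or null if none.
    static SketchCurrentStatus current() {
        return CURRENT.get();
    }

    // Runs the given work with a fresh status bound to the current thread.
    static void runInTransaction(Runnable work) {
        SketchCurrentStatus status = new SketchCurrentStatus();
        CURRENT.set(status);
        try {
            work.run();
        } finally {
            // Always unbind, so later work in this thread sees no stale transaction.
            CURRENT.remove();
        }
    }

    // Asks the surrounding processing loop to end the transaction after this event.
    void requestImmediateCommit() {
        immediateCommitRequested = true;
    }

    boolean isImmediateCommitRequested() {
        return immediateCommitRequested;
    }
}
```

Code that calls such an accessor outside of event processing has to handle the null result, as the text above points out.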

Annotation support

As with many of the other supported features in Axon, there is also annotation support for transaction management. You have several options to configure transactions.

The first is to annotate methods on your EventListener with @BeforeTransaction and @AfterTransaction. These methods will be called before and after the execution of a transactional batch, respectively. The annotated methods may accept a single parameter of type TransactionStatus, which provides access to transaction details, such as current status and configuration.

Alternatively, you can use an external Transaction Manager, which you assign to a field. If you annotate that field with @TransactionManager, Axon will autodetect it and use it as transaction manager for that listener. The transaction manager may be either one that implements the TransactionManager interface, or any other type that uses annotations.

Provided TransactionManager implementations

Currently, Axon Framework provides one TransactionManager implementation, the SpringTransactionManager. This implementation uses Spring's PlatformTransactionManager as the underlying transaction mechanism. That means the SpringTransactionManager can manage any transactions in resources that Spring supports.