5. Repositories and Event Stores

The repository is the mechanism that provides access to aggregates. The repository acts as a gateway to the actual storage mechanism used to persist the data. In CQRS, the repositories only need to be able to find aggregates based on their unique identifier. Any other types of queries should be performed against the query database, not the Repository.

In the Axon Framework, all repositories must implement the Repository interface. This interface prescribes three methods:load(identifier, version), load(identifier) andadd(aggregate). The load methods allows you to load aggregates from the repository. The optional version parameter is used to detect concurrent modifications (seeSection 5.5, “Advanced conflict detection and resolution”). add is used to register newly created aggregates in the repository.

Depending on your underlying persistence storage and auditing needs, there are a number of base implementations that provide basic functionality needed by most repositories. Axon Framework makes a distinction between repositories that save the current state of the aggregate (seeSection 5.1, “Standard repositories”), and those that store the events of an aggregate (seeSection 5.2, “Event Sourcing repositories”).

Note that the Repository interface does not prescribe a delete(identifier) method. Deleting aggregates is done by invoking the (protected) markDeleted() method in an aggreate. This method is protected and not available from outside the aggregate. The motivation for this, is that the aggregate is responsible for maintaining its own state. Deleting an aggregate is a state migration like any other, with the only difference that it is irreversible in many cases. You should create your own meaningful method on your aggregate which sets the aggregate's state to "deleted". This also allows you to register any events that you would like to have published.

Repositories should use the isDeleted() method to find out if an aggregate has been marked for deletion. If such an aggregate is then loaded again, the repository should throw an AggregateNotFoundException (or when possible, an AggregateDeletedException). Axon's standard repository implementations will delete an aggregate from the repository, while event sourcing repositories will append a special AggregateDeletedEvent to the event store.

5.1. Standard repositories

Standard repositories store the actual state of an Aggregate. Upon each change, the new state will overwrite the old. This makes it possible for the query components of the application to use the same information the command component also uses. This could, depending on the type of application you are creating, be the simplest solution. If that is the case, Axon provides some building blocks that help you implement such a repository.

AbstractRepository

The most basic implementation of the repository is AbstractRepository. It takes care of the event publishing when an aggregate is saved. The actual persistence mechanism must still be implemented. This implementation doesn't provide any locking mechanism and expects the underlying data storage mechanism to provide it.

LockingRepository

If the underlying data store does not provide any locking mechanism to prevent concurrent modifications of aggregates, consider using the abstract LockingRepository implementation. Besides providing event dispatching logic, it will also ensure that aggregates are not concurrently modified.

You can configure the LockingRepository to use an optimistic locking strategy, or a pessimistic one. When the optimistic lock detects concurrent access, the second thread saving an aggregate will receive a ConcurrencyException. The pessimistic lock will prevent concurrent access to the aggregate alltogether. The pessimistic locking strategy is the default strategy.

[Warning]Event ordering and optimistic locking strategy

Note that the optimistic lock doesn't lock any threads at all. While this reduces contention, it also means that the thread scheduler of your underlying architecture (OS, CPU, etc) is free to schedule threads as it sees fit. In high-concurrent environments (many threads accessing the same aggregate simultaneously), this could lead to events not being dispatched in exactly the same order as they are generated. If this guarantee is important, use pessimistic locking instead.

[Note]ConcurrencyException vs ConflictingModificationException

Note that there is a clear distinction between a ConcurrencyException and a ConflictingModificationException. The first is used to indicate a repository cannot save an aggregate, because the changes it contains were not applied to the latest available version. The latter indicates that the loaded aggregate contains changes that might not have been seen by the end-user. See Section 5.5, “Advanced conflict detection and resolution” for more information.

GenericJpaRepository

This is a repository implementation that can store JPA compatible Aggregates, such as AbstractJpaAggregateRoot subclasses. It is configured with an EntityManager to manage the actual persistence, and a class specifiying the actual type of Aggregate stored in the Repository.

5.2. Event Sourcing repositories

Aggregate roots that implement the EventSourcedAggregateRoot interface can be stored in an event sourcing repository. Those repositories do not store the aggregate itself, but the series of events generated by the aggregate. Based on these events, the state of an aggregate can be restored at any time.

EventSourcingRepository

The EventSourcingRepository implementation provides the basic functionality needed by any event sourcing repository in the AxonFramework. It depends on an EventStore (seeSection 5.3, “Event store implementations”), which abstracts the actual storage mechanism for the events and anAggregateFactory, which is reponsible for creating uninitialized aggregate instances.

The AggregateFactory specifies which aggregate needs to be created and how. Once an aggregate has been created, the EventSourcingRepository can initialize is using the Events it loaded from the Event Store. Axon Framework comes with a number of AggregateFactory implementations that you may use. If they do not suffice, it is very easy to create your own implementation.

CachingEventSourcingRepository

Initializing aggregates based on the events can be a time-consuming effort, compared to the direct aggregate loading of the simple repository implementations. The CachingEventSourcingRepository provides a cache from which aggregates can be loaded if available. You can configure any jcache implementation with this repository. Note that this implementation can only use caching in combination with a pessimistic locking strategy.

[Note]Note

Using a cache with optimistic locking would create undesired side-effects. Optimistic locking allows concurrent access to objects and will only fail when two threads have concurrently made any modifications to that object. When using a cache, both threads will receive the same instance of the object. They will both apply their changes to that same instance, potentially interfering with eachother.

GenericAggregateFactory

The GenericAggregateFactory is a special AggregateFactory implementation that can be used for any type of Event Sourced Aggregate Root. There is however, a convention that these EventSourcedAggregateRoot classes must adhere to: the type must declare an accessible constructor accepting an AggregateIdentifier as single parameter. This constructor may not perform any initialization on the aggregate, other than setting the identifier.

The GenericAggregateFactory is suitable for most scenarios where aggregates do not need special injection of non-serializable resources.

SpringPrototypeAggregateFactory

Depending on your architectural choices, it might be useful to inject dependencies into your aggregates using Spring. You could, for example, inject query repositories into your aggregate to ensure the existance (or inexistance) of certain values.

To inject dependencies into your aggregates, you need to configure a prototype bean of your aggregate root in the Spring context that also defines the SpringPrototypeAggregateFactory. Instead of creating regular instances of using a constructor, it uses the Spring Application Context to instantiate your aggregates. This will also inject any dependencies in your aggregate.

Implementing your own AggregateFactory

In some cases, the GenericAggregateFactory just doesn't deliver what you need. For example, you could have an abstract aggregate type with multiple implementations for different scenarios (e.g. PublicUserAccount and BackOfficeAccount both extending anAccount). Instead of creating different repositories for each of the aggregates, you could use a single repository, and configure an AggregateFactory that is aware of the different implementations.

The AggregateFactory must specify the aggregate type identifier. This is a String that the Event Store needs to figure out which events belong to which type of aggregate. Typically, this name is deducted from the abstract super-aggregate. In the given example that could be: Account.

The bulk of the work the Aggregate Factory does is creating uninitialized Aggregate instances. It must do so using a given aggregate identifier and the first Event from the stream. Usually, this Event is a creation event which contains hints about the expected type of aggregate. You can use this information to choose an implementation and invoke its constructor. Make sure no Events are applied by that constructor; the aggregate must be uninitialized.

HybridJpaRepository

The HybridJpaRepository is a combination of the GenericJpaRepository and an Event Sourcing repository. It can only deal with event sourced aggregates, and stores them in a relational model as well as in an event store. When the repository reads an aggregate back in, it uses the relational model exclusively.

This repository removes the need for Event Upcasters, making data migrations potentially easier. Since the aggregates are event sourced, you keep the ability to use the given-when-then test fixtures (seeChapter 8, Testing). On the other hand, since it doesn't use the event store for reading, it doesn't allow for automated conflict resolution.

5.3. Event store implementations

Event Sourcing repositories need an event store to store and load events from aggregates. Typically, event stores are capable of storing events from multiple types of aggregates, but it is not a requirement.

Axon provides two implementations of event stores, both are capable of storing all domain events (those that extend the DomainEvent class). These event stores use a Serializer to serialize and deserialize the event. By default, Axon provides an implementation of the Event Serializer that serializes events to XML: the XStreamEventSerializer.

FileSystemEventStore

The FileSystemEventStore stores the events in a file on the file system. It provides good performance and easy configuration. The downside of this event store is that is does not provide transaction support and doesn't cluster very well. The only configuration needed is the location where the event store may store its files and the serializer to use to actually serialize and deserialize the events. Note that the provided url must end on a slash. This is due to the way Spring's Resource implementations work.

JpaEventStore

The JpaEventStore stores events in a JPA-compatible data source. Unlike the file system version, the JPAEventStore supports transactions. The JPA Event Store stores events in so called entries. These entries contain the serialized form of an event, as well as some fields where meta-data is stored for fast lookup of these entries. To use the JpaEventStore, you must have the javax.persistence annotations on your classpath.

By default, the event store needs you to configure your persistence context (defined in META-INF/persistence.xml file) to contain the classes DomainEventEntry and SnapshotEventEntry (both in the org.axonframework.eventstore.jpa package).

Below is an example configuration of a persistence context configuration:

<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="1.0">
    <persistence-unit name="eventStore"(1) transaction-type="RESOURCE_LOCAL">
        <class>org...eventstore.jpa.DomainEventEntry</class> (2)
        <class>org...eventstore.jpa.SnapshotEventEntry</class>
    </persistence-unit>
</persistence>

1

In this sample, there is is specific persistence unit for the event store. You may, however, choose to add the third line to any other persistence unit configuration.

2

This line registers the DomainEventEntry (the class used by theJpaEventStore) with the persistence context.

[Note]Detecting duplicate key violations in the database

Axon uses Locking to prevent two threads from accessing the same Aggregate. However, if you have multiple JVMs on the same database, this won't help you. In that case, you'd have to rely on the database to detect conflicts. Concurrent access to the event store will result in a Key Constraint Violation, as the table only allows a single Event for an aggregate with any sequence number. Inserting a second event for an existing aggregate with an existing sequence number will result in an error.

The JPA EventStore can detect this error and translate it to a ConcurrencyException. However, each database system reports this violation differently. If you register your DataSource with the JpaEventStore, it will try to detect the type of database and figure out which error codes represent a Key Constraint Violation. Alternatively, you may provide a PersistenceExceptionTranslator instance, which can tell if a given exception represents a Key Constraint Violation.

If no DataSource or PersistenceExceptionTranslator is provided, exceptions from the Database driver are thrown as-is.

Customizing the Event storage

By default, the JPA Event Store stores entries in DomainEventEntry and SnapshotEventEntry entities. While still will suffice in many cases, you might encounter a situation where the meta-data provided by these entities is not enough. Or you might want to store events of different aggregate types in different tables.

If that is the case, you may provide your own implementation of EventEntryStore in the JPA Event Store's constructor. You will need to provide implementations of methods that load and store serialized events. Check the API Documentation of the EventEntryStore class for implementation requirements.

[Warning]Memory consumption warning

Note that persistence providers, such as Hibernate, use a first-level cache on their EntityManager implementation. Typically, this means that all entities used or returned in queries are attached to the EntityManager. They are only cleared when the surrounding transaction is committed or an explicit "clear" in performed inside the transaction.

To work around this issue, make sure to exclusively query the serialized form (the byte[]) of events. Do not fetch the entire entity object. Alternatively, call EntityManager.flush() and EntityManager.clear() after fetching a batch of events. Failure to do so might result in OutOfMemoryExceptions when loading large streams of events.

Implementing your own event store

If you have specific requirements for an event store, it is quite easy to implement one using different underlying data sources. Reading and appending events is done using a DomainEventStream, which is quite similar to iterator implementations.

[Tip]Tip

The SimpleDomainEventStream class will make the contents of a sequence ( List or array) of DomainEvent instances accessible as event stream.

Influencing the serialization process

Event Stores need a way to serialize the Domain Event to prepare it for storage. By default, Axon uses the XStreamEventSerializer, which uses XStream (see xstream.codehaus.org) to serialize Domain Events into XML and vice versa. XStream is reasonably fast and is more flexible than Java Serialization. Furthermore, the result of XStream serialization is human readable. Quite useful for logging and debugging purposes.

The XStreamEventSerializer can be configured. You can define aliases it should use for certain packages, classes or even fields. Besides being a nice way to shorten potentially long names, aliases can also be used when class definitions of event change. For more information about aliases, visit the XStream website: xstream.codehaus.org.

You may also implement your own Event Serializer, simply by creating a class that implements Serializer, and configuring the Event Store to use that implementation instead of the default.

Changing the definition of Events

It is not unlikely that a definition of an Event remains unchanged during the entire lifespan of an application. New insights, changes in requirements and many other factors can lead to modifications in Events. Since the Event Store is considered a read and append-only data source, your application must be able to read all events, regardless of when they have been added.

In Axon, Events have a revision, a numeric value that defaults to 0. If an Event definition changes, you should update the revision number too. The deserialization process can then be adapted when your application needs to cope with old revisions of events. A class file for an Event only needs to support the latest revision. When old revisions are deserialized, they can be "upcasted" to the newer revision. The revision number of an Event is provided to the constructor on the abstract event classes. The revision of an event is passed as a parameter in the constructor of DomainEvent.

The EventUpcaster is responsible for transforming old events into the last revision. Upcasters typically work on an intermediate representation of the Event. In the case of the XStreamEventSerializater, this intermediate representation is dom4j (see http://dom4j.sourceforge.net/), an easy to use java XML library that integrates nicely with XStream. The EventUpcaster can modify the XML structure of the event so that it matches the new definition.

5.4. Snapshotting

When aggregates live for a long time, and their state constantly changes, they will generate a large amount of events. Having to load all these events in to rebuild an aggregate's state may have a big performance impact. The snapshot event is a domain event with a special purpose: it summarises an arbitrary amount of events into a single one. By regularly creating and storing a snapshot event, the event store does not have to return long lists of events. Just the last snapshot events and all events that occurred after the snapshot was made.

For example, items in stock tend to change quite often. Each time an item is sold, an event reduces the stock by one. Every time a shipment of new items comes in, the stock is incremented by some larger number. If you sell a hundred items each day, you will produce at least 100 events per day. After a few days, your system will spend too much time reading in all these events just to find out wheter it should raise an "ItemOutOfStockEvent". A single snapshot event could replace a lot of these events, just by storing the current number of items in stock.

5.4.1. Creating a snapshot

Snapshot creation can be triggered by a number of factors, for example the number of events created since the last snapshot, the time to initialize an aggregate exceeds a certain threshold, time-based, etc. Currently, Axon provides a mechanism that allows you to trigger snapshots based on an event count threshold.

The EventCountSnapshotterTrigger provides the mechanism to trigger snapshot creation when the number of events needed to load an aggregate exceeds a certain threshold. If the number of events needed to load an aggregate exceeds a certain configurable threshold, the trigger tells a Snapshotter to create a snapshot for the aggregate.

The snapshot trigger is configured on an Event Sourcing Repository and has a number of properties that allow you to tweak triggering:

  • Snapshotter sets the actual snapshotter instance, responsible for creating and storing the actual snapshot event;

  • Trigger sets the threshold at which to trigger snapshot creation;

  • ClearCountersAfterAppend indicates whether you want to clear counters when an aggregate is stored. The optimal setting of this parameter depends mainly on your caching strategy. If you do not use caching, there is no problem in removing event counts from memory. When an aggregate is loaded, the events are loaded in, and counted again. If you use a cache, however, you may lose track of counters. Defaults to true unless the AggregateCache or AggregateCaches is set, in which case it defaults to false.

  • AggregateCache and AggregateCaches allows you to register the cache or caches that you use to store aggregates in. The snapshotter trigger will register itself as a listener on the cache. If any aggregates are evicted, the snapshotter trigger will remove the counters. This optimizes memory usage in the case your application has many aggregates. Do note that the keys of the cache are expected to be the Aggregate Identifier.

A Snapshotter is responsible for the actual creation of a snapshot. Typically, snapshotting is a process that should disturb the operational processes as little as possible. Therefore, it is recommended to run the snapshotter in a different thread. The Snapshotter interface declares a single method: scheduleSnapshot(), which takes the aggregate's type and identifier as parameters.

Axon provides theAggregateSnapshotter, which creates and stores AggregateSnapshot instances. This is a special type of snapshot, since it contains the actual aggregate instance within it. The repositories provided by Axon are aware of this type of snapshot, and will extract the aggregate from it, instead of instantiating a new one. All events loaded after the snapshot events are streamed to the extracted aggregate instance.

[Note]Note

Do make sure that the Serializer instance you use (which defaults to the XStreamEventSerializer) is capable of serializing your aggregate. The XStreamEventSerializer requires you to use either a Sun JVM, or your aggregate must either have an accessible default constructor or implement the Serializable interface.

The AbstractSnapshotter provides a basic set of properties that allow you to tweak the way snapshots are created:

  • EventStore sets the event store that is used to load past events and store the snapshots. This event store must implement the SnapshotEventStore interface.

  • Executor sets the executor, such as a ThreadPoolExecutor that will provide the thread to process actual snapshot creation. By default, snapshots are created in the thread that calls the scheduleSnapshot() method, which is generally not recommended for production.

The AggregateSnapshotter provides on more property:

  • AggregateFactories is the property that allows you to set the factories that will create instances of your aggregates. All Axon provided repositories implement the AggregateFactory interface, and are capable of processing AggregateSnapshots. Configuring multiple aggregate factories allows you to use a single Snapshotter to create snapshots for a variety of aggregate types.

[Note]Note

If you use an executor that executes snapshot creation in another thread, make sure you configure the correct transaction management for your underlying event store, if necessary. Spring users can use the SpringAggregateSnapshotter, which allows you to configure a PlatformTransactionManager. The SpringAggregateSnapshotter will autowire all repositores, if they are not explicitly configured.

Spring users should use the SpringAggregateSnapshotter instead. See Section 9.8, “Configuring Snapshotting” for more details about configuring snapshotting in Spring.

5.4.2. Storing Snapshot Events

Both the JpaEventStore and the FileSystemEventStore are capable of storing snapshot events. They provide a special method that allows a DomainEvent to be stored as a snapshot event. You have to initialize the snapshot event completely, including the aggregate identifier and the sequence number. There is a special constructor on the DomainEvent for this purpose. The sequence number must be equal to the sequence number of the last event that was included in the state that the snapshot represents. In most cases, you can use the getLastCommittedEventSequenceNumber() on the VersionedAggregate (which each event sourced aggregate implements) to obtain the sequence number to use in the snapshot event.

When a snapshot is stored in the Event Store, it will automatically use that snapshot to summarize all prior events and return it in their place. Both event store implementations allow for concurrent creation of snapshots. This means they allow snapshots to be stored while another process is adding Events for the same aggregate. This allows the snapshotting process to run as a separate process alltogether.

[Note]Note

Normally, you can archive all events once they are part of a snapshot event. Snapshotted events will never be read in again by the event store in regular operational scenario's. However, if you want to be able to reconstruct aggregate state prior to the moment the snapshot was created, you must keep the events up to that date.

Axon provides a special type of snapshot event: the AggregateSnapshot, which stores an entire aggregate as a snapshot. The motivation is simple: your aggregate should only contain the state relevant to take business decisions. This is exactly the information you want captured in a snapshot. All Event Sourcing Repositories provided by Axon recognize the AggregateSnapshot, and will extract the aggregate from it. Beware that using this snapshot event requires that the event serialization mechanism needs to be able to serialize the aggregate.

5.4.3. Initializing an Aggregate based on a Snapshot Event

Every snapshot event is a DomainEvent instance. That means a snapshot event is handled just like any other domain event. When using annotations to demarcate event handers (@EventHandler), you can annotate a method that initializes full aggregate state based on a snapshot event. The code sample below shows how snapshot events are treated like any other domain event within the aggregate.

public class MyAggregate extends AbstractAnnotatedAggregateRoot {

    // ... code omitted for brevity

    @EventHandler
    protected void handleSomeStateChangeEvent(MyDomainEvent event) {
        // ...
    }

    @EventHandler
    protected void applySnapshot(MySnapshotEvent event) {
        // the snapshot event should contain all relevant state
        this.someState = event.someState;
        this.otherState = event.otherState;
    }
}
                

There is one type of snapshot event that is treated differently: the AggregateSnapshot. This type of snapshot event contains the actual aggregate. The repository recognizes this type of event and extract the aggregate from the snapshot. Then, all other events are re-applied to the extracted snapshot. That means aggregates never need to be able to deal with AggregateSnapshot instances themselves.

5.4.4. Pruning Snapshot Events

Once a snapshot event is written, it prevents older events and snapshot events from being read. Domain Events are still used in case a snapshot event becomes obsolete due to changes in the structure of an aggregate. The older snapshot events are hardly ever needed. SnapshotEventStore implementation may choose to keep only a limited amount of snapshots (e.g. only one) for each aggregate.

The JpaEventStore allows you to configure the amount of snapshots to keep per aggregate. It defaults to 1, meaning that only the latest snapshot event is kept for each aggregate. Use setMaxSnapshotsArchived(int) to change this setting. Use a negative integer to prevent pruning altogether.

5.5. Advanced conflict detection and resolution

One of the major advantages of being explicit about the meaning of changes, is that you can detect conflicting changes with more precision. Typically, these conflicting changes occur when two users are acting on the same data (nearly) simultaneously. Imagine two users, both looking at a specific version of the data. They both decide to make a change to that data. They will both send a command like "on version X of this aggregate, do that", where X is the expected version of the aggregate. One of them will have the changes actually applied to the expected version. The other user won't.

Instead of simply rejecting all incoming commands when aggregates have been modified by another process, you could check whether the user's intent conflicts with any unseen changes. One way to do this, is to apply the command on the latest version of the aggregate, and check the generated events against the events that occurred since the version the user expected. For example, two users look at a Customer, which has version 4. One user notices a typo in the customer's address, and decides to fix it. Another user wants to register the fact that the customer moved to another address. If the fist user applied his command first, the second one will make the change to version 5, instead of the version 4 that he expected. This second command will generate a CustomerMovedEvent. This event is compared to all unseen events: AddressCorrectedEvent, in this case. A ConflictResolver will compare these events, and decide that these conflicts may be merged. If the other user had committed first, the ConflictResolver would have decided that a AddressCorrectedEvent on top of an unseen CustomerMovedEvent is considered a conflicting change.

Axon provides the necessary infrastructure to implement advanced conflict detection. By default, all repositories will throw a ConflictingModificationException when the version of a loaded aggregate is not equal to the expected version. Event Sourcing Repositories offer support for more advanced conflict detection, as decribed in the paragraph above.

To enable advanced conflict detection, configure a ConflictResolver on the EventSourcingRepository. This ConflictResolver is responsible for detecting conflicting modifications, based on the events representing these changes. Detecting these conflicts is a matter of comparing the two lists of DomainEvents provided in the resolveConflicts method declared on the ConflictResolver. If such a conflict is found, a ConflictingModificationException (or better, a more explicit and explanatory subclass of it) must be thrown. If the ConflictResolver returns normally, the events are persisted, effectively meaning that the concurrent changes have been merged.