Distributed Caching using Redis in Spring Boot applications

In this article, we will cover multiple approaches to implementing a synchronized Redis cache across distributed Spring Boot application instances.

Summary

In our previous article ‘Redis Caching in Spring Boot Applications’, we talked about using Redis as a cache in Spring Boot applications. Let’s briefly recap what is and isn’t possible using the native Redis cache implementation.

We’ve created a simple Angular app that allows us to send multiple parallel requests to multiple instances of our Spring Boot application.

We create one GET request every 0.5 seconds; each instance of our application is also hit 500 milliseconds apart to demonstrate the effects of cache synchronization in distributed applications. There is a 2.5-second sleep in the service that resolves our GET request, which makes the difference in performance measurable by simulating an extreme scenario. We also track each individual response time and aggregate them.

Without any caching, the total response time of each server is at its maximum: 6 * 2.5 = 15 seconds.

By simply enabling Redis caching, we already improve our response times significantly, since the last request finishes earlier – it was sent after the cache had been populated.

Can we do any better? Of course! By enabling Spring’s native cache synchronization, we can cut response times by over 50%.
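
For reference, native synchronization is enabled via the sync flag of @Cacheable – a minimal sketch based on the getUser example used throughout this article:

@Cacheable(value = "user", key = "#username", sync = true)
public User getUser(String username) {
    ...
}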

However, we do see an issue here: clearly the cache is not being synchronized across multiple instances. While all requests to Server 1 complete as soon as its first request is done, Server 2 stays busy for some time, because its first request was started later. All consecutive requests on that server are synchronized only within its local application instance.

Spring Framework’s native caching only stores the information necessary for synchronizing requests at the level of a single application instance. This is why separate application instances aren’t synchronized. To create synchronized caching across multiple instances (we will call this distributed caching going forward), we need to store that information in a way that all application instances can access.

Luckily, all our application instances have a common denominator: the cache itself. Via Spring’s cache manager, all our application instances ultimately use the same cache, based on the configuration we set up. The easiest approach is to use the Redis cache itself as a means of informing each application that a resource is currently being accessed, and that it should wait for the cache to be populated.

Using Spring AOP and Custom Annotations

One simple way of implementing our distributed caching would be to use Spring AOP (Aspect Oriented Programming) to extend the native Redis Cache functionality. We will achieve this by creating a custom annotation, which can then be placed around functions that require distributed caching.

What is AOP?

In short, AOP is a programming paradigm that allows us to add additional behaviour around existing code, without the need to modify it. In this regard, it could be viewed as another form of the decorator pattern. Learn more about Spring AOP in this article.
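
As a small illustration (the @Timed annotation and all names here are hypothetical, not part of our implementation), an aspect that logs execution times around annotated methods could look like this:

// imports skipped for readability

@Aspect
public class TimingAspect {

    // runs around every method annotated with the hypothetical @Timed annotation
    @Around("@annotation(Timed)")
    public Object time(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return joinPoint.proceed(); // execute the original method
        } finally {
            System.out.println(joinPoint.getSignature().toShortString()
                    + " took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}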

Custom Annotation

First, let’s create a custom annotation:

// imports skipped for readability

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedCaching {
}

We can now annotate our service functions which need distributed caching like this:

...
@Cacheable(value = "user", key = "#username")
@DistributedCaching
public User getUser(String username) {
    ...
}

Currently, nothing additional happens yet. For that, we will define our Spring Aspect, which will execute our distributed caching code around each of our annotated functions.

@Aspect
@Order(1)
public class DistributedCacheAspect {
    private CacheManager cacheManager;

    public DistributedCacheAspect(CacheManager cacheManager) {
        this.cacheManager = cacheManager;
    }
    ...
}

We’ve called our class DistributedCacheAspect and annotated it with @Aspect. This makes sure that Spring Boot sets this class up as a proper aspect. We have also added an @Order(1) annotation, because we want our own aspect to be called before the @Cacheable logic, since we will add additional caching features around it.

We have also injected the CacheManager bean via constructor injection. We need the CacheManager to access our Redis cache within this class. Since all of our Spring Boot instances use the same cache configuration, this is also the part of our code that connects all instances and manages synchronization.
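
As a minimal sketch, assuming spring-boot-starter-aop is on the classpath (which enables aspect auto-proxying), the aspect can be registered as a bean like this:

// imports skipped for readability

@Configuration
public class AspectConfiguration {

    // expose the aspect as a bean so Spring's AOP infrastructure picks it up
    @Bean
    public DistributedCacheAspect distributedCacheAspect(CacheManager cacheManager) {
        return new DistributedCacheAspect(cacheManager);
    }
}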

Distributed Caching

Next, we’ll create a function which executes our code around the already existing code. The basic algorithm behind our distributed caching will be:

1. If the requested object is already cached, simply proceed – Spring returns the cached value.
2. If another thread or instance is already loading the object (signalled by an access key in the cache), wait until the cache is populated, then proceed.
3. Otherwise, put an access key into the cache, proceed with the method (hitting the database), and remove the access key afterwards.

Let’s start with implementing the main functionality of our DistributedCacheAspect class:

@Around("@annotation(DistributedCaching)")
public Object distributedCaching(ProceedingJoinPoint joinPoint) throws Throwable {
    ...
}

By annotating the distributedCaching function with @Around("@annotation(DistributedCaching)"), we define around which methods our code is executed. In this case, we want this piece of code to execute around every method that has the @DistributedCaching annotation.

The ProceedingJoinPoint interface gives us access to information about the intercepted method, such as its signature, and lets us run the actual method whenever we want to.

First, let’s think about how we will define our access key. The easiest way would be to generate a unique ID from the method we’re calling.

String accessKey = joinPoint
    .getSignature()
    .toLongString() + Arrays.toString(joinPoint.getArgs());

Here, we simply generate a unique key by taking the signature of the annotated method and appending the method arguments to it. The signature always stays the same, while the arguments are resolved per call: instead of the parameter name (e.g. "username"), we get the actual value for the specific call ("JohnDoe").
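
For illustration (package and class names are assumed), the access key for getUser("JohnDoe") would look roughly like this:

// hypothetical result for getUser("JohnDoe"):
// toLongString() -> "public com.example.User com.example.UserService.getUser(java.lang.String)"
// Arrays.toString(args) -> "[JohnDoe]"
String accessKey =
    "public com.example.User com.example.UserService.getUser(java.lang.String)[JohnDoe]";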

Next, we need to check whether the object that will be cached already exists. To do this, we’ll write a helper function that parses the actual object’s cache key out of the method:

import java.lang.reflect.Method;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
...
private final ExpressionParser parser = new SpelExpressionParser();
private final LocalVariableTableParameterNameDiscoverer discoverer
                        = new LocalVariableTableParameterNameDiscoverer();

@SuppressWarnings("unchecked")
private <T> T parseSpel(Method method, Object[] arguments, String spel) {
    String[] params = discoverer.getParameterNames(method);
    EvaluationContext context = new StandardEvaluationContext();

    // expose each argument under its parameter name, e.g. #username -> "JohnDoe"
    for (int i = 0; i < params.length; i++) {
        context.setVariable(params[i], arguments[i]);
    }

    Expression expression = parser.parseExpression(spel);
    return expression.getValue(context, (Class<T>) String.class);
}

First, we’ve added the ExpressionParser and LocalVariableTableParameterNameDiscoverer fields. We use the discoverer to get the parameter names from our method. We construct a new EvaluationContext and set its variables to the method’s arguments. Then we parse the SpEL expression of the method – this is the key = "#user.username" part of the @Cacheable annotation.

Finally, we resolve the expression for the given context. If your SpEL key was set to #user.username, this function would look at the method’s arguments, find the corresponding object (in this case the one named "user") and resolve the actual username variable of the user object.
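
A hypothetical call makes this concrete (the User object and its values are assumed):

// getUser(User user) annotated with @Cacheable(value = "user", key = "#user.username")
String key = parseSpel(method, joinPoint.getArgs(), "#user.username");
// if the passed user object has username "JohnDoe", key is "JohnDoe"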

Now, we can check whether the object is already cached:

// resolve the @Cacheable annotation of the intercepted method
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Cacheable myAnnotation = method.getAnnotation(Cacheable.class);

String key = parseSpel(method, joinPoint.getArgs(), myAnnotation.key());
String actualCachedValueCacheName = myAnnotation.value()[0];

if (cacheManager.getCache(actualCachedValueCacheName).get(key) != null) {
    return joinPoint.proceed();
}

Essentially, we’ve just filtered out the case in which the object is already cached. In this case, we just let the method proceed and Spring does its magic, returning the cached object.

Finally, the rest of the distributedCaching function’s code:

if (cacheManager.getCache("ACCESS_CACHE").get(accessKey) != null) {
	waitForSynchronization("ACCESS_CACHE", accessKey);
	return joinPoint.proceed();
} else {
	cacheManager.getCache("ACCESS_CACHE").put(accessKey, true);
	Object proceed = joinPoint.proceed();
	cacheManager.getCache(DISTRIBUTED_CACHE_NAME).evict(accessKey);
	return proceed;
}

If the access key is present (meaning the value we get is not null), we call the waitForSynchronization function to block the thread until the cache is populated.

After that, we simply run the method. Otherwise, if the access key isn’t present, the current thread will access the database and populate the cache itself. We simply put the access key into our cache, proceed with the method, and delete the key once the method has finished.

This is a perfect example of how Spring AOP helps us extend code generically.
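
Putting the pieces together, the complete advice reads roughly like this – a sketch assembled from the snippets above (waitForSynchronization is covered in the next section):

@Around("@annotation(DistributedCaching)")
public Object distributedCaching(ProceedingJoinPoint joinPoint) throws Throwable {
    MethodSignature signature = (MethodSignature) joinPoint.getSignature();
    Method method = signature.getMethod();
    Cacheable myAnnotation = method.getAnnotation(Cacheable.class);

    String accessKey = joinPoint
        .getSignature()
        .toLongString() + Arrays.toString(joinPoint.getArgs());

    // already cached? let Spring's @Cacheable return the cached value
    String key = parseSpel(method, joinPoint.getArgs(), myAnnotation.key());
    if (cacheManager.getCache(myAnnotation.value()[0]).get(key) != null) {
        return joinPoint.proceed();
    }

    if (cacheManager.getCache("ACCESS_CACHE").get(accessKey) != null) {
        // another thread or instance is populating the cache: wait, then proceed
        waitForSynchronization("ACCESS_CACHE", accessKey);
        return joinPoint.proceed();
    }

    // we are first: mark the resource as being loaded, load it, then unmark
    cacheManager.getCache("ACCESS_CACHE").put(accessKey, true);
    Object proceed = joinPoint.proceed();
    cacheManager.getCache("ACCESS_CACHE").evict(accessKey);
    return proceed;
}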

‘Wait For Synchronization’ Method

Let’s look at the waitForSynchronization method:

private void waitForSynchronization(String accessCache, String accessKey) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        // invokeAll blocks until the task finishes or the timeout expires
        List<Future<Void>> result = executor.invokeAll(
            List.of(new CacheManagerTask(
                this.cacheManager,
                accessCache,
                accessKey
            )),
            10,
            TimeUnit.SECONDS
        );
        result.get(0).get();
    } catch (InterruptedException | ExecutionException | CancellationException e) {
        // process what happens if the cache wasn't
        // populated after the waiting period
    } finally {
        executor.shutdown();
    }
}

Here, we create a task that runs our blocking code and periodically checks whether the cache has been populated. This returns a Future object, where calling the get method blocks our code until the task is either finished or cancelled because it took too long. You may want to handle the timeout case accordingly.

public class CacheManagerTask implements Callable<Void> {

    private final CacheManager cacheManager;
    private final String accessCache;
    private final String accessKey;

    public CacheManagerTask(
        CacheManager cacheManager,
        String accessCache,
        String accessKey
    ) {
        this.cacheManager = cacheManager;
        this.accessCache = accessCache;
        this.accessKey = accessKey;
    }

    @Override
    public Void call() throws InterruptedException {
        // poll until the access key is evicted, i.e. the cache has been populated
        while (cacheManager.getCache(accessCache).get(accessKey) != null) {
            Thread.sleep(10);
        }
        return null;
    }
}

With this implementation, we would actively poll the cache to check if the population has been completed. This is definitely suboptimal.

Luckily, Redis itself offers a nice way to solve this issue: Redis Pub/Sub. It allows us to publish messages to, and subscribe to, arbitrary channels (topics). We can use this to implement a non-polling approach:

// volatile: written by the Redis listener thread, read by the waiting task
private volatile boolean lockedCache = false;

public boolean isLocked() {
    return lockedCache;
}

public void setLocked(boolean locked) {
    this.lockedCache = locked;
}

Listener, Publisher and Subscriber

First, we define a variable which will act as a lock for the thread – with a getter and setter to enable access from within our CacheManagerTask. Next, we will create two classes: a RedisMessageListener, which implements Redis’ MessageListener interface, and a RedisMessagePublisher to publish messages over Redis directly.

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class RedisMessageListener implements MessageListener {

    private final String topic;
    private final DistributedCacheAspect distributedCacheAspect;

    public RedisMessageListener(
        String topic,
        DistributedCacheAspect distributedCacheAspect
    ) {
        this.topic = topic;
        this.distributedCacheAspect = distributedCacheAspect;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // unlock the waiting thread
        distributedCacheAspect.setLocked(false);
    }
}

The listener will unlock the thread if a message for the specific topic is received. In this case, the topic will be our unique accessKey.

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.Topic;

public class RedisMessagePublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public RedisMessagePublisher(
            RedisTemplate<String, Object> redisTemplate
    ) {
        this.redisTemplate = redisTemplate;
    }

    public void publish(Topic topic, String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

The publisher will allow us to send a message for a given topic. For that, we need to configure some additional beans. Remember to add the redis.clients jedis dependency to your pom.xml:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <type>jar</type>
</dependency>

@Configuration
public class RedisListenerConfiguration {
    @Bean
    public RedisMessageListenerContainer listenerContainer(
        RedisConnectionFactory redisConnection
    ) {
        RedisMessageListenerContainer container =
            new RedisMessageListenerContainer();

        container.setConnectionFactory(redisConnection);
        return container;
    }

    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        final RedisTemplate<String, Object> template =
            new RedisTemplate<String, Object>();

        template.setConnectionFactory(jedisConnectionFactory());
        template.setValueSerializer(
            new GenericToStringSerializer<Object>(Object.class)
        );
        return template;
    }

    @Bean
    RedisMessagePublisher redisPublisher() {
        return new RedisMessagePublisher(redisTemplate());
    }
}

The RedisMessageListenerContainer allows us to add our own listener to subscribe to topics in our aspect class. We also define a JedisConnectionFactory and a RedisTemplate bean to configure how the connection to Redis is made. While we’re at it, let’s also register our RedisMessagePublisher as a bean – it can be used across all instances to publish our messages.

In our aspect class, we can now add these via dependency injection:

private final CacheManager cacheManager;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisMessagePublisher redisPublisher;

public DistributedCacheAspect(
    CacheManager cacheManager,
    RedisMessageListenerContainer redisMessageListenerContainer,
    RedisMessagePublisher redisPublisher
) {
    this.cacheManager = cacheManager;
    this.redisMessageListenerContainer = redisMessageListenerContainer;
    this.redisPublisher = redisPublisher;
}

Now we can use the message publisher and subscriber to implement a non-polling approach:

Topic topic = new ChannelTopic(accessKey);
if (cacheManager.getCache("ACCESS_CACHE").get(accessKey) != null) {

    lockedCache = true;
    RedisMessageListener redisMessageListener =
        new RedisMessageListener(accessKey, this);

    redisMessageListenerContainer
        .addMessageListener(redisMessageListener, topic);

    waitForSynchronization();

    redisMessageListenerContainer
        .removeMessageListener(redisMessageListener, topic);

    return joinPoint.proceed();
} else {
    cacheManager.getCache("ACCESS_CACHE").put(accessKey, true);

    Object proceed = joinPoint.proceed();

    cacheManager.getCache("ACCESS_CACHE").evict(accessKey);
    redisPublisher.publish(topic, "DONE");
    return proceed;
}

Before waiting for synchronization, we subscribe to the topic of our access key – and unsubscribe once we’re done. Also, after evicting the access key, we publish a message to the topic of our access key announcing that we are done. This way, the thread can wait without polling Redis, simply by locking itself.

private void waitForSynchronization() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        List<Future<Void>> result = executor.invokeAll(
            List.of(new CacheManagerTask(this)),
            10,
            TimeUnit.SECONDS
        ); // timeout of 10 seconds

        result.get(0).get();
    } catch (InterruptedException | ExecutionException e) {
        ...
    } finally {
        executor.shutdown();
    }
}

The waitForSynchronization method now passes this into a CacheManagerTask, so the task can check whether the aspect is still locked.

public class CacheManagerTask implements Callable<Void> {
    private final DistributedCacheAspect distributedCacheAspect;
    public CacheManagerTask(DistributedCacheAspect distributedCacheAspect) {
        this.distributedCacheAspect = distributedCacheAspect;
    }

    @Override
    public Void call() throws InterruptedException {
        // wait until the Redis listener unlocks the aspect
        while (distributedCacheAspect.isLocked()) {
            Thread.sleep(10);
        }
        return null;
    }
}

Now, we’re simply waiting for the thread to be unlocked from outside. This happens as soon as our RedisMessageListener receives a message with our access key as the topic. We are no longer polling Redis itself.

Now, this is what our Angular app shows when we set it up just like before:

Putting it all together: distributed caching in action!

Notice how, as soon as the first request is finished, every other request across all our deployed servers finishes as well. Only one database call is made. Each server subscribes to the cached value’s access key, and when the first request is done loading from the database, it publishes to that topic!

How computationally expensive is this compared to just using Spring’s native caching? We make one extra request for the cached object, one for the access key, and then there is the cost of publishing and subscribing to messages.

Depending on your use case, this implementation might be worth using. There is still room for improvement, though: Spring already parses the SpEL expression and checks if the object is cached.

In the next chapter, we’ll look at directly extending this native implementation to optimize the number of requests sent to Redis!

Extending RedisCache & RedisCache Manager

The spring-boot-starter-data-redis dependency provides a RedisCache as well as a RedisCacheManager class. These implement the default functionality of Spring’s native caching. If we extend these classes and override the code for looking up and putting objects into the cache, we can integrate our previous distributed caching directly into the caching classes and optimize our code further.

@Configurable
public class DistributedRedisCache extends RedisCache {

    private final String distributedCacheName = "ACCESS_CACHE";

    // ... instance variables

    protected DistributedRedisCache(
        String name, RedisCacheWriter cacheWriter,
        RedisCacheConfiguration cacheConfig,
        RedisMessageListenerContainer redisMessageListenerContainer,
        RedisMessagePublisher redisPublisher
    ) {
        super(name, cacheWriter, cacheConfig);
        // set instance variables
    }
    ...
    private void waitForSynchronization() {
        // ...just like before, except we lock *this* instance
    }
    ...
}

The constructor takes every argument of the parent class RedisCache and calls the super constructor. This sets up our Redis Cache to work the exact same way as the native implementation.

Additionally, we store these arguments in our own class, because RedisCache wasn’t designed with extension in mind, and most of its fields and even methods are private. We also add our own components: the RedisMessagePublisher and the RedisMessageListenerContainer. The waitForSynchronization method is set up just like before, except that it locks the DistributedRedisCache instance.
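
For reference, the stored state could look like this (field names are our assumption, chosen to match the lookup and put snippets below):

// assumed instance variables of DistributedRedisCache
private final String name;
private final RedisCacheWriter cacheWriter;
private final RedisCacheConfiguration cacheConfig;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisMessagePublisher redisPublisher;

// volatile: written by the Redis listener thread, read by the waiting task
private volatile boolean lockedCache = false;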

Lookup and Put Methods

Next, we override the lookup and put methods. The core functionality – looking up and putting the actual values – stays the same as in the parent class. We then add our own code around it.

...
@Override
protected Object lookup(Object key) {
    String accessKey = distributedCacheName + convertKey(key);
    byte[] value = cacheWriter.get(name, createAndConvertCacheKey(key));

    if (value != null) {
        return deserializeCacheValue(value);
    }

    byte[] distributedAccess = cacheWriter
        .get(distributedCacheName, createAndConvertCacheKey(accessKey));

    if (distributedAccess != null) {
        lockedCache = true;
        Topic topic = new ChannelTopic(accessKey);
        RedisMessageListener redisMessageListener =
            new RedisMessageListener(accessKey, this);

        redisMessageListenerContainer
            .addMessageListener(redisMessageListener, topic);

        waitForSynchronization();

        redisMessageListenerContainer
            .removeMessageListener(redisMessageListener, topic);

        return deserializeCacheValue(cacheWriter.get(
            name,
            createAndConvertCacheKey(key)
        ));
    }

    cacheWriter.put(
        distributedCacheName,
        createAndConvertCacheKey(accessKey),
        serializeCacheValue(true),
        cacheConfig.getTtl()
    );
    return null;
}

protected byte[] createAndConvertCacheKey(Object key) {
    return serializeCacheKey(createCacheKey(key));
}
...

In the lookup method, we first check whether the object is already cached. If it is, there is no reason to look any further: we simply return it in its deserialized form. Otherwise, we load the accessKey from the cache: if it exists, we do our synchronization – just like before. Otherwise, we put our access key into the cache before returning null. Returning null causes the annotated method itself to be executed – the database call commences.

...
@Override
public void put(Object key, @Nullable Object value) {
    Object cacheValue = preProcessCacheValue(value);
    String accessKey = distributedCacheName + convertKey(key);
    if (!isAllowNullValues() && cacheValue == null) {
        // exception handling
    }

    //noinspection ConstantConditions
    cacheWriter.put(
        name,
        createAndConvertCacheKey(key),
        serializeCacheValue(cacheValue),
        cacheConfig.getTtl()
    );
    cacheWriter.remove(
        distributedCacheName,
        createAndConvertCacheKey(accessKey)
    );
    Topic topic = new ChannelTopic(accessKey);
    redisPublisher.publish(topic, "DONE");
}

In the put method, we store the value as usual, then remove the access key and publish a message with the access key as the topic.

To keep as much functionality as possible, it’s reasonable to also extend the RedisCacheManager, since it supplies a builder with many cache configuration options.

public class DistributedRedisCacheManager extends RedisCacheManager {

    private final RedisCacheWriter cacheWriter;
    private final RedisCacheConfiguration defaultCacheConfiguration;
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;
    @Autowired
    private RedisMessagePublisher redisPublisher;

    public DistributedRedisCacheManager(
        RedisCacheWriter cacheWriter,
        RedisCacheConfiguration defaultCacheConfiguration,
        Map<String, RedisCacheConfiguration> initialCacheConfigurations,
        boolean allowInFlightCacheCreation
    ) {
        super(
            cacheWriter,
            defaultCacheConfiguration,
            initialCacheConfigurations,
            allowInFlightCacheCreation
        );
        this.cacheWriter = cacheWriter;
        this.defaultCacheConfiguration = defaultCacheConfiguration;
    }

    @Override
    protected DistributedRedisCache createRedisCache(
        String name,
        @Nullable RedisCacheConfiguration cacheConfig
    ) {
        return new DistributedRedisCache(
            name,
            cacheWriter,
            cacheConfig != null
                ? cacheConfig
                : defaultCacheConfiguration,
            redisMessageListenerContainer,
            redisPublisher
        );
    }
    ...
}

Just like before, we add our own components, call the super constructor from our own constructor, and override the createRedisCache function to return our new DistributedRedisCache class. I’ve also copied over the entire RedisCacheManagerBuilder from the original RedisCacheManager, but replaced the final build method so that it returns our own DistributedRedisCacheManager.

Now, to use this implementation, all we need to do is create a bean for our DistributedRedisCacheManager in a configuration class of our choice:

@Bean
public DistributedRedisCacheManager createDistributedRedisCache(
    RedisConnectionFactory redisConnectionFactory
) {
    return DistributedRedisCacheManager.RedisCacheManagerBuilder
            .builder(redisConnectionFactory)
            .withCacheConfiguration("userCache",
                    RedisCacheConfiguration
                        .defaultCacheConfig()
                        .entryTtl(Duration.ofMinutes(10))
            ).build();
}

Since we’ve overridden the default implementation, we no longer need the sync = true option of the @Cacheable annotation. Any method annotated with it will now automatically synchronize distributed instances of our application through the cache!
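
For example, the service method from the beginning now needs nothing beyond the standard annotation (userRepository is a hypothetical repository):

// distributed synchronization now happens inside the cache implementation
@Cacheable(value = "userCache", key = "#username")
public User getUser(String username) {
    return userRepository.findByUsername(username);
}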

The Cost of Using Distributed Synchronized Caching

The advantage of this last implementation is that we no longer do unneeded lookups in our cache. We look once to see if the object exists. If it does, we’re done. With the previous implementation, we would do this lookup twice.

However, even with this improved implementation, there is clearly a cost to cache synchronization which you may need to think about. During an ongoing synchronization, we load the cached resource twice: once when we check whether it already exists (before cache synchronization), and again after the cache has been populated.

There are a number of scenarios in which distributed caching like this may be beneficial to your use case.

Even if you have many distributed application instances, distributed caching may not be necessary if you balance your load and don’t happen to have a massive number of requests. Maybe a few extra database hits are fine for you.

However, as soon as the cache is populated, access times are exactly the same as with Spring’s native caching implementation. You could only lose some performance if you already have low access latency to your data, even from the database, and the extra one or two cache calls take longer for some reason (e.g. network latency).

Another point of performance loss is when the resource is not yet available. One additional cache call is made to check whether the resource is being accessed, one to set the access key, and finally another to remove it. There is also a slight cost to using Redis’ Pub/Sub system to publish and receive our messages. Overall, however, these costs should be minimal compared to the cost of accessing your database.

One final thing to consider: this is not a technically thread-safe implementation. It should not be used to guarantee singular access to your database. One thread could be much faster than another, catching up before the other thread has set the access key to signal the cache population. Or you might use a cluster of Redis caches, where synchronization between nodes also takes time. There could still be rare occurrences of multiple parallel database hits.
