In this article, we will cover multiple approaches to implementing a synchronized Redis cache across distributed Spring Boot application instances.
Summary
Redis Caching in Spring Boot Applications
In our previous article ‘Redis Caching in Spring Boot Applications’, we talked about using Redis as a cache in Spring Boot applications. Let’s briefly recap what is and isn’t possible using the native Redis cache implementation.
We’ve created a simple Angular app that allows us to send multiple parallel requests to multiple instances of our Spring Boot application. It creates one GET request every 0.5 seconds, and each instance of our application also gets hit after 500 milliseconds, to demonstrate the effects of cache synchronization in distributed applications. The service that resolves the GET request contains a 2.5-second sleep; this simulates an extreme scenario and makes the difference in performance measurable. We also track each individual response time and aggregate the results.
Without any caching, the total response time of each server is at its maximum: 6 requests * 2.5 seconds = 15 seconds.
By simply enabling Redis caching, we already improve our response times significantly: later requests finish earlier because they are sent after the cache has been populated.
Can we do any better? Of course! By enabling Spring’s native cache synchronization, we can cut response times by over 50%.
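For reference, Spring’s native synchronization is enabled per method via the sync attribute of @Cacheable. Here is a minimal sketch reusing the method from our demo (the repository call is a hypothetical stand-in for the slow lookup):
// sync = true serializes concurrent cache misses within a single
// application instance: the first caller computes the value, the
// others block until the cache is populated and then read from it
@Cacheable(value = "user", key = "#username", sync = true)
public User getUser(String username) {
    return userRepository.findByUsername(username); // hypothetical slow lookup
}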
However, we do see an issue here: clearly, the cache is not being synchronized across multiple instances. While all requests of Server 1 complete as soon as its first request is done, Server 2 stays busy for some time, because its first request was started later. All consecutive requests on that server are synchronized only within its local application instance.
Spring Framework’s native caching only stores the information necessary for synchronizing requests at the level of a single application instance. This is why separate application instances aren’t synchronized. To create synchronized caching across multiple instances (we will call this distributed caching going forward), we need to store that information in a place all applications can access.
Luckily, all our application instances have a common denominator: the cache itself. Via Spring’s cache manager, all our application instances ultimately use the same cache, based on the shared configuration. The easiest approach is therefore to implement synchronization using the Redis cache itself as a means of informing each application that a resource is currently being loaded, and that it should wait for the cache to be populated.
Using Spring AOP and Custom Annotations
One simple way of implementing our distributed caching is to use Spring AOP (Aspect-Oriented Programming) to extend the native Redis cache functionality. We will achieve this by creating a custom annotation, which can then be placed on methods that require distributed caching.
What is AOP?
In short, AOP is a programming paradigm that allows us to add additional behaviour around existing code, without the need to modify it. In this regard, it could be viewed as another form of the decorator pattern. Learn more about Spring AOP in this article.
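To make this concrete, here is a minimal illustrative aspect (not part of our caching code) that logs the execution time of methods carrying a hypothetical @Timed annotation:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class TimingAspect {
    // runs around every method annotated with the (hypothetical) @Timed annotation
    @Around("@annotation(Timed)")
    public Object time(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return joinPoint.proceed(); // execute the original method
        } finally {
            System.out.println(joinPoint.getSignature().toShortString()
                    + " took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}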
Custom Annotation
First, let’s create a custom annotation:
// imports skipped for readability
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedCaching {
}
We can now annotate our service functions which need distributed caching like this:
...
@Cacheable(value = "user", key = "#username")
@DistributedCaching
public User getUser(String username) {
...
}
So far, nothing additional happens. For that, we will define a Spring aspect that executes our distributed caching code around each of our annotated methods.
@Aspect
@Order(1)
public class DistributedCacheAspect {
private CacheManager cacheManager;
public DistributedCacheAspect(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
...
}
We’ve called our class DistributedCacheAspect and annotated it with @Aspect. This makes sure that Spring Boot sets this class up as a proper aspect. We have also added an @Order(1) annotation, because we want our own aspect to be called before the @Cacheable logic, since we are adding additional caching features on top of it.
We have also injected the CacheManager bean via constructor injection. We need the CacheManager to access our Redis cache within this class. Since all of our Spring Boot instances use the same cache configuration, this is also the part of our code that connects all instances and manages synchronization.
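Note that the aspect itself must be registered as a bean. Assuming spring-boot-starter-aop is on the classpath (Spring Boot then enables auto-proxying automatically), a minimal registration sketch could look like this:
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DistributedCacheConfiguration {
    // register the aspect so Spring can weave it around annotated methods
    @Bean
    public DistributedCacheAspect distributedCacheAspect(CacheManager cacheManager) {
        return new DistributedCacheAspect(cacheManager);
    }
}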
Distributed Caching
Next, we’ll create a function that executes our code around the existing code. The basic algorithm behind our distributed caching is:
- If the cached object exists: do nothing. The @Cacheable annotation will execute after our code and return the cached object.
- Otherwise, if a special key (I’ll call this the access key going forward) exists in our cache for the resource being accessed: wait for the access key to be deleted from the cache. The access key signals that another thread is already loading the resource from the database, so the cache should be populated after some time has passed. Once the access key has been deleted, the resource has been loaded into the cache. In this case, we just execute the rest of the code, and just like before, the @Cacheable annotation will return the cached object.
- If the object is not cached and there is no access key:
  - put the access key into the cache. This signals to all future threads that the cache will soon be populated.
  - execute the function. This results in an actual database request to load the resource. Afterwards, Spring caches the loaded object.
  - after execution, remove the access key.
Let’s start by implementing the main functionality of our DistributedCacheAspect class:
@Around("@annotation(DistributedCaching)")
public Object distributedCaching(ProceedingJoinPoint joinPoint) throws Throwable {
...
}
By annotating the distributedCaching function with @Around("@annotation(DistributedCaching)"), we define around which methods our code is injected. In this case, we want this piece of code to execute around every method that carries the @DistributedCaching annotation.
The ProceedingJoinPoint parameter gives us information about the intercepted method, such as the method signature, and lets us run the actual method whenever we want to.
First, let’s think about how we will define our access key. The easiest way would be to generate a unique ID from the method we’re calling.
String accessKey = joinPoint
.getSignature()
.toLongString() + Arrays.toString(joinPoint.getArgs());
Here, we simply generate a unique key by taking the signature of the annotated method and appending the method arguments to it. The method signature always stays the same, while the method arguments are the resolved arguments of the specific call: instead of the name of the variable (e.g. "username"), we get the actual value for that call ("JohnDoe").
Next, we need to check whether the object that will be cached already exists. To do this, we’ll write a helper function that parses the actual object’s key out of the method:
import java.lang.reflect.Method;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
...
private final ExpressionParser parser = new SpelExpressionParser();
private final LocalVariableTableParameterNameDiscoverer discoverer
        = new LocalVariableTableParameterNameDiscoverer();

@SuppressWarnings("unchecked")
private <T> T parseSpel(Method method, Object[] arguments, String spel) {
    // make each method argument available under its parameter name
    String[] params = discoverer.getParameterNames(method);
    EvaluationContext context = new StandardEvaluationContext();
    for (int i = 0; i < params.length; i++) {
        context.setVariable(params[i], arguments[i]);
    }
    Expression expression = parser.parseExpression(spel);
    return expression.getValue(context, (Class<T>) String.class);
}
First, we’ve added the ExpressionParser and LocalVariableTableParameterNameDiscoverer fields. We use the discoverer to get the parameter names from our method. We then construct a new EvaluationContext and set its variables to be the method’s arguments. Finally, we parse the spel expression of the method; this is the key = "#user.username" part of the @Cacheable annotation.
Finally, we resolve the expression for the given context. If your spel was set to #user.username, this function would look at the method’s arguments, find the corresponding object (in this case the one named "user"), and resolve the actual username property of the user object.
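If you want to see the SpEL mechanism in isolation, here is a small self-contained sketch (class and values are purely illustrative):
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
public class SpelDemo {
    public static class User {
        private final String username;
        public User(String username) { this.username = username; }
        public String getUsername() { return username; }
    }
    public static void main(String[] args) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();
        // register the argument under its parameter name, like parseSpel does
        context.setVariable("user", new User("JohnDoe"));
        // "#user.username" resolves the property on the registered variable
        String key = parser.parseExpression("#user.username")
                .getValue(context, String.class);
        System.out.println(key); // prints "JohnDoe"
    }
}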
Now, we can check if the object is already cached like such:
...
// obtain the intercepted method and its @Cacheable annotation
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
Cacheable myAnnotation = method.getAnnotation(Cacheable.class);

String key = parseSpel(method, joinPoint.getArgs(), myAnnotation.key());
String actualCachedValueCacheName = myAnnotation.value()[0];
if (cacheManager.getCache(actualCachedValueCacheName).get(key) != null) {
    return joinPoint.proceed();
}
Essentially, we’ve just filtered out the case in which the object is already cached. In this case, we just let the method proceed and Spring does its magic, returning the cached object.
Finally, here is the rest of the distributedCaching function’s code:
if (cacheManager.getCache("ACCESS_CACHE").get(accessKey) != null) {
waitForSynchronization("ACCESS_CACHE", accessKey);
return joinPoint.proceed();
} else {
cacheManager.getCache("ACCESS_CACHE").put(accessKey, true);
Object proceed = joinPoint.proceed();
cacheManager.getCache(DISTRIBUTED_CACHE_NAME).evict(accessKey);
return proceed;
}
If the access key is present (meaning the value we get is not null), we call the waitForSynchronization function to block the thread until the cache is populated. After that, we simply run the method. Otherwise, if the access key isn’t present, the current thread is the one that will access the database and populate the cache: we put the access key into our cache, proceed with the method, and delete the key once the method has finished.
This is a perfect example of how Spring AOP helps us extend code generically.
‘Wait For Synchronization’ Method
Let’s look at the waitForSynchronization method:
private void waitForSynchronization(String accessKey) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        List<Future<Void>> result = executor.invokeAll(
            List.of(new CacheManagerTask(
                this.cacheManager,
                "ACCESS_CACHE",
                accessKey
            )),
            10,
            TimeUnit.SECONDS
        ); // timeout of 10 seconds
        result.get(0).get();
    } catch (InterruptedException | ExecutionException | CancellationException e) {
        // process what happens if the cache wasn't
        // populated within the waiting period
    } finally {
        executor.shutdown();
    }
}
Here, we create a task that runs our blocking code and periodically checks whether the cache has been populated. invokeAll returns a list of Future objects; calling get on the result blocks until the task is either finished, or cancelled because it took too long. In the latter case, get throws a CancellationException, which you may want to handle accordingly.
public class CacheManagerTask implements Callable<Void> {

    private final CacheManager cacheManager;
    private final String accessCache;
    private final String accessKey;

    public CacheManagerTask(
        CacheManager cacheManager,
        String accessCache,
        String accessKey
    ) {
        this.cacheManager = cacheManager;
        this.accessCache = accessCache;
        this.accessKey = accessKey;
    }

    @Override
    public Void call() throws InterruptedException {
        // poll until the access key disappears, i.e. the cache is populated
        while (cacheManager.getCache(accessCache).get(accessKey) != null) {
            Thread.sleep(10);
        }
        return null;
    }
}
With this implementation, we actively poll the cache to check whether the population has completed. That is clearly suboptimal. Luckily, Redis itself offers a nice way to solve this issue: Redis Pub/Sub. It lets us publish messages to arbitrary channels and subscribe to them, which we can use to implement a non-polling approach:
// acts as a lock for the waiting thread; volatile, because it is
// written and read from different threads
private volatile boolean lockedCache = false;

public boolean isLocked() {
    return lockedCache;
}

public void setLocked(boolean locked) {
    this.lockedCache = locked;
}
Listener, Publisher and Subscriber
First, we define a variable which will act as a lock to the thread – with getter and setter to enable access within our CacheManagerTask
. Next, we will create two classes: a RedisMessageListener
which implements Redis’ message listener, and also a RedisMessagePublisher
to publish messages over Redis directly.
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class RedisMessageListener implements MessageListener {

    private final String topic;
    private final DistributedCacheAspect distributedCacheAspect;

    public RedisMessageListener(
        String topic,
        DistributedCacheAspect distributedCacheAspect
    ) {
        this.topic = topic;
        this.distributedCacheAspect = distributedCacheAspect;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // unlock the thread!
        distributedCacheAspect.setLocked(false);
    }
}
The listener unlocks the thread when a message for the specific topic is received. In this case, the topic will be our unique accessKey.
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.Topic;
public class RedisMessagePublisher {
private final RedisTemplate<String, Object> redisTemplate;
public RedisMessagePublisher(
RedisTemplate<String, Object> redisTemplate
) {
this.redisTemplate = redisTemplate;
}
public void publish(Topic topic, String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
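For illustration, signalling that a resource has finished loading would then look like this (a sketch; the required bean wiring follows below):
import org.springframework.data.redis.listener.ChannelTopic;
...
// publish "DONE" on the channel named after our access key
redisPublisher.publish(new ChannelTopic(accessKey), "DONE");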
The publisher allows us to send a message for a given topic. To wire all of this up, we need to configure some additional beans. Remember to add the redis.clients:jedis dependency to your pom.xml:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<type>jar</type>
</dependency>
@Configuration
public class RedisListenerConfiguration {
@Bean
public RedisMessageListenerContainer listenerContainer(
RedisConnectionFactory redisConnection
) {
RedisMessageListenerContainer container =
new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnection);
return container;
}
@Bean
JedisConnectionFactory jedisConnectionFactory() {
return new JedisConnectionFactory();
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
final RedisTemplate<String, Object> template =
new RedisTemplate<String, Object>();
template.setConnectionFactory(jedisConnectionFactory());
template.setValueSerializer(
new GenericToStringSerializer<Object>(Object.class)
);
return template;
}
@Bean
RedisMessagePublisher redisPublisher() {
return new RedisMessagePublisher(redisTemplate());
}
}
The RedisMessageListenerContainer allows us to add our own listeners to subscribe to topics from within our aspect class. We also define JedisConnectionFactory and RedisTemplate beans to determine how the connection to Redis is implemented. While we are at it, we add our RedisMessagePublisher as a bean as well; it can be used across all instances to publish our messages.
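A note on the connection: by default, JedisConnectionFactory connects to localhost:6379. In a real deployment, every instance must point at the same Redis server. One way to do that (host and port below are placeholders):
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
@Bean
JedisConnectionFactory jedisConnectionFactory() {
    // all application instances must share this Redis instance
    RedisStandaloneConfiguration config =
            new RedisStandaloneConfiguration("redis.example.com", 6379);
    return new JedisConnectionFactory(config);
}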
In our aspect class, we can now add these via dependency injection:
private final CacheManager cacheManager;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisMessagePublisher redisPublisher;

public DistributedCacheAspect(
    CacheManager cacheManager,
    RedisMessageListenerContainer redisMessageListenerContainer,
    RedisMessagePublisher redisPublisher
) {
    this.cacheManager = cacheManager;
    this.redisMessageListenerContainer = redisMessageListenerContainer;
    this.redisPublisher = redisPublisher;
}
Now we can use the message publisher and subscriber to implement a non-polling approach:
Topic topic = new ChannelTopic(accessKey);
if (cacheManager.getCache("ACCESS_CACHE").get(accessKey) != null) {
lockedCache = true;
RedisMessageListener redisMessageListener =
new RedisMessageListener(accessKey, this);
redisMessageListenerContainer
.addMessageListener(redisMessageListener, topic);
waitForSynchronization();
redisMessageListenerContainer
.removeMessageListener(redisMessageListener, topic);
return joinPoint.proceed();
} else {
cacheManager.getCache("ACCESS_CACHE").put(accessKey, true);
Object proceed = joinPoint.proceed();
cacheManager.getCache("ACCESS_CACHE").evict(accessKey);
redisPublisher.publish(topic, "DONE");
return proceed;
}
Before waiting for synchronization, we subscribe to the topic named after our access key, and unsubscribe once we’re done. Likewise, after evicting the access key, we publish a message to that topic announcing that we are done. This way, the waiting thread no longer polls Redis; it simply locks itself until the message arrives.
private void waitForSynchronization() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        List<Future<Void>> result = executor.invokeAll(List.of(
            new CacheManagerTask(this)),
            10,
            TimeUnit.SECONDS
        ); // timeout of 10 seconds
        result.get(0).get();
    } catch (InterruptedException | ExecutionException | CancellationException e) {
        // handle the case in which the cache wasn't populated in time
    } finally {
        executor.shutdown();
    }
}
The waitForSynchronization method now creates a CacheManagerTask from this, so that the task can check whether the thread is still locked.
public class CacheManagerTask implements Callable<Void> {

    private final DistributedCacheAspect distributedCacheAspect;

    public CacheManagerTask(DistributedCacheAspect distributedCacheAspect) {
        this.distributedCacheAspect = distributedCacheAspect;
    }

    @Override
    public Void call() throws InterruptedException {
        // wait until the Redis message listener unlocks the aspect
        while (distributedCacheAspect.isLocked()) {
            Thread.sleep(10);
        }
        return null;
    }
}
Now we simply wait for the thread to be unlocked from outside. This happens as soon as our RedisMessageListener receives a message with our access key as the topic. We are no longer polling Redis itself, only a local flag.
Now, this is what our Angular app shows when we set it up just like before:
Notice how, as soon as the first request is finished, every other request across all our deployed servers finishes as well. Only one database call is made. Each server subscribes to the cached value’s access key, and when the first request is done loading from the database, it publishes to that topic!
How computationally expensive is this compared to just using Spring’s native caching? We make one request for the cached object and one for the access key, and then there is the cost of sending and subscribing to messages.
Depending on your use case, this implementation might be worth it. There is still room for improvement, though: Spring already parses the SpEL expression and checks whether the object is cached. In the next chapter, we’ll look at extending this native implementation directly to optimize the number of requests sent to Redis!
Extending RedisCache & RedisCache Manager
The spring-boot-starter-data-redis dependency ships a RedisCache as well as a RedisCacheManager class. These implement the default functionality of Spring’s native caching. If we extend these classes and override the code responsible for looking up and putting objects into the cache, we can integrate our previous distributed caching directly into the caching classes and optimize our code further.
@Configurable
public class DistributedRedisCache extends RedisCache {

    private final String distributedCacheName = "ACCESS_CACHE";
    // ... instance variables

    protected DistributedRedisCache(
        String name, RedisCacheWriter cacheWriter,
        RedisCacheConfiguration cacheConfig,
        RedisMessageListenerContainer redisMessageListenerContainer,
        RedisMessagePublisher redisPublisher
    ) {
        super(name, cacheWriter, cacheConfig);
        // set instance variables
    }

    ...

    private void waitForSynchronization() {
        // ...just like before, except we lock *this* cache instance
    }

    ...
}
The constructor takes every argument of the parent class RedisCache and calls the super constructor. This sets up our Redis cache to work exactly the same way as the native implementation. Additionally, we save these arguments in our own class, because RedisCache wasn’t designed with extension in mind: most of its fields and even some of its functions are private. We also add our own components, the RedisMessagePublisher and the RedisMessageListenerContainer. The waitForSynchronization method is set up just like before, except that it locks the DistributedRedisCache instance.
Lookup and Put Methods
Next, we override the lookup and put methods. The core functionality of looking up and putting the actual values stays the same as in the parent class; we then add our own code around it.
...
@Override
protected Object lookup(Object key) {
String accessKey = distributedCacheName + convertKey(key);
byte[] value = cacheWriter.get(name, createAndConvertCacheKey(key));
if (value != null) {
return deserializeCacheValue(value);
}
byte[] distributedAccess = cacheWriter
.get(distributedCacheName, createAndConvertCacheKey(accessKey));
if (distributedAccess != null) {
lockedCache = true;
Topic topic = new ChannelTopic(accessKey);
RedisMessageListener redisMessageListener =
new RedisMessageListener(accessKey, this);
redisMessageListenerContainer
.addMessageListener(redisMessageListener, topic);
waitForSynchronization();
redisMessageListenerContainer
.removeMessageListener(redisMessageListener, topic);
return deserializeCacheValue(cacheWriter.get(
name,
createAndConvertCacheKey(key)
));
}
cacheWriter.put(
distributedCacheName,
createAndConvertCacheKey(accessKey),
serializeCacheValue(true),
cacheConfig.getTtl()
);
return null;
}
protected byte[] createAndConvertCacheKey(Object key) {
return serializeCacheKey(createCacheKey(key));
}
...
In the lookup method, we first check whether the object is already cached. If it is, there is no reason to look any further: we simply return it in its deserialized form. Otherwise, we load the accessKey from the cache: if it exists, we do our synchronization, just like before. If it doesn’t, we put our access key into the cache before returning null. Returning null causes the annotated method itself to be executed, and the database call commences.
...
@Override
public void put(Object key, @Nullable Object value) {
Object cacheValue = preProcessCacheValue(value);
String accessKey = distributedCacheName + convertKey(key);
if (!isAllowNullValues() && cacheValue == null) {
// exception handling
}
//noinspection ConstantConditions
cacheWriter.put(
name,
createAndConvertCacheKey(key),
serializeCacheValue(cacheValue),
cacheConfig.getTtl()
);
cacheWriter.remove(
distributedCacheName,
createAndConvertCacheKey(accessKey)
);
Topic topic = new ChannelTopic(accessKey);
redisPublisher.publish(topic, "DONE");
}
In the put method, we simply remove the access key after storing the value and publish a new message with the access key as the topic. To keep as much functionality as possible, it’s reasonable to also extend the RedisCacheManager, since it supplies a builder with many cache configuration options.
public class DistributedRedisCacheManager extends RedisCacheManager {
private final RedisCacheWriter cacheWriter;
private final RedisCacheConfiguration defaultCacheConfiguration;
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
@Autowired
private RedisMessagePublisher messagePublisher;
public DistributedRedisCacheManager(
RedisCacheWriter cacheWriter,
RedisCacheConfiguration defaultCacheConfiguration,
Map<String, RedisCacheConfiguration> initialCacheConfigurations,
boolean allowInFlightCacheCreation
) {
super(
cacheWriter,
defaultCacheConfiguration,
initialCacheConfigurations,
allowInFlightCacheCreation
);
this.cacheWriter = cacheWriter;
this.defaultCacheConfiguration = defaultCacheConfiguration;
}
@Override
protected DistributedRedisCache createRedisCache(
String name,
@Nullable RedisCacheConfiguration cacheConfig
) {
return new DistributedRedisCache(
name,
cacheWriter,
cacheConfig != null
? cacheConfig
: defaultCacheConfiguration,
redisMessageListenerContainer,
messagePublisher
);
}
...
}
Just like before, we add our own components, call the super constructor in our own constructor, and override the createRedisCache function to return our new DistributedRedisCache class. I’ve also copied over the entire RedisCacheManagerBuilder from the original RedisCacheManager, but replaced the final build method so that it returns our own DistributedRedisCacheManager.
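The builder internals vary between Spring Data Redis versions, so treat the following as a sketch of the idea only: essentially, just the final build method changes. The field names mirror the original builder and may differ in your version:
// inside our copy of RedisCacheManagerBuilder
public DistributedRedisCacheManager build() {
    DistributedRedisCacheManager cacheManager = new DistributedRedisCacheManager(
        cacheWriter,
        defaultCacheConfiguration,
        initialCacheConfigurations,
        allowInFlightCacheCreation
    );
    // carry over the builder's transaction setting (assumed field name)
    cacheManager.setTransactionAware(enableTransactions);
    return cacheManager;
}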
Now, to use this implementation, all we need to do is create a bean for our DistributedRedisCacheManager in a configuration class of our choice:
@Bean
public DistributedRedisCacheManager createDistributedRedisCache(
RedisConnectionFactory redisConnectionFactory
) {
return DistributedRedisCacheManager.RedisCacheManagerBuilder
.builder(redisConnectionFactory)
.withCacheConfiguration("userCache",
RedisCacheConfiguration
.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
).build();
}
Since we’ve overridden the default implementation, we no longer need the sync = true option of the @Cacheable annotation. Any method annotated with @Cacheable will now automatically synchronize the distributed instances of our application through the cache!
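Our service method from the beginning then shrinks back to a plain @Cacheable annotation (the repository call is again a hypothetical stand-in):
// distributed synchronization now happens inside DistributedRedisCache,
// so the standard annotation is all we need
@Cacheable(value = "userCache", key = "#username")
public User getUser(String username) {
    return userRepository.findByUsername(username);
}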
The Cost of Using Distributed Synchronized Caching
The advantage of this last implementation is that we are no longer doing unneeded lookups in our cache. We look once to see whether the object exists; if it does, we’re done. With the previous implementation, we would do this lookup twice.
However, even with this improved implementation, there is clearly a cost to cache synchronization that you may need to weigh. During an ongoing synchronization, we load the cached resource twice: once when we check whether it already exists (before synchronizing), and a second time after the cache has been populated.
Distributed caching like this is likely to benefit your use case if:
- You run many distributed application instances: the more you have, the more you’ll benefit from synchronizing them like this.
- Your distributed applications are frequently hit by multiple parallel requests, causing unnecessary database access.
- Your database access times are long, or you need additional processing in between: the longer the access times, the more you’ll benefit.
Even if you have many distributed application instances, distributed caching may not be necessary if your load is well balanced and you don’t happen to receive a massive number of requests. A few extra database hits may be perfectly acceptable.
As soon as the cache is populated, however, access times are exactly the same as with Spring’s native caching implementation. You could only lose performance if access latency to your data is already low, even from the database, and the extra one or two cache calls happen to take longer for some reason (e.g. network latency).
Another point of performance loss is when the resource is not yet cached. One additional cache call is made to check whether the resource is currently being accessed, one to set the access key, and finally another to remove it. There is also a slight cost to using Redis’ Pub/Sub system to publish and receive our messages. Overall, though, these costs should be minimal compared to the cost of accessing your database.
One final thing to consider: this is not a strictly thread-safe implementation, and it should not be used to guarantee singular access to your database. One thread could be much faster than another and slip past before the other thread has written the access key that signals the cache population. Or you might run a cluster of Redis caches, and synchronization between cluster nodes also takes time. Rare occurrences of multiple parallel database hits therefore remain possible.