龙空技术网

Redis高并发问题这么解决,泰裤啦

java小悠 232

前言:

现时看官们对“javaredis并发”可能比较关心,大家都需要分析一些“javaredis并发”的相关内容。那么小编在网上收集了一些有关“javaredis并发””的相关资讯,希望你们能喜欢,各位老铁们一起来学习一下吧!

前言

当今互联网世界中,高并发一直是各大网站、应用面临的一大挑战。为了应对高并发的流量压力,各种技术手段被不断研究和应用。其中,Redis 作为一款高性能的内存数据库,被广泛应用于解决高并发问题。

与传统的关系型数据库不同,Redis 采用了内存存储的方式,可以实现快速的读写操作。同时,Redis 还具有丰富的数据结构和强大的缓存功能,可以大大提升系统的性能和可靠性。在处理高并发问题方面,Redis 也提供了诸如分布式锁、限流、队列等常用的解决方案,可以帮助开发者轻松应对高并发场景。

本篇文章将介绍 Redis 在处理高并发问题方面的应用,包括缓存穿透、缓存击穿和缓存雪崩问题等,并针对这些问题给出实际的解决方案(附代码),持续更新。希望本文能够对大家在解决高并发问题时提供帮助和启示。

缓存穿透

缓存穿透是指用户查询数据时,数据库和缓存中都没有数据。导致了查询请求直接绕过缓存,直接穿透到数据库。

解决方法:

缓存空值

查询Redis为null,查询数据库也为null,此时设置该key在缓存中,且值为null,过期时间为随机时间。random(10)。这样子能保证数据在这段时间暴力请求,也只会在这短暂的时间内获取null,而有另外的线程在读取数据库表,并缓存在Redis中

scss复制代码/** * 解决缓存穿透 * @return */public User getUser(String userId) {    //从缓存中获取user信息    User user = (User) redisTemplate.opsForValue().get(userId);    if(user == null) {        //如果缓存数据为空,从数据库中获取user信息        user = userService.getUserByUserId(userId);        if(user == null) {            //如果数据库中数据为空,则存入一个空值,设置短时间内过期,防止缓存穿透            redisTemplate.opsForValue().set(userId,null,5, TimeUnit.MINUTES);        }else {            //将数据写入缓存            redisTemplate.opsForValue().set(userId,user);        }    }    return user;}
布隆过滤器

可参考:…

布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

优点:

空间效率高,不用像Set集合一样保存元素的值,极大地节省了内存空间。只需将插入的key通过Hash计算放到bitMap中的一个位置,在判断是否存在该key的时候,只需判断bitMap中的位置是0还是1即可,达到了Set集合判断是否存在某值的效果。查询效率高:布隆过滤器可以在非常快的时间内判断一个元素是否存在于集合中,而不需要像传统数据结构那样进行线性扫描。这对于大规模数据集和高并发查询场景尤其有用。

缺点:

布隆过滤器中的存储的key越多,误判率越高。将不存在的元素误判为存在。不能删除布隆过滤器中已存在的key

具体使用:使用Guava中的API

导入依赖

xml复制代码    <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>30.1-jre</version>    </dependency>

构造BloomFilter的最多参数的静态工厂方法是BloomFilter create(Funnel funnel, long expectedInsertions, double fpp, BloomFilter.Strategy strategy),参数如下:

funnel:主要是把任意类型的数据转化成HashCode,是一个顶层接口,有大量内置实现,见FunnelsexpectedInsertions:期望插入的元素个数fpp:猜测是False Positive Percent,误判率,小数而非百分数,默认值0.03strategy:映射策略,目前只有MURMUR128_MITZ_32和MURMUR128_MITZ_64(默认策略)

java复制代码@RestController@RequestMapping("user")public class UserController{    @Autowired    private RedisTemplate<String, Object> redisTemplate;    private static final int expectedInsertions = 10000;    private static final double fpp = 0.0444D;    private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);    @GetMapping("/user/{id}")    public User getUserById(@PathVariable Long id){        // 先从布隆过滤器中判断此id是否存在,初始化的时候需要将数据库的所有id都存到布隆过滤器,缺点不能删除已经不存在的id        if(!bloomFilter.mightContain(String.valueof(id))){            return null;        }        // 查询缓存数据        String userKey = "user_"+id;        User user = (User) redisTemplate.opsForValue().get(userKey);        if(user == null){            // 查询数据库            user = userRepository.findById(id).orElse(null);            if(user != null){                // 将查询到的数据加入缓存                redisTemplate.opsForValue().set(userKey, user, 300, TimeUnit.SECONDS);            }        }        return user;    }}
缓存击穿

缓存击穿是指一个非常热门的、但是不存在的数据被大量请求,导致请求直接落到数据库上,从而使得数据库瞬间承受巨大的压力,从而导致数据库响应变慢,甚至宕机的现象。和缓存雪崩的区别在于热点数据的量多不多。

解决方法:

缓存数据永不过期

将热门的、但是不经常更新的数据设置为永不过期,可以避免缓存击穿的风险。但是这种方法可能会导致缓存数据的时效性降低,需要根据实际情况进行权衡。

分布式锁

在加载缓存数据时,添加互斥锁可以保证只有一个请求去加载数据并更新缓存,其他请求等待缓存更新完成后再获取数据,从而避免了大量请求直接落到数据库上的情况。

scss复制代码@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RedissonClient redissonClient;public Object getData(String key) {    // 尝试从缓存中获取数据    Object value = redisTemplate.opsForValue().get(key);    if (value != null) {        return value;    }    // 如果缓存中不存在数据,获取分布式锁    RLock lock = redissonClient.getLock(key);    try {        lock.lock();        // 再次尝试从缓存中获取数据,避免其他线程在获取锁之前已经写入了缓存        value = redisTemplate.opsForValue().get(key);        if (value != null) {            return value;        }        // 如果缓存中不存在数据,从数据库中查询        value = getDataFromDatabase(key);        if (value == null) {            // 如果数据库中也不存在数据,将空对象写入缓存,并设置较短的过期时间            redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);        } else {            // 如果数据库中存在数据,将数据写入缓存,并设置较长的过期时间            redisTemplate.opsForValue().set(key, value, 5, TimeUnit.MINUTES);        }        return value;    } finally {        // 释放分布式锁        lock.unlock();    }}
缓存雪崩

缓存雪崩是指在缓存中大量的缓存数据同时过期或者失效,导致大量的请求直接落到了数据库上,使得数据库瞬间承受巨大的压力,从而导致数据库响应变慢,甚至宕机的现象。

解决方法:

过期时间随机化

将缓存数据的过期时间设置为随机值,可以避免大量缓存数据同时过期的情况发生,从而减少缓存雪崩的风险

scss复制代码/** * 解决缓存雪崩 * @return */public User getUser2(String userId) {    //从缓存中获取user信息    User user = (User) redisTemplate.opsForValue().get(userId);    if(user == null) {        //如果缓存数据为空,从数据库中获取user信息        user = lUserMapper.getUserByUserId(userId);        if(user == null) {            redisTemplate.opsForValue().set(userId,null,3, TimeUnit.MINUTES);        }else {            //设置随机过期时间,将数据写入缓存,防止缓存雪崩            long mins = random.nextInt(60) + 60;            redisTemplate.opsForValue().set(userId, user, mins, TimeUnit.MINUTES);        }    }    return user;}
分布式锁

使用分布式锁可以保证在缓存失效时,只有一个请求去加载数据并更新缓存,其他请求等待缓存更新完成后再获取数据,从而避免了大量请求同时落到数据库上的情况(同缓存穿透)

引出问题

通过上面的例子我们已经了解到了Redis在高并发状态下可能出现的问题以及解决方法,但是如果应用到实际场景中,针对每个接口都需要考虑这么处理,那代码中会充斥着大量的重复代码,那肯定是不能接受的。那有没有一种好的通用的解决方案呢?

这里就不得不提起Spring Cache,Spring Cache 是Spring 提供的一整套的缓存解决方案。 虽然它本身并没有提供缓存的实现,但是它提供了一整套的接口和代码规范、配置、注解等,这样它就可以整合各种缓存方案了,比如Redis、Ehcache,我们也就不用关心操作缓存的细节。Spring Cache怎么整合Redis,本篇文章不做介绍,大家自行上网搜索。

简单介绍一下Spring Cache的几个注解使用:

@Cacheable(key="#id") 根据id查询或者查询会启动缓存

@CachePut(key="#post.postId") 插入或者更新会启动缓存

@CacheEvict(key="#id") 删除时启动缓存

Spring Cache解决方案Spring Cache解决缓存穿透

有一个很简单的解决方案,就是缓存null值,从缓存取不到的数据,在数据库中也没有取到,直接返回空值。本身是不支持缓存null值的,需要在配置文件开启支持

ini复制代码spring.cache.redis.cache-null-values=true
Spring Cache解决缓存击穿
ini复制代码@Cacheable(cacheNames="user", sync="true")

解释:如果设置 sync 属性为 true,表示该方法的缓存操作会使用同步锁来保证线程安全,防止多个线程同时访问该方法导致缓存出现问题。如果 sync 属性为 false,则不会使用同步锁,缓存操作可能存在并发问题。通过设置 sync 属性为 true,可以保证多个线程同时访问同一个缓存方法时,只有一个线程能够执行方法,并将返回值缓存到缓存中。其他线程会等待第一个线程执行完方法后,从缓存中获取返回值。这样可以避免多个线程同时执行缓存方法,导致缓存出现问题的情况。sync = true 可以有效的避免缓存击穿的问题。

Spring Cache解决缓存雪崩

最简单的方法是过期时间加上随机值,但是很麻烦的是,我们在使用@Cacheable注解的时候,原生功能没法直接设置随机过期时间的,需要继承RedisCacheManager,重写里面的getCache方法。

可参考:Spring Boot缓存实战 Redis 设置有效时间和自动刷新缓存,时间支持在配置文件中配置

从上面可以看出Spring Cache解决Redis缓存问题还是比较麻烦的,特别是在解决缓存雪崩问题上。既然如此,我们为什么不自己实现一个属于我们自己的缓存机制,开干!!!

设计一套缓存机制

综上Redis出现的三个问题,给出综合的解决方案:

缓存空值(布隆过滤器不建议) + 分布式锁更新缓存解决Redis问题AOP + 自定义注解减少重复代码。增加复用性读取缓存型注解@MyCacheable

less复制代码@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface MyCacheable {    /**     * 缓存的Key,默认使用方法名作为Key     */    String value() default "";    /**     * 缓存的过期时间,单位为秒,默认值为60秒     */    int expireInSeconds() default 60;}
读取缓存型切面MyCacheableAspect
java复制代码@Component@Aspectpublic class MyCacheableAspect {    @Autowired    private RedisTemplate<String, Object> redisTemplate;    @Autowired    private RedissonClient redissonClient;    /**     * 定义缓存的切点,拦截所有标记了@Cached注解的方法     */    @Pointcut("@annotation(com.plus.annotation.MyCacheable)")    public void cachedPointcut() {    }    private static final int expectedInsertions = 10000;    private static final double fpp = 0.0444D;    private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);    private static final String prefix = "lock-";    /**     * 在方法执行前尝试从缓存中获取数据,如果缓存中存在数据,直接返回     */    @Around("cachedPointcut()")    public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {        // 获取注解信息        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();        Method method = methodSignature.getMethod();        Cached cached = method.getAnnotation(MyCacheable.class);        String key = StringUtils.isEmpty(cached.value()) ? method.getName() : cached.value();        int expireInSeconds = cached.expireInSeconds();        // 先从布隆过滤器中判断此id是否存在,初始化的时候需要将数据库的所有id都存到布隆过滤器。这么多个数据表,id肯定有重复的。单独存储id是不行的。那就加上        // 前缀,例如user-id。初始化的时候就要遍历所有需要缓存数据的表,将该表的id都存到布隆过滤器,数据量很大,这也会导致布隆过滤器误判率增加。且后续这些        // 表新增数据都要用将id也要存到布隆过滤器。缺点不能删除布隆过滤器已经不需要的id。布隆过滤器按我的见解是真不好用,虽然在一些特殊场景好用,但不包括此场景        // 'mightContain(T)' is declared in unstable class 'com.google.common.hash.BloomFilter' marked with @Beta 说明这个方法是不稳定的,有可能误判        //if (key.contains("#id") && !bloomFilter.mightContain(key)) {        //    return null;        //}        // 尝试从缓存中获取数据        Object value = redisTemplate.opsForValue().get(key);        if (value != null) {            if (value instanceof NullValue) {                // 如果缓存中存在空对象,返回null                return null;            }            return value;        }        // 如果缓存中不存在数据,获取分布式锁        RLock lock = redissonClient.getLock(prefix + key);        try {            lock.lock();            // 再次尝试从缓存中获取数据,避免其他线程在获取锁之前已经写入了缓存            value = redisTemplate.opsForValue().get(key);            if (value != null) {                if (value instanceof NullValue) {                    // 如果缓存中存在空对象,返回null                    return null;                }                return value;            }            // 如果缓存中不存在数据,执行方法获取数据            value = joinPoint.proceed();            if (value == null) {                // 如果数据源中也不存在数据,将空对象写入缓存,并设置较短的过期时间。防止缓存穿透,算是布隆过滤器的兜底                redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);            } else {                // 如果数据源中存在数据,将数据写入缓存,随机设置过期时间,避免缓存同时失效导致缓存雪崩                // 随机时间算法:以正常缓存时间为基准,取十分之一的范围内生成随机数                int seed = expireInSeconds / 10 == 0 ? expireInSeconds : expireInSeconds / 10;                int randomTime = new Random().nextInt(seed);                redisTemplate.opsForValue().set(key, value, expireInSeconds + randomTime, TimeUnit.SECONDS);            }            return value;        } finally {            // 释放分布式锁            lock.unlock();        }    }    /**     * 缓存空对象类,受不了代码规范插件报的null值警告才增加的。注意RedisTemplate<String, String> redisTemplate不能缓存null值,需要RedisTemplate<String, Object> redisTemplate     */    private static class NullValue implements Serializable {        private static final long serialVersionUID = 1L;        /**         * 单例模式         */        private static final NullValue INSTANCE = new NullValue();        private Object readResolve() {            return INSTANCE;        }    }}
更新缓存型注解@MyCacheEvict
less复制代码@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface MyCacheEvict {    /**     * 缓存的Key     */    String value();}
更新缓存型切面MyCacheEvictAspect
less复制代码@Aspect@Componentpublic class MyCacheEvictAspect {    /**     * 定义缓存的切点,拦截所有标记了@MyCacheEvict注解的方法     */    @Pointcut("@annotation(com.plus.annotation.MyCacheEvict)")    public void cachedPointcut() {    }    @Autowired    private RedisTemplate<String,Object> redisTemplate;    @Around("cachedPointcut()")    public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {        // 获取注解信息        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();        Method method = methodSignature.getMethod();        MyCacheEvict cacheEvict = method.getAnnotation(MyCacheEvict.class);        String key = cacheEvict.value();        // 执行方法        Object result = joinPoint.proceed();        redisTemplate.delete(key);        return result;    }}
具体使用
less复制代码@GetMapping("getAllUser")@MyCacheable(value = "getAllUser")public R<List<User>> getAllUser(){    return R.data(userService.list(new UserDTO()));}@GetMapping("getUserById")@MyCacheable(value = "id")public R<User> getUserById(int id){    return R.data(userService.getUserById(id));}@PostMapping("/save")@MyCacheEvict("getAllUser")public R save(@RequestBody @Validated User user) {    return R.data(userService.save(user));}

以上方法基本能解决大部分场景下的缓存问题,大家有需求可以自行拓展,例如支持多种格式的key处理。对此有疑问的,希望大家多多指导!!!

原文链接:

标签: #javaredis并发