龙空技术网

【redis实战一】扩展SpringCache解决缓存击穿,穿透,雪崩

IT动力 455

前言:

此刻各位老铁们对“重定向次数过多清除也没用”大致比较关切,你们都需要分析一些“重定向次数过多清除也没用”的相关资讯。那么小编在网摘上网罗了一些有关“重定向次数过多清除也没用””的相关资讯,希望大家能喜欢,姐妹们快快来了解一下吧!

1、问题描述

我们在使用SpringCache的@Cacheable注解时,发现并没有设置过期时间这个功能。

@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface Cacheable {    // cacheNames的别名。与cacheNames二选一即可    @AliasFor("cacheNames")    String[] value() default {};    // 也就是我们存储到Redis的key的前缀部分。比如user:, 后面部分来源于参数    @AliasFor("value")    String[] cacheNames() default {};    // 同一个缓存名称的不同参数,key是显式指定。如#id, 表示去参数种的id字段。支持SpEL表达式    String key() default "";    // 同一个缓存名称的不同参数,keyGenerator是因为无法直接取到参数,参数需要经过一系列较为复杂的处理才能获得。通过KeyGenerator生成    String keyGenerator() default "";    // 指定缓存管理器,通常不会指定,使用默认的即可    String cacheManager() default "";    // 指定缓存解析器    String cacheResolver() default "";    // 存入缓存的条件,支持SpEL表达式,结果为true才会存入缓存    String condition() default "";    // 不存入缓存的条件,支持SpEL表达式,结果为true则不会存入缓存    String unless() default "";    // 是否同步回填缓存,并发访问@Cacheable时,因为线程安全问题,缓存还没来得及写入Redis, 就已经开始新的访问了,从而导致数据库被N次访问。    boolean sync() default false;}

由上面的接口描述可以知道,我们没有设置缓存过期时间的地方。

而我们的缓存基本都是存在过期时间的,否则Redis的数据会越堆越多,并且没有释放,而最终的结果将会是Redis被撑爆。

因此,我们需要让SpringCache支持过期时间的设置,我们可以把这个过期时间放在cacheNames的值后面,并且通过#号设置, 形如下面这种写法

@Cacheable(cacheNames = "test2#3", key = "#id")public TestEntity getById2(Long id){    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);    log.info("模拟查询数据库:{}", testEntity);    return testEntity;}

我们将过期时间跟在value这个key的后面,通过#号分割,这样可以方便我们将过期时间分散设置。

2、为什么要将缓存过期时间分散设置?

其实为什么要将缓存过期时间分散设置,就是因为缓存过期时间设置相同存在一个巨大的问题:“缓存集中失效,导致缓存雪崩”。

因为大量的缓存key在同一时间失效,导致大量的请求直接穿透缓存(缓存穿透),命中到数据库。

从而把数据库瞬间压垮,甚至导致服务宕机,从而导致服务雪崩。所以将缓存时间分散设置能够非常有效的避免缓存雪崩。

当然,缓存雪崩的解决方案还有另外一种加锁回填缓存方式,我们会在后面提到并给出实际操作代码,但两者一起用丝毫不会冲突。

3、扩展SpringCache使其支持过期时间设置

我们不仅仅要让SpringCache支持分散时间,后续还会将其封装为一个spring-boot-stater,成为一个公共的模块,让你在以后使用时能够直接复用。

3.1、思路说明

主要就是要重写CacheManager的逻辑,并且重写CacheManager的createRedisCache方法,将@Cacheble的cacheNames或value的值根据#号进行拆分。

然后使用RedisCacheConfiguration的entryTtl设置过期时间即可。

但是我们为了后面封装为stater作为基础,我们这里会自定义RedisConnectionFactory,RedisTemplate和CacheManager.

3.2 自定义Redis属性配置文件

/** * 自定义Redis属性文件 */@ConfigurationProperties(prefix = "redis", ignoreInvalidFields = true)  // prefix:属性配置前缀, ignoreInvalidFields: 忽略字段的校验@Data   // 简化Setter和Getterpublic class CustomRedisProperties {    /**     * 是否开启Redis的配置     */    private boolean enable = false;    /**     * Redis连接地址,IP:PORT,IP:PORT     */    private String host = "";    /**     * Redis连接密码     */    private String password = "";    /**     * 如果是单机版,可以选择哪个数据库     */    private Integer database = 0;    /**     * 最大重定向次数,可选配置     */    private Integer maxRedirects = null;    /**     * SpringCache前缀, 方便key管理     */    private String keyPrefix = "myrds";    /**jedis连接池配置*/    private CustomRedisPoolConfig<?> pool = new CustomRedisPoolConfig<>();    @Data    @EqualsAndHashCode(callSuper = false)    public static class CustomRedisPoolConfig<T> extends GenericObjectPoolConfig<T> {        // 定义扩展属性    }}

我们使用自定义的Redis属性配置文件,同时支持单机和集群两种模式,并且延用了原来的连接池配置。

3.3 自定义Redis配置抽象类

抽象类定义了一些RedisConnectionFactory,RedisTemplate等构建对象的构建方法。

/** * Redis配置抽象类 */@Slf4jpublic abstract class AbstractRedisConfig {    /**     * Redis是否是集群的标识     */    protected AtomicBoolean redisCluster = new AtomicBoolean(false);    /**     * 根据Redis属性文件构造RedisConnectionFactory     * @param redisProperties Redis属性文件     * @return RedisConnectionFactory     */    protected RedisConnectionFactory getRedisConnectionFactory(CustomRedisProperties redisProperties) {        if (StringUtils.isBlank(redisProperties.getHost())){            throw new RuntimeException("redis host is not null");        }        // 根据逗号切割host列表        Set<String> hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());        if (CollectionUtils.isEmpty(hosts)){            throw new RuntimeException("redis host address cannot be empty");        }        // 只有一个host, 表示是单机host        if (hosts.size() == 1){            String hostPort = hosts.stream().findFirst().get();            String[] hostArr = hostStr2Arr(hostPort);            return getSingleConnectionFactory(redisProperties, hostArr);        }        // 集群处理        RedisClusterConfiguration configuration = new RedisClusterConfiguration();        List<RedisNode> listNodes = new ArrayList<>();        for (String host : hosts) {            String[] split = hostStr2Arr(host);            RedisNode redisNode = new RedisClusterNode(split[0], Integer.parseInt(split[1]));            listNodes.add(redisNode);        }        return getClusterConnectionFactory(redisProperties, configuration, listNodes);    }    /**     * 构造单机版Redis连接工厂     * @param redisProperties Redis属性文件     * @param hostArr 连接地址列表,单机传一个     * @return ConnectionFactory     */    protected LettuceConnectionFactory getSingleConnectionFactory(CustomRedisProperties redisProperties, String[] hostArr) {        redisCluster.set(false);        // 构造单机版Redis连接工厂        RedisStandaloneConfiguration singleConf = new RedisStandaloneConfiguration();        singleConf.setHostName(hostArr[0]);        singleConf.setPassword(RedisPassword.of(redisProperties.getPassword()));        singleConf.setPort(Integer.parseInt(hostArr[1]));        singleConf.setDatabase(redisProperties.getDatabase());        // 创建连接池        final LettucePoolingClientConfiguration configuration = LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(singleConf, configuration);        log.info("==============创建单机版Redis连接工厂成功==================");        log.info("=============={}==================", redisProperties.getHost());        return connectionFactory;    }    /**     * 构造集群版Redis连接工厂     * @param redisProperties Redis属性文件     * @param listNodes 集群节点列表     * @return ConnectionFactory     */    protected LettuceConnectionFactory getClusterConnectionFactory(CustomRedisProperties redisProperties, RedisClusterConfiguration configuration, List<RedisNode> listNodes) {        redisCluster.set(true);        // 设置最大重定向次数        configuration.setMaxRedirects(redisProperties.getMaxRedirects());        configuration.setClusterNodes(listNodes);        // 设置密码        configuration.setPassword(RedisPassword.of(redisProperties.getPassword()));        // 构造集群版Redis连接工厂        final LettucePoolingClientConfiguration lettucePoolingClientConfiguration =                LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, lettucePoolingClientConfiguration);        log.info("==============创建集群版Redis连接工厂成功==================");        log.info("=============={}==================", redisProperties.getHost());        return connectionFactory;    }    /**     * host字符串转换为数组     * @param hostPort Redis连接地址和端口     * @return hostPort数组     */    protected String[] hostStr2Arr(String hostPort) {        String[] hostArr = hostPort.split(":");        if (hostArr.length != 2) {            throw new RuntimeException("host or port err");        }        return hostArr;    }    protected RedisTemplate<String, Object> buildRestTemplate(RedisConnectionFactory redisConnectionFactory) {        RedisTemplate<String, Object> template = new RedisTemplate<>();        // 设置连接工厂        template.setConnectionFactory(redisConnectionFactory);        // String序列化        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();        Jackson2JsonRedisSerializer<?> jacksonSerializer = buildRedisJackson();        // key and hashKey 序列化方式        template.setKeySerializer(stringRedisSerializer);        template.setHashKeySerializer(stringRedisSerializer);        // value serializer        template.setDefaultSerializer(jacksonSerializer);        template.setValueSerializer(jacksonSerializer);        template.setHashValueSerializer(jacksonSerializer);        template.afterPropertiesSet();        return template;    }    /**     * 构建JSON序列化器     */    protected Jackson2JsonRedisSerializer<?> buildRedisJackson() {        // JSON序列化        Jackson2JsonRedisSerializer<?> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);        ObjectMapper om = new ObjectMapper();        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        // enableDefaultTyping 方法已经过时,使用新的方法activateDefaultTyping        // objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);        jacksonSerializer.setObjectMapper(om);        return jacksonSerializer;    }    /**     * 获取Redis集群状态 是单机模式还是集群模式     */    public AtomicBoolean getRedisCluster() {        return redisCluster;    }}

可以看到,我们将一些公共的东西提取了出来,并且将每个方法声明为protect, 方便子类重写各自的方法。

3.4 自定义Redis配置

/** * 自定义Redis配置文件 */@Configuration  // 标识为一个配置项,注入Spring容器@EnableConfigurationProperties(CustomRedisProperties.class) // 启动Redis配置文件@ConditionalOnProperty(value = "redis.enable", havingValue = "true")@EnableCaching@Slf4jpublic class CustomRedisConfig extends AbstractRedisConfig{    @Resource    private CustomRedisProperties redisProperties;    /**     * 注册RedisConnectionFactory     */    @Bean    @ConditionalOnMissingBean(RedisConnectionFactory.class)    public RedisConnectionFactory redisConnectionFactory() {        return getRedisConnectionFactory(redisProperties);    }    /**     * 自定义注册RedisTemplate     * @param redisConnectionFactory 自定义的Redis连接工厂     * @return RedisTemplate     */    @Bean    @ConditionalOnMissingBean(RedisTemplate.class)    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {        return buildRestTemplate(redisConnectionFactory);    }    /**     * 自定义cacheManager     */    @Bean    @ConditionalOnMissingBean(RedisCacheManager.class)    public RedisCacheManager cacheManager() {        // String序列化        RedisSerializer<String> strSerializer = new StringRedisSerializer();        // json序列化        Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();        // set RedisCacheConfiguration        RedisCacheConfiguration config =                RedisCacheConfiguration.defaultCacheConfig()                        // 增加自定义的前缀                        .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")                        // 设置key value序列化器                        .serializeKeysWith(RedisSerializationContext.SerializationPair                                .fromSerializer(strSerializer))                        .serializeValuesWith(RedisSerializationContext.SerializationPair                                .fromSerializer(jsonRedisSerializer))                        // 禁用缓存空值                        .disableCachingNullValues();        // 创建自定义缓存管理器        return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);    }}
3.5、自定义CacheManager(核心)
/** * 自定义Redis缓存管理器, 增加自定义缓存过期时间 */public class CustomRedisCacheManager extends RedisCacheManager {    public CustomRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {        super(cacheWriter, defaultCacheConfiguration);    }    @Override    protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {        // 根据#号分隔        String[] array = StringUtils.delimitedListToStringArray(name, "#");        name = array[0];        if (array.length > 1) { // 解析TTL            long ttl = Long.parseLong(array[1]);            cacheConfig = cacheConfig.entryTtl(Duration.ofMinutes(ttl)); // 注意单位我此处用的是分钟,而非毫秒        }        return super.createRedisCache(name, cacheConfig);    }}

可以看到重写了createRedisCache方法,对缓存的key做了一个处理,将key按照#号分开,后面那一段就是过期时间,然后将过期时间设置到对应的key上。

注意:而我们#号后面可以是数字,其实是不是也可以是配置文件的全限定名称。比如:test.name.expire。这东西可以从配置文件读取,放在数据库,或者nacos都是可以的。

这样子就实现了过期时间的配置化处理。

3.6、Redis配置文件

server:  port: 8080redis:  # Redis开关  enable: true  # Redis地址,格式ip:port,ip:port。集群使用逗号分割  host: 162.14.74.11:6379  # 密码  password:  # 数据库  database: 0  # 最大重试次数  max-redirects: 3  # 使用统一前缀管理  key-prefix: itdl

当我们不配置Redis相关属性时,则不会创建相关的Bean对象。只有开启了开关,才会去创建对象。这样子我们就具备了做成一个stater的基础条件了。

4、测试4.1、编写Service层级测试接口

/** * 测试service层次 */public interface MyTestService {    // 用于测试缓存和过期时间是否生效    TestEntity getById(Long id);    // 用于测试缓存并发的安全性和过期时间是否生效    TestEntity getById2(Long id);}
4.2、编写Service层级测试接口实现类
@Service@Slf4jpublic class MyTestServiceImpl implements MyTestService {    @Cacheable(value = "test#3", key = "#id")    public TestEntity getById(Long id){        TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);        log.info("模拟查询数据库:{}", testEntity);        return testEntity;    }    @Cacheable(value = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存    public TestEntity getById2(Long id){        TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);        log.info("模拟查询数据库:{}", testEntity);        return testEntity;    }}

可以看到,我写了两个方法,唯一的区别就是value的值和下面方法多了一个sync = true。

效果我们随后使用测试类来进行从测试便知。

4.3、测试缓存即过期时间

@SpringBootTestpublic class TestServiceRunner {    @Autowired    private MyTestService myTestService;    /**     * 测试两个请求(同参),只有一个请求到达,另一个走缓存     */    @Test    public void testMyTestService(){        TestEntity t1 = myTestService.getById(1L);        TestEntity t2 = myTestService.getById(1L);    }}  

理论上来说,我们将有一个itdl:test:1这样的key,存入redis, 过期时间为3分钟。并且第二次请求会命中Redis而不会打印模拟查询数据库。

看似已经没有什么问题了,但是我们目前是串行化调用的缓存,如果并发去调用呢?会不会有问题?

4.4、测试缓存即过期时间(并发调用)

为什么要做这样的测试?

因为这样就可以模拟同一个热key(该key经常被访问,随时都有大量的请求),突然失效了,如果存在并发问题,此时缓存将会被击穿(缓存击穿),大量请求直达数据库。这样子可能数据库直接就嗝屁了。

所以我们必然要经过并发测试的。来吧~~~

我们使用一个大小为8的线程池,使用for循环不断的去执行线程,调用同一个方法。

如果模拟数据库操作只打印了多次,我们就认为一定存在线程安全性问题。如果只打印了一次,那就可能不存在,当然只是可能而已。

@SpringBootTestpublic class TestServiceRunner2 {    @Autowired    private MyTestService myTestService;    // 创建一个固定线程池    private ExecutorService executorService = Executors.newFixedThreadPool(8);    /**     * 多线程访问请求,测试切面的线程安全性     */    @Test    public void testMultiMyTestService() throws InterruptedException {        for (int i = 0; i < 100; i++) {            executorService.submit(() -> {                TestEntity t1 = myTestService.getById(1L);            });        }        // 主线程休息10秒种        Thread.sleep(10000);    }}

此时三分钟早已溜走,之前的缓存已经过期。我们开始测试并发吧。

看样子确实存在并发问题啊,这可咋整呢,别急,山人自有妙计!

4.5、使用sync=true解决并发问题

@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存

时间过得真快,三分钟又悄悄溜走了。我们来测试一下sync的效果

@SpringBootTestpublic class TestServiceRunner3 {    @Autowired    private MyTestService myTestService;    // 创建一个固定线程池    private ExecutorService executorService = Executors.newFixedThreadPool(8);    /**     * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)     */    @Test    public void testMultiSyncMyTestService() throws InterruptedException {        for (int i = 0; i < 100; i++) {            executorService.submit(() -> {                TestEntity t1 = myTestService.getById2(1L);            });        }        // 主线程休息10秒种        Thread.sleep(10000);    }}

可以看到,即使我们使用多线程去调用getById2,仍然只有一个线程执行了数据库查询,其余都是走的缓存。

这其实就是在查询数据库和回填缓存时,加了一把锁。在高并发的情况下,我们查询数据库,和存缓存这个操作加锁排队进行。那么第二个请求一定就是从缓存获取数据。

但是如果查询数据库结果为空呢?在瞬时并发的情况下,虽然加了锁查询数据库,但是每次的查询结果都是空,于是数据不会存储到Redis.

这样每次还是会查询数据库,而每次查询都为空值,结果同样是大量的请求是数据库承受的,高并发下,可能会直接压垮数据库,导致雪崩。

我们做个测试,多线程查询一条为空的数据。

4.6、测试结果为空情况

首先,在MyTestServiceImpl的getById2中,添加id为10的数据返回空值

@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存public TestEntity getById2(Long id){    if (id == 10){        log.info("id为10没有查询到数据,返回空值");        return null;    }    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);    log.info("模拟查询数据库:{}", testEntity);    return testEntity;}

然后,使用一个多线程去请求这个为空的数据。

@SpringBootTestpublic class TestServiceRunner4 {    @Autowired    private MyTestService myTestService;    // 创建一个固定线程池    private ExecutorService executorService = Executors.newFixedThreadPool(8);    /**     * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)     */    @Test    public void testMultiSyncMyTestService() throws InterruptedException {        for (int i = 0; i < 100; i++) {            executorService.submit(() -> {                TestEntity t1 = myTestService.getById2(10L);            });        }        // 主线程休息10秒种        Thread.sleep(10000);    }}

可以看到,这样的请求在高并发下,很容易就会将数据库给压垮。

结论:高并发情境下,我们务必也要对空值也进行处理。

5、数据库结果为空处理

properties文件增加缓存空值的开关属性

/** * 是否缓存空值,默认不缓存 */private boolean cacheNullValues = false;

修改RedisCacheManager的Bean实例化过程,只有开关关闭时才将缓存空值禁用。

/** * 自定义cacheManager */@Bean@ConditionalOnMissingBean(RedisCacheManager.class)public RedisCacheManager cacheManager() {    // String序列化    RedisSerializer<String> strSerializer = new StringRedisSerializer();    // json序列化    Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();    // set RedisCacheConfiguration    RedisCacheConfiguration config =            RedisCacheConfiguration.defaultCacheConfig()                    // 增加自定义的前缀                    .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")                    // 设置key value序列化器                    .serializeKeysWith(RedisSerializationContext.SerializationPair                            .fromSerializer(strSerializer))                    .serializeValuesWith(RedisSerializationContext.SerializationPair                            .fromSerializer(jsonRedisSerializer))                    ;    if (!redisProperties.isCacheNullValues()){        // 禁用缓存空值        config.disableCachingNullValues();    }    // 创建自定义缓存管理器    return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);}

在application.yml种,开启对空值的缓存

redis:  # Redis开关  enable: true  # Redis地址,格式ip:port,ip:port。集群使用逗号分割  host: 162.14.74.11:6379  # 密码  password:  # 数据库  database: 0  # 最大重试次数  max-redirects: 3  # 使用统一前缀管理  key-prefix: itdl  # 缓存空值开关开启  cache-null-values: true

重新测试

可以看出来,将空值缓存之后,我们的数据库压力又变小了,它又安全了,不会被压垮了。

但是,我们缓存了空值之后,假如我们又新增了一条id为10的数据呢? 我们再次查询居然还是空值,这可咋整呢?

1、我们本身已经设置了缓存过期时间,如果数据允许有延迟,可以等待缓存的空值过期。或者缩短缓存的过期时间2、新增数据或修改数据后,根据相关的key主动删除相关缓存3、编写定时任务,定期清空Redis中为SpringCache缓存的空值的数据。或者预留接口去手动删除相关的空值缓存。4、使用一个巨大的bitmap,提前查询已存在的key,编写一个布隆过滤器,提前把空值数据拦截,不经过缓存,不经过数据库。5、扩展DefaultRedisCacheWriter重写put方法,收集空值的key列表7、扩展DefaultRedisCacheWriter重写put方法

我们先来看一下RedisCacheManager的构造方法

private RedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration,        boolean allowInFlightCacheCreation) {    Assert.notNull(cacheWriter, "CacheWriter must not be null!");    Assert.notNull(defaultCacheConfiguration, "DefaultCacheConfiguration must not be null!");    this.cacheWriter = cacheWriter;    this.defaultCacheConfig = defaultCacheConfiguration;    this.initialCacheConfiguration = new LinkedHashMap<>();    this.allowInFlightCacheCreation = allowInFlightCacheCreation;}

可以看到,构造函数有一个重要的参数RedisCacheWriter。该接口就是定义了一些写入Redis的操作。

public interface RedisCacheWriter extends CacheStatisticsProvider {    // 创建一个无锁的RedisCacheWriter    static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {        return nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.keys());    }    // 实际上就是创建了一个DefaultRedisCacheWriter    static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory,                                                       BatchStrategy batchStrategy) {        return new DefaultRedisCacheWriter(connectionFactory, batchStrategy);    }}	

可以看到,实际上就是创建了一个DefaultRedisCacheWriter。所以只要我们构建时传入DefaultRedisCacheWriter或者其扩展类,就可以拦截put方法(缓存入库的方法)。

但是DefaultRedisCacheWriter不是一个public的类,无法继承它进行扩展,所以我直接拷贝了一份,然后在put方法内增加了Redis存储的空值NullValue的判断,即可收集空值集合。

public class CustomRedisCacheWriter implements RedisCacheWriter {    // 其实在Redis中,值为null的数据并不会存储为空字符串或者null, 而是存储的一个NullValue的实例的序列化结果    private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);    private final RedisConnectionFactory connectionFactory;    private final Duration sleepTime;    private final CacheStatisticsCollector statistics;    private final BatchStrategy batchStrategy;    // 引入上下文,方便从上下文获取Bean做收集动作    private ApplicationContext applicationContext;    // 构造方法增加ApplicationContext参数    public CustomRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy, ApplicationContext applicationContext) {        this(connectionFactory, Duration.ZERO, batchStrategy);        this.applicationContext = applicationContext;    }    // put方法,增加空值判断    @Override    public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {        Assert.notNull(name, "Name must not be null!");        Assert.notNull(key, "Key must not be null!");        Assert.notNull(value, "Value must not be null!");        // 注意:这里就是结果为空值        if (new String(BINARY_NULL_VALUE, StandardCharsets.UTF_8).equals(new String(value, StandardCharsets.UTF_8))){            // 存储的数据就是NULLValue, 将空值的key收集起来            applicationContext.getBean(CacheNullValuesHandle.class).collectNullValueKeys(new String(key, StandardCharsets.UTF_8));        }        execute(name, connection -> {            if (shouldExpireWithin(ttl)) {                connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());            } else {                connection.set(key, value);            }            return "OK";        });        statistics.incPuts(name);    }}	

收集空值的处理类

/** * Redis存储的空值处理 */@ConditionalOnProperty(value = "redis.enable", havingValue = "true")@Component@Slf4jpublic class CacheNullValuesHandle implements DisposableBean {    @Resource    private RedisTemplate<String, Object> redisTemplate;    private static final String cacheNullKeySetKey = ":cacheNullKeySetKey";    @Autowired    private CustomRedisProperties redisProperties;    // 收集的空值集合,放在set里,自动去重    private final Set<String> cacheNullValueKeys = new LinkedHashSet<>();    public synchronized void collectNullValueKeys(String key){        // 如果已经存在了,则直接        if (cacheNullValueKeys.contains(key)){            return;        }        // 空值数量小于最大数量,才存储在内存中        if (cacheNullValueKeys.size() < redisProperties.getCacheNullValueKeySizeInMemory()){            cacheNullValueKeys.add(key);        }else {            // 超过,直接存储在Redis Set中            redisTemplate.opsForSet().add(redisProperties.getKeyPrefix() + cacheNullKeySetKey, key);        }    }    public synchronized Set<String> getCacheNullValueKeys(){        return cacheNullValueKeys;    }    @Override    public void destroy() throws Exception {        // 销毁时,删除自己对应的空值key        try {            log.info("清空空值key列表:{}", String.join(",", cacheNullValueKeys));            // 销毁的时候,那些空值key, 防止过多的空值key占用空间            redisTemplate.delete(cacheNullValueKeys);            // 删除空值key列表集合key            redisTemplate.delete(redisProperties.getKeyPrefix() + cacheNullKeySetKey);            cacheNullValueKeys.clear();        } catch (Exception e) {            e.printStackTrace();        }    }}

我们将空值key收集起来了,只要服务不挂,那么全量的空值key列表就会存放在Redis Set集合中。如果空值key的缓存时间比较长的话,可以读取集合,然后遍历集合删除即可。通过页面对这些空值key进行管理,或者通过定时任务进行删除。

8、总结

我们给@Cacheable添加了自定义的缓存过期时间设置。可以分散设置从而防止缓存集中时效。

又发现@Cacheable缓存数据库存在线程安全问题,在多线程的场景同一个key将会多次访问到数据库,从而让数据库压力剧增。

然后使用sync=true,加锁排队查询数据库和回填缓存,从而解决并发问题。

随后又测试了查询数据结果为空,每次都会去查询数据库。

而空值的数据通常就是一些非法的参数,查询不到结果。可能是黑客模拟的参数,这些参数缓存在Redis,是极为正确的选择。

如果是我们后续新增或者修改数据的情况,主动删除对应的空值缓存即可。这样子数据就是实时的了。

如果说有些毫无意义的空值存储在Redis, 并且过期时间又比较长,就可以使用定时任务,定时去删除空值。或者使用缓存key管理,手动删除那些对应的key。

标签: #重定向次数过多清除也没用