前言:
此刻各位老铁们对“重定向次数过多清除也没用”大致比较关切,你们都需要分析一些“重定向次数过多清除也没用”的相关资讯。那么小编在网摘上网罗了一些有关“重定向次数过多清除也没用””的相关资讯,希望大家能喜欢,姐妹们快快来了解一下吧!1、问题描述
我们在使用SpringCache的@Cacheable注解时,发现并没有设置过期时间这个功能。
@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface Cacheable { // cacheNames的别名。与cacheNames二选一即可 @AliasFor("cacheNames") String[] value() default {}; // 也就是我们存储到Redis的key的前缀部分。比如user:, 后面部分来源于参数 @AliasFor("value") String[] cacheNames() default {}; // 同一个缓存名称的不同参数,key是显式指定。如#id, 表示去参数种的id字段。支持SpEL表达式 String key() default ""; // 同一个缓存名称的不同参数,keyGenerator是因为无法直接取到参数,参数需要经过一系列较为复杂的处理才能获得。通过KeyGenerator生成 String keyGenerator() default ""; // 指定缓存管理器,通常不会指定,使用默认的即可 String cacheManager() default ""; // 指定缓存解析器 String cacheResolver() default ""; // 存入缓存的条件,支持SpEL表达式,结果为true才会存入缓存 String condition() default ""; // 不存入缓存的条件,支持SpEL表达式,结果为true则不会存入缓存 String unless() default ""; // 是否同步回填缓存,并发访问@Cacheable时,因为线程安全问题,缓存还没来得及写入Redis, 就已经开始新的访问了,从而导致数据库被N次访问。 boolean sync() default false;}
由上面的接口描述可以知道,我们没有设置缓存过期时间的地方。
而我们的缓存基本都是存在过期时间的,否则Redis的数据会越堆越多,并且没有释放,而最终的结果将会是Redis被撑爆。
因此,我们需要让SpringCache支持过期时间的设置,我们可以把这个过期时间放在cacheNames的值后面,并且通过#号设置, 形如下面这种写法
@Cacheable(cacheNames = "test2#3", key = "#id")public TestEntity getById2(Long id){ TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10); log.info("模拟查询数据库:{}", testEntity); return testEntity;}
我们将过期时间跟在value这个key的后面,通过#号分割,这样可以方便我们将过期时间分散设置。
2、为什么要将缓存过期时间分散设置?
其实为什么要将缓存过期时间分散设置,就是因为缓存过期时间设置相同存在一个巨大的问题:“缓存集中失效,导致缓存雪崩”。
因为大量的缓存key在同一时间失效,导致大量的请求直接穿透缓存(缓存穿透),命中到数据库。
从而把数据库瞬间压垮,甚至导致服务宕机,从而导致服务雪崩。所以将缓存时间分散设置能够非常有效的避免缓存雪崩。
当然,缓存雪崩的解决方案还有另外一种加锁回填缓存方式,我们会在后面提到并给出实际操作代码,但两者一起用丝毫不会冲突。
3、扩展SpringCache使其支持过期时间设置
我们不仅仅要让SpringCache支持分散时间,后续还会将其封装为一个spring-boot-stater,成为一个公共的模块,让你在以后使用时能够直接复用。
3.1、思路说明
主要就是要重写CacheManager的逻辑,并且重写CacheManager的createRedisCache方法,将@Cacheble的cacheNames或value的值根据#号进行拆分。
然后使用RedisCacheConfiguration的entryTtl设置过期时间即可。
但是我们为了后面封装为stater作为基础,我们这里会自定义RedisConnectionFactory,RedisTemplate和CacheManager.
3.2 自定义Redis属性配置文件
/** * 自定义Redis属性文件 */@ConfigurationProperties(prefix = "redis", ignoreInvalidFields = true) // prefix:属性配置前缀, ignoreInvalidFields: 忽略字段的校验@Data // 简化Setter和Getterpublic class CustomRedisProperties { /** * 是否开启Redis的配置 */ private boolean enable = false; /** * Redis连接地址,IP:PORT,IP:PORT */ private String host = ""; /** * Redis连接密码 */ private String password = ""; /** * 如果是单机版,可以选择哪个数据库 */ private Integer database = 0; /** * 最大重定向次数,可选配置 */ private Integer maxRedirects = null; /** * SpringCache前缀, 方便key管理 */ private String keyPrefix = "myrds"; /**jedis连接池配置*/ private CustomRedisPoolConfig<?> pool = new CustomRedisPoolConfig<>(); @Data @EqualsAndHashCode(callSuper = false) public static class CustomRedisPoolConfig<T> extends GenericObjectPoolConfig<T> { // 定义扩展属性 }}
我们使用自定义的Redis属性配置文件,同时支持单机和集群两种模式,并且延用了原来的连接池配置。
3.3 自定义Redis配置抽象类
抽象类定义了一些RedisConnectionFactory,RedisTemplate等构建对象的构建方法。
/** * Redis配置抽象类 */@Slf4jpublic abstract class AbstractRedisConfig { /** * Redis是否是集群的标识 */ protected AtomicBoolean redisCluster = new AtomicBoolean(false); /** * 根据Redis属性文件构造RedisConnectionFactory * @param redisProperties Redis属性文件 * @return RedisConnectionFactory */ protected RedisConnectionFactory getRedisConnectionFactory(CustomRedisProperties redisProperties) { if (StringUtils.isBlank(redisProperties.getHost())){ throw new RuntimeException("redis host is not null"); } // 根据逗号切割host列表 Set<String> hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost()); if (CollectionUtils.isEmpty(hosts)){ throw new RuntimeException("redis host address cannot be empty"); } // 只有一个host, 表示是单机host if (hosts.size() == 1){ String hostPort = hosts.stream().findFirst().get(); String[] hostArr = hostStr2Arr(hostPort); return getSingleConnectionFactory(redisProperties, hostArr); } // 集群处理 RedisClusterConfiguration configuration = new RedisClusterConfiguration(); List<RedisNode> listNodes = new ArrayList<>(); for (String host : hosts) { String[] split = hostStr2Arr(host); RedisNode redisNode = new RedisClusterNode(split[0], Integer.parseInt(split[1])); listNodes.add(redisNode); } return getClusterConnectionFactory(redisProperties, configuration, listNodes); } /** * 构造单机版Redis连接工厂 * @param redisProperties Redis属性文件 * @param hostArr 连接地址列表,单机传一个 * @return ConnectionFactory */ protected LettuceConnectionFactory getSingleConnectionFactory(CustomRedisProperties redisProperties, String[] hostArr) { redisCluster.set(false); // 构造单机版Redis连接工厂 RedisStandaloneConfiguration singleConf = new RedisStandaloneConfiguration(); singleConf.setHostName(hostArr[0]); singleConf.setPassword(RedisPassword.of(redisProperties.getPassword())); singleConf.setPort(Integer.parseInt(hostArr[1])); singleConf.setDatabase(redisProperties.getDatabase()); // 创建连接池 final LettucePoolingClientConfiguration configuration = LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build(); LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(singleConf, configuration); log.info("==============创建单机版Redis连接工厂成功=================="); log.info("=============={}==================", redisProperties.getHost()); return connectionFactory; } /** * 构造集群版Redis连接工厂 * @param redisProperties Redis属性文件 * @param listNodes 集群节点列表 * @return ConnectionFactory */ protected LettuceConnectionFactory getClusterConnectionFactory(CustomRedisProperties redisProperties, RedisClusterConfiguration configuration, List<RedisNode> listNodes) { redisCluster.set(true); // 设置最大重定向次数 configuration.setMaxRedirects(redisProperties.getMaxRedirects()); configuration.setClusterNodes(listNodes); // 设置密码 configuration.setPassword(RedisPassword.of(redisProperties.getPassword())); // 构造集群版Redis连接工厂 final LettucePoolingClientConfiguration lettucePoolingClientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build(); LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, lettucePoolingClientConfiguration); log.info("==============创建集群版Redis连接工厂成功=================="); log.info("=============={}==================", redisProperties.getHost()); return connectionFactory; } /** * host字符串转换为数组 * @param hostPort Redis连接地址和端口 * @return hostPort数组 */ protected String[] hostStr2Arr(String hostPort) { String[] hostArr = hostPort.split(":"); if (hostArr.length != 2) { throw new RuntimeException("host or port err"); } return hostArr; } protected RedisTemplate<String, Object> buildRestTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 设置连接工厂 template.setConnectionFactory(redisConnectionFactory); // String序列化 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer<?> jacksonSerializer = buildRedisJackson(); // key and hashKey 序列化方式 template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); // value serializer template.setDefaultSerializer(jacksonSerializer); template.setValueSerializer(jacksonSerializer); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } /** * 构建JSON序列化器 */ protected Jackson2JsonRedisSerializer<?> buildRedisJackson() { // JSON序列化 Jackson2JsonRedisSerializer<?> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // enableDefaultTyping 方法已经过时,使用新的方法activateDefaultTyping // objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); jacksonSerializer.setObjectMapper(om); return jacksonSerializer; } /** * 获取Redis集群状态 是单机模式还是集群模式 */ public AtomicBoolean getRedisCluster() { return redisCluster; }}
可以看到,我们将一些公共的东西提取了出来,并且将每个方法声明为protect, 方便子类重写各自的方法。
3.4 自定义Redis配置
/** * 自定义Redis配置文件 */@Configuration // 标识为一个配置项,注入Spring容器@EnableConfigurationProperties(CustomRedisProperties.class) // 启动Redis配置文件@ConditionalOnProperty(value = "redis.enable", havingValue = "true")@EnableCaching@Slf4jpublic class CustomRedisConfig extends AbstractRedisConfig{ @Resource private CustomRedisProperties redisProperties; /** * 注册RedisConnectionFactory */ @Bean @ConditionalOnMissingBean(RedisConnectionFactory.class) public RedisConnectionFactory redisConnectionFactory() { return getRedisConnectionFactory(redisProperties); } /** * 自定义注册RedisTemplate * @param redisConnectionFactory 自定义的Redis连接工厂 * @return RedisTemplate */ @Bean @ConditionalOnMissingBean(RedisTemplate.class) public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { return buildRestTemplate(redisConnectionFactory); } /** * 自定义cacheManager */ @Bean @ConditionalOnMissingBean(RedisCacheManager.class) public RedisCacheManager cacheManager() { // String序列化 RedisSerializer<String> strSerializer = new StringRedisSerializer(); // json序列化 Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson(); // set RedisCacheConfiguration RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() // 增加自定义的前缀 .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":") // 设置key value序列化器 .serializeKeysWith(RedisSerializationContext.SerializationPair .fromSerializer(strSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair .fromSerializer(jsonRedisSerializer)) // 禁用缓存空值 .disableCachingNullValues(); // 创建自定义缓存管理器 return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config); }}3.5、自定义CacheManager(核心)
/** * 自定义Redis缓存管理器, 增加自定义缓存过期时间 */public class CustomRedisCacheManager extends RedisCacheManager { public CustomRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) { super(cacheWriter, defaultCacheConfiguration); } @Override protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) { // 根据#号分隔 String[] array = StringUtils.delimitedListToStringArray(name, "#"); name = array[0]; if (array.length > 1) { // 解析TTL long ttl = Long.parseLong(array[1]); cacheConfig = cacheConfig.entryTtl(Duration.ofMinutes(ttl)); // 注意单位我此处用的是分钟,而非毫秒 } return super.createRedisCache(name, cacheConfig); }}
可以看到重写了createRedisCache方法,对缓存的key做了一个处理,将key按照#号分开,后面那一段就是过期时间,然后将过期时间设置到对应的key上。
注意:而我们#号后面可以是数字,其实是不是也可以是配置文件的全限定名称。比如:test.name.expire。这东西可以从配置文件读取,放在数据库,或者nacos都是可以的。
这样子就实现了过期时间的配置化处理。
3.6、Redis配置文件
server: port: 8080redis: # Redis开关 enable: true # Redis地址,格式ip:port,ip:port。集群使用逗号分割 host: 162.14.74.11:6379 # 密码 password: # 数据库 database: 0 # 最大重试次数 max-redirects: 3 # 使用统一前缀管理 key-prefix: itdl
当我们不配置Redis相关属性时,则不会创建相关的Bean对象。只有开启了开关,才会去创建对象。这样子我们就具备了做成一个stater的基础条件了。
4、测试4.1、编写Service层级测试接口
/** * 测试service层次 */public interface MyTestService { // 用于测试缓存和过期时间是否生效 TestEntity getById(Long id); // 用于测试缓存并发的安全性和过期时间是否生效 TestEntity getById2(Long id);}4.2、编写Service层级测试接口实现类
@Service@Slf4jpublic class MyTestServiceImpl implements MyTestService { @Cacheable(value = "test#3", key = "#id") public TestEntity getById(Long id){ TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10); log.info("模拟查询数据库:{}", testEntity); return testEntity; } @Cacheable(value = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存 public TestEntity getById2(Long id){ TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10); log.info("模拟查询数据库:{}", testEntity); return testEntity; }}
可以看到,我写了两个方法,唯一的区别就是value的值和下面方法多了一个sync = true。
效果我们随后使用测试类来进行从测试便知。
4.3、测试缓存即过期时间
@SpringBootTestpublic class TestServiceRunner { @Autowired private MyTestService myTestService; /** * 测试两个请求(同参),只有一个请求到达,另一个走缓存 */ @Test public void testMyTestService(){ TestEntity t1 = myTestService.getById(1L); TestEntity t2 = myTestService.getById(1L); }}
理论上来说,我们将有一个itdl:test:1这样的key,存入redis, 过期时间为3分钟。并且第二次请求会命中Redis而不会打印模拟查询数据库。
看似已经没有什么问题了,但是我们目前是串行化调用的缓存,如果并发去调用呢?会不会有问题?
4.4、测试缓存即过期时间(并发调用)
为什么要做这样的测试?
因为这样就可以模拟同一个热key(该key经常被访问,随时都有大量的请求),突然失效了,如果存在并发问题,此时缓存将会被击穿(缓存击穿),大量请求直达数据库。这样子可能数据库直接就嗝屁了。
所以我们必然要经过并发测试的。来吧~~~
我们使用一个大小为8的线程池,使用for循环不断的去执行线程,调用同一个方法。
如果模拟数据库操作只打印了多次,我们就认为一定存在线程安全性问题。如果只打印了一次,那就可能不存在,当然只是可能而已。
@SpringBootTestpublic class TestServiceRunner2 { @Autowired private MyTestService myTestService; // 创建一个固定线程池 private ExecutorService executorService = Executors.newFixedThreadPool(8); /** * 多线程访问请求,测试切面的线程安全性 */ @Test public void testMultiMyTestService() throws InterruptedException { for (int i = 0; i < 100; i++) { executorService.submit(() -> { TestEntity t1 = myTestService.getById(1L); }); } // 主线程休息10秒种 Thread.sleep(10000); }}
此时三分钟早已溜走,之前的缓存已经过期。我们开始测试并发吧。
看样子确实存在并发问题啊,这可咋整呢,别急,山人自有妙计!
4.5、使用sync=true解决并发问题
@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
时间过得真快,三分钟又悄悄溜走了。我们来测试一下sync的效果
@SpringBootTestpublic class TestServiceRunner3 { @Autowired private MyTestService myTestService; // 创建一个固定线程池 private ExecutorService executorService = Executors.newFixedThreadPool(8); /** * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存) */ @Test public void testMultiSyncMyTestService() throws InterruptedException { for (int i = 0; i < 100; i++) { executorService.submit(() -> { TestEntity t1 = myTestService.getById2(1L); }); } // 主线程休息10秒种 Thread.sleep(10000); }}
可以看到,即使我们使用多线程去调用getById2,仍然只有一个线程执行了数据库查询,其余都是走的缓存。
这其实就是在查询数据库和回填缓存时,加了一把锁。在高并发的情况下,我们查询数据库,和存缓存这个操作加锁排队进行。那么第二个请求一定就是从缓存获取数据。
但是如果查询数据库结果为空呢?在瞬时并发的情况下,虽然加了锁查询数据库,但是每次的查询结果都是空,于是数据不会存储到Redis.
这样每次还是会查询数据库,而每次查询都为空值,结果同样是大量的请求是数据库承受的,高并发下,可能会直接压垮数据库,导致雪崩。
我们做个测试,多线程查询一条为空的数据。
4.6、测试结果为空情况
首先,在MyTestServiceImpl的getById2中,添加id为10的数据返回空值
@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存public TestEntity getById2(Long id){ if (id == 10){ log.info("id为10没有查询到数据,返回空值"); return null; } TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10); log.info("模拟查询数据库:{}", testEntity); return testEntity;}
然后,使用一个多线程去请求这个为空的数据。
@SpringBootTestpublic class TestServiceRunner4 { @Autowired private MyTestService myTestService; // 创建一个固定线程池 private ExecutorService executorService = Executors.newFixedThreadPool(8); /** * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存) */ @Test public void testMultiSyncMyTestService() throws InterruptedException { for (int i = 0; i < 100; i++) { executorService.submit(() -> { TestEntity t1 = myTestService.getById2(10L); }); } // 主线程休息10秒种 Thread.sleep(10000); }}
可以看到,这样的请求在高并发下,很容易就会将数据库给压垮。
结论:高并发情境下,我们务必也要对空值也进行处理。
5、数据库结果为空处理
properties文件增加缓存空值的开关属性
/** * 是否缓存空值,默认不缓存 */private boolean cacheNullValues = false;
修改RedisCacheManager的Bean实例化过程,只有开关关闭时才将缓存空值禁用。
/** * 自定义cacheManager */@Bean@ConditionalOnMissingBean(RedisCacheManager.class)public RedisCacheManager cacheManager() { // String序列化 RedisSerializer<String> strSerializer = new StringRedisSerializer(); // json序列化 Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson(); // set RedisCacheConfiguration RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() // 增加自定义的前缀 .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":") // 设置key value序列化器 .serializeKeysWith(RedisSerializationContext.SerializationPair .fromSerializer(strSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair .fromSerializer(jsonRedisSerializer)) ; if (!redisProperties.isCacheNullValues()){ // 禁用缓存空值 config.disableCachingNullValues(); } // 创建自定义缓存管理器 return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);}
在application.yml种,开启对空值的缓存
redis: # Redis开关 enable: true # Redis地址,格式ip:port,ip:port。集群使用逗号分割 host: 162.14.74.11:6379 # 密码 password: # 数据库 database: 0 # 最大重试次数 max-redirects: 3 # 使用统一前缀管理 key-prefix: itdl # 缓存空值开关开启 cache-null-values: true
重新测试
可以看出来,将空值缓存之后,我们的数据库压力又变小了,它又安全了,不会被压垮了。
但是,我们缓存了空值之后,假如我们又新增了一条id为10的数据呢? 我们再次查询居然还是空值,这可咋整呢?
1、我们本身已经设置了缓存过期时间,如果数据允许有延迟,可以等待缓存的空值过期。或者缩短缓存的过期时间。2、新增数据或修改数据后,根据相关的key主动删除相关缓存3、编写定时任务,定期清空Redis中为SpringCache缓存的空值的数据。或者预留接口去手动删除相关的空值缓存。4、使用一个巨大的bitmap,提前查询已存在的key,编写一个布隆过滤器,提前把空值数据拦截,不经过缓存,不经过数据库。5、扩展DefaultRedisCacheWriter重写put方法,收集空值的key列表7、扩展DefaultRedisCacheWriter重写put方法
我们先来看一下RedisCacheManager的构造方法
private RedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration, boolean allowInFlightCacheCreation) { Assert.notNull(cacheWriter, "CacheWriter must not be null!"); Assert.notNull(defaultCacheConfiguration, "DefaultCacheConfiguration must not be null!"); this.cacheWriter = cacheWriter; this.defaultCacheConfig = defaultCacheConfiguration; this.initialCacheConfiguration = new LinkedHashMap<>(); this.allowInFlightCacheCreation = allowInFlightCacheCreation;}
可以看到,构造函数有一个重要的参数RedisCacheWriter。该接口就是定义了一些写入Redis的操作。
public interface RedisCacheWriter extends CacheStatisticsProvider { // 创建一个无锁的RedisCacheWriter static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) { return nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.keys()); } // 实际上就是创建了一个DefaultRedisCacheWriter static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) { return new DefaultRedisCacheWriter(connectionFactory, batchStrategy); }}
可以看到,实际上就是创建了一个DefaultRedisCacheWriter。所以只要我们构建时传入DefaultRedisCacheWriter或者其扩展类,就可以拦截put方法(缓存入库的方法)。
但是DefaultRedisCacheWriter不是一个public的类,无法继承它进行扩展,所以我直接拷贝了一份,然后在put方法内增加了Redis存储的空值NullValue的判断,即可收集空值集合。
public class CustomRedisCacheWriter implements RedisCacheWriter { // 其实在Redis中,值为null的数据并不会存储为空字符串或者null, 而是存储的一个NullValue的实例的序列化结果 private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE); private final RedisConnectionFactory connectionFactory; private final Duration sleepTime; private final CacheStatisticsCollector statistics; private final BatchStrategy batchStrategy; // 引入上下文,方便从上下文获取Bean做收集动作 private ApplicationContext applicationContext; // 构造方法增加ApplicationContext参数 public CustomRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy, ApplicationContext applicationContext) { this(connectionFactory, Duration.ZERO, batchStrategy); this.applicationContext = applicationContext; } // put方法,增加空值判断 @Override public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { Assert.notNull(name, "Name must not be null!"); Assert.notNull(key, "Key must not be null!"); Assert.notNull(value, "Value must not be null!"); // 注意:这里就是结果为空值 if (new String(BINARY_NULL_VALUE, StandardCharsets.UTF_8).equals(new String(value, StandardCharsets.UTF_8))){ // 存储的数据就是NULLValue, 将空值的key收集起来 applicationContext.getBean(CacheNullValuesHandle.class).collectNullValueKeys(new String(key, StandardCharsets.UTF_8)); } execute(name, connection -> { if (shouldExpireWithin(ttl)) { connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert()); } else { connection.set(key, value); } return "OK"; }); statistics.incPuts(name); }}
收集空值的处理类
/** * Redis存储的空值处理 */@ConditionalOnProperty(value = "redis.enable", havingValue = "true")@Component@Slf4jpublic class CacheNullValuesHandle implements DisposableBean { @Resource private RedisTemplate<String, Object> redisTemplate; private static final String cacheNullKeySetKey = ":cacheNullKeySetKey"; @Autowired private CustomRedisProperties redisProperties; // 收集的空值集合,放在set里,自动去重 private final Set<String> cacheNullValueKeys = new LinkedHashSet<>(); public synchronized void collectNullValueKeys(String key){ // 如果已经存在了,则直接 if (cacheNullValueKeys.contains(key)){ return; } // 空值数量小于最大数量,才存储在内存中 if (cacheNullValueKeys.size() < redisProperties.getCacheNullValueKeySizeInMemory()){ cacheNullValueKeys.add(key); }else { // 超过,直接存储在Redis Set中 redisTemplate.opsForSet().add(redisProperties.getKeyPrefix() + cacheNullKeySetKey, key); } } public synchronized Set<String> getCacheNullValueKeys(){ return cacheNullValueKeys; } @Override public void destroy() throws Exception { // 销毁时,删除自己对应的空值key try { log.info("清空空值key列表:{}", String.join(",", cacheNullValueKeys)); // 销毁的时候,那些空值key, 防止过多的空值key占用空间 redisTemplate.delete(cacheNullValueKeys); // 删除空值key列表集合key redisTemplate.delete(redisProperties.getKeyPrefix() + cacheNullKeySetKey); cacheNullValueKeys.clear(); } catch (Exception e) { e.printStackTrace(); } }}
我们将空值key收集起来了,只要服务不挂,那么全量的空值key列表就会存放在Redis Set集合中。如果空值key的缓存时间比较长的话,可以读取集合,然后遍历集合删除即可。通过页面对这些空值key进行管理,或者通过定时任务进行删除。
8、总结
我们给@Cacheable添加了自定义的缓存过期时间设置。可以分散设置从而防止缓存集中时效。
又发现@Cacheable缓存数据库存在线程安全问题,在多线程的场景同一个key将会多次访问到数据库,从而让数据库压力剧增。
然后使用sync=true,加锁排队查询数据库和回填缓存,从而解决并发问题。
随后又测试了查询数据结果为空,每次都会去查询数据库。
而空值的数据通常就是一些非法的参数,查询不到结果。可能是黑客模拟的参数,这些参数缓存在Redis,是极为正确的选择。
如果是我们后续新增或者修改数据的情况,主动删除对应的空值缓存即可。这样子数据就是实时的了。
如果说有些毫无意义的空值存储在Redis, 并且过期时间又比较长,就可以使用定时任务,定时去删除空值。或者使用缓存key管理,手动删除那些对应的key。
标签: #重定向次数过多清除也没用