黑马点评项目 | Redis学习笔记「纯享版」10w字超详细总结(含资料)
前言:这篇笔记是我二刷Redis实战篇的汇总总结。学完苍穹外卖项目后,我选择了这个黑马点评项目,发现它和以增删改查为主的苍穹外卖不同,更偏向知识深度与实际业务的结合,是更贴近企业真实场景的项目。对于当时的我来说,难度提升明显,第一遍学习时难免稀里糊涂。
由于当时对Redis没有系统性的掌握,即便学完一遍,很多知识点依旧一知半解。于是这个暑假,我重新二刷学习,反复琢磨、反复回看,终于慢慢啃懂了这个项目,也对Redis的实际应用有了更清晰的认知。
为了方便大家收藏后快速复习,文章开头整理了实验室、基地相关面试题,以及项目专属企业面试题;同时在文章末尾,不仅添加了Redis在实际应用中的最佳实践方案,还准备了该项目的全套学习资料,供大家参考使用;在2026年,新增了黑马面试课程配套Redis面经于文章后方,欢迎查阅!
希望这篇笔记能帮助大家加深对黑马点评项目、对Redis实战的理解,也欢迎大家提出宝贵的建议。如果文中存在错误、描述不当的地方,恳请大家告知我,我会及时修正;若有侵权问题,也请及时联系,我会立刻处理。
最后送大家一句话:「追风赶月莫停留,平芜尽处是春山」🌼
内容介绍:


实验室基地面试题(答案位于文末)

企业面试题合集(答案位于文末)


1. 短信登录
1.1 基于Session实现登录流程

1.2 实现发送短信验证码功能
页面流程

具体代码如下:
1. 发送验证码
@Override public Result sendCode(String phone, HttpSession session) { // 1.校验手机号 if (RegexUtils.isPhoneInvalid(phone)) { // 2.如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } // 3.符合,生成验证码 String code = RandomUtil.randomNumbers(6); // 4.保存验证码到 session session.setAttribute("code",code); // 5.发送验证码 log.debug("发送短信验证码成功,验证码:{}", code); // 返回ok return Result.ok(); }2. 登录
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { // 1.校验手机号 String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { // 2.如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } // 3.校验验证码 Object cacheCode = session.getAttribute("code"); String code = loginForm.getCode(); if(cacheCode == null || !cacheCode.toString().equals(code)){ //3.不一致,报错 return Result.fail("验证码错误"); } //一致,根据手机号查询用户 User user = query().eq("phone", phone).one(); //5.判断用户是否存在 if(user == null){ //不存在,则创建 user = createUserWithPhone(phone); } //7.保存用户信息到session中 session.setAttribute("user",user); return Result.ok(); }1.3 实现登录拦截功能
温馨小贴士1:tomcat的运行原理

当用户发起请求时,会访问我们像tomcat注册的端口,任何程序想要运行,都需要有一个线程对当前端口号进行监听,tomcat也不例外,当监听线程知道用户想要和tomcat连接连接时,那会由监听线程创建socket连接,socket都是成对出现的,用户通过socket像互相传递数据,当tomcat端的socket接受到数据后,此时监听线程会从tomcat的线程池中取出一个线程执行用户请求,在我们的服务部署到tomcat后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的controller,service,dao中,并且访问对应的DB,在用户执行完请求后,再统一返回,再找到tomcat端的socket,再将数据写回到用户端的socket,完成请求和响应
通过以上讲解,我们可以得知 每个用户其实对应都是去找tomcat线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用threadlocal来做到线程隔离,每个线程操作自己的一份数据
温馨小贴士2:关于threadlocal
如果小伙伴们看过threadLocal的源码,你会发现在threadLocal中,无论是他的put方法和他的get方法, 都是先从获得当前用户的线程,然后从线程中取出线程的成员变量map,只要线程不一样,map就不一样,所以可以通过这种方式来做到线程隔离

拦截器代码(需要继承HandlerInterceptor才能使用其中的拦截相关方法)
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.获取session HttpSession session = request.getSession(); //2.获取session中的用户 Object user = session.getAttribute("user"); //3.判断用户是否存在 if(user == null){ //4.不存在,拦截,返回401状态码 response.setStatus(401); return false; } //5.存在,保存用户信息到Threadlocal UserHolder.saveUser((User)user); //6.放行 return true; } }Tomcat并不识别所编写的Controller程序,但是它识别Servlet程序,所以在Spring的Web环境中提供了一个非常核心的Servlet:DispatcherServlet(前端控制器),所有请求都会先进行到DispatcherServlet,再将请求转给Controller。当我们定义了拦截器后,会在执行Controller的方法之前,请求被拦截器拦截住。执行preHandle()方法,这个方法执行完成后需要返回一个布尔类型的值,如果返回true,就表示放行本次操作,才会继续访问controller中的方法;如果返回false,则不会放行。让拦截器生效(配置拦截器的拦截路径)
必须用@Resource,不能用@Autowired
@Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { // 登录拦截器 registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( "/shop/**", "/voucher/**", "/shop-type/**", "/upload/**", "/blog/hot", "/user/code", "/user/login" ).order(1); // token刷新的拦截器 registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0); } }
1.4 隐藏用户敏感信息
我们通过浏览器观察到此时用户的全部信息都在,这样极为不靠谱,所以我们应当在返回用户信息之前,将用户的敏感信息进行隐藏,采用的核心思路就是书写一个UserDto对象,这个UserDto对象就没有敏感信息了,我们在返回前,将有用户敏感信息的User对象转化成没有敏感信息的UserDto对象,那么就能够避免这个尴尬的问题了
在登录方法处修改
//7.保存用户信息到session中 session.setAttribute("user", BeanUtils.copyProperties(user,UserDTO.class));在拦截器处
//5.存在,保存用户信息到Threadlocal UserHolder.saveUser((UserDTO) user);在UserHolder处:将user对象换成UserDTO
public class UserHolder { private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>(); public static void saveUser(UserDTO user){ tl.set(user); } public static UserDTO getUser(){ return tl.get(); } public static void removeUser(){ tl.remove(); } }1.5 session共享问题
核心思路分析:
每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题,我们能如何解决这个问题呢?早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
但是这种方案具有两个大问题
1、每台服务器中都有完整的一份session数据,服务器压力过大。
2、session拷贝数据时,可能会出现延迟
所以咱们后来采用的方案都是基于redis来完成,我们把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了

1.6 Redis代替session的流程
1、设计key的结构
首先我们要思考一下利用redis来存储数据,那么到底使用哪种结构呢?由于存入的数据比较简单,我们可以考虑使用String,或者是使用哈希,如下图,如果使用String,同学们注意他的value,会多占用一点空间,如果使用哈希,则他的value中只会存储他数据本身,如果不是特别在意内存,其实使用String就可以的。

2、设计key的具体细节
所以我们可以使用String结构,就是一个简单的key,value键值对的方式,但是关于key的处理,session他是每个用户都有自己的session,但是redis的key是共享的,咱们就不能使用code了
在设计这个key的时候,我们之前讲过需要满足两点:
1、key要具有唯一性
2、key要方便携带
如果我们采用phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到redis中并且从页面中带过来毕竟不太合适,所以我们在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
3、整体访问流程
当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到redis,并且生成token作为redis的key,当我们校验用户是否登录时,会去携带着token进行访问,从redis中取出token对应的value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到threadLocal中,并且放行。

由于舍去了session技术,失去了登陆凭证,故须自己生成token当作登陆凭证,返回给前端

1.7 基于Redis实现短信登录
UserServiceImpl代码
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { // 1.校验手机号 String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { // 2.如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } // 3.从redis获取验证码并校验 String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); String code = loginForm.getCode(); if (cacheCode == null || !cacheCode.equals(code)) { // 不一致,报错 return Result.fail("验证码错误"); } // 4.一致,根据手机号查询用户 select * from tb_user where phone = ? User user = query().eq("phone", phone).one(); // 5.判断用户是否存在 if (user == null) { // 6.不存在,创建新用户并保存 user = createUserWithPhone(phone); } // 7.保存用户信息到 redis中 // 7.1.随机生成token,作为登录令牌 String token = UUID.randomUUID().toString(true); // 7.2.将User对象转为HashMap存储 UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create() .setIgnoreNullValue(true) .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 7.3.存储 String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); // 7.4.设置token有效期 stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES); // 8.返回token return Result.ok(token); }1.8 解决状态登录刷新问题
1.8.1 初始方案思路总结:
在这个方案中,他确实可以使用对应路径的拦截,同时刷新登录token令牌的存活时间,但是现在这个拦截器他只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行,所以这个方案他是存在问题的

1.8.2 优化方案
既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能。

1.8.3 代码
RefreshTokenInterceptor

由于拦截器不是Spring容器中的bean对象,故而不可采用@autowired注解依赖注入
public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.获取请求头中的token String token = request.getHeader("authorization"); if (StrUtil.isBlank(token)) { return true; } // 2.基于TOKEN获取redis中的用户 String key = LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key); // 3.判断用户是否存在 if (userMap.isEmpty()) { return true; } // 5.将查询到的hash数据转为UserDTO UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); // 6.存在,保存用户信息到 ThreadLocal UserHolder.saveUser(userDTO); // 7.刷新token有效期 stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES); // 8.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { // 移除用户 UserHolder.removeUser(); } } 小贴士:BeanUtil可实现Bean和Map之间的相互转换(beanToMap,fillBeanWithMap)
LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.判断是否需要拦截(ThreadLocal中是否有用户) if (UserHolder.getUser() == null) { // 没有,需要拦截,设置状态码 response.setStatus(401); // 拦截 return false; } // 有用户,则放行 return true; } }2. 商户查询缓存
2.1 什么是缓存
缓存:数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码
缓存的主要优点:速度快,好用
缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力(实际开发过程中,企业会大量运用到缓存技术)

2.2 添加商户缓存
在我们查询商户信息时,我们是直接操作从数据库中去进行查询的,大致逻辑是这样,直接查询数据库那肯定慢咯,所以我们需要增加缓存
2.2.1 、缓存模型和思路
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。

2.1.2、代码实现
代码思路:如果缓存有,则直接返回,如果缓存不存在,则查询数据库,然后存入redis。
1. ShopController.java
/** * 根据id查询商铺信息 * @param id 商铺id * @return 商铺详情数据 */ @GetMapping("/{id}") public Result queryShopById(@PathVariable("id") Long id) { return shopService.queryById(id); }2. IShopService.java
IService接口层,继承MP
Result queryById(Long id);3. ShopTypeServiceImpl.java
使用opsForValue实现
@Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById(Long id) { //1.从redis查询商铺缓存 String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { //存在,返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } //不存在,根据id查询你数据库 Shop shop=getById(id); if(shop==null){ //不存在,返回错误信息 return Result.fail("店铺不存在"); } //然后再写入redis stringRedisTemplate.opsForValue().set(key,shop.toString()); //返回 return Result.ok(shop); } } 实战篇-作业-店铺类型缓存-List实现
1. ShopTypeController.java
Controller层
@RestController @RequestMapping("/shop-type") public class ShopTypeController { @Resource private IShopTypeService typeService; @GetMapping("list") public Result queryTypeList() { return typeService.queryTypeList(); } } 2. IShopTypeService.java
IService接口层,继承MP
public interface IShopTypeService extends IService<ShopType> { Result queryTypeList(); } 3. ShopTypeServiceImpl.java
使用opsForList实现
@Service public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService { @Autowired private StringRedisTemplate stringRedisTemplate; @Override public Result queryTypeList() { // opsForList写法 // public static final String CACHE_SHOP_TYPE_KEY = "cache:shopType:"; String key = CACHE_SHOP_TYPE_KEY; // 1. 从Redis查询 商铺类型缓存 , end:-1 表示取全部数据 List<String> shopTypeJson = stringRedisTemplate.opsForList().range(key, 0, -1); // 2. 有就直接返回 if (CollectionUtil.isNotEmpty(shopTypeJson)) { // JSON字符串转对象 排序后返回 List<ShopType> shopTypes = JSONUtil.toList(shopTypeJson.toString(), ShopType.class); Collections.sort(shopTypes, ((o1, o2) -> o1.getSort() - o2.getSort())); return Result.ok(shopTypes); } // 3. 没有就向数据库查询 MP的query()拿来用 List<ShopType> shopTypes = query().orderByAsc("sort").list(); // 4. 不存在,返回错误 if (CollectionUtil.isEmpty(shopTypes)){ return Result.fail("商铺类型不存在..."); } // 5. 存在, 写入Redis,这里使用Redis的List类型,String类型,就是直接所有都写在一起,对内存开销比较大。 // 要将List中的每个元素(元素类型ShopType) ,每个元素都要单独转成JSON,使用stream流的map映射 // Hutools里的 BeanUtil.copyToList 本来想模仿UserService中的写法, // 传入一个CopyOptions的,但是setFieldValueEditor貌似只对beanToMap有效 // 改用流的形式转换每个list元素 List<String> shopTypesJson = shopTypes.stream() .map(shopType -> JSONUtil.toJsonStr(shopType)) .collect(Collectors.toList()); // 因为从数据库读出来的时候已经是按照顺序读出来的,这里想要维持顺序必须从右边push,类似队列 stringRedisTemplate.opsForList().rightPushAll(key, shopTypesJson); // 5. 返回 return Result.ok(shopTypes); } } 2.3 缓存更新策略
缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行更新,或者把他叫为淘汰更合适。
内存淘汰:redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
超时剔除:当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

2.3.1 数据库缓存不一致解决方案:
由于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是:
用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等;怎么解决呢?有如下几种方案
| 方案 | 解释 |
| Cache Aside Pattern | 缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案 |
| Read/Write Through Pattern | 由系统本身完成,数据库与缓存的问题交由系统本身去处理 |
| Write Behind Caching Pattern | 调用者只操作缓存,其他线程去异步处理数据库,实现最终一致 |

2.3.2 、数据库和缓存不一致采用什么方案
综合考虑使用方案一,但是方案一调用者如何处理呢?这里有几个问题
操作缓存和数据库时有三个问题需要考虑:

如果采用第一个方案,那么假设我们每次操作数据库后,都操作缓存,但是中间如果没有人查询,那么这个更新动作实际上只有最后一次生效,中间的更新动作意义并不大,我们可以把缓存删除,等待再次查询时,将缓存中的数据加载出来
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
AI解答:更新缓存:这种方式是指在每次更新数据库的同时,也更新缓存中的数据。虽然可以保证缓存数据的实时性,但在高并发场景下,可能会导致大量的无效写操作。例如,在无用户访问时,多个请求几乎同时对同一个数据进行更新,可能会造成缓存更新的混乱和资源浪费。删除缓存:这种方式是指在更新数据库时,不立即更新缓存,而是将缓存中的对应数据删除或标记为失效。当下次有请求访问该数据时,发现缓存失效,再从数据库中读取最新数据并更新缓存。这种方式可以避免频繁的缓存更新操作,减少资源消耗,同时也保证了数据的一致性。在实际应用中,尤其是在面对高并发和大数据量的场景时,这种方式更为推荐,因为它能够更好地平衡缓存的时效性和系统性能。
(这种方式减少了在无用户访问时,对缓存进行的无效操作)
- 如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
应该具体操作缓存还是操作数据库,我们应当是先操作数据库,再删除缓存,原因在于,如果你选择第一种方案,在两个线程并发来访问时,假设线程1先来,他先把缓存删了,此时线程2过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程1再执行更新动作时,实际上写入的就是旧的数据,新的数据被旧数据覆盖了。
- 先操作缓存还是先操作数据库?
- 先删除缓存,再操作数据库(发生错误的概率很高)
- 先操作数据库,再删除缓存(发生错误的概率极低)

2.4 实现数据双写一致

修改重点代码1:修改ShopServiceImpl的queryById方法
设置redis缓存时添加过期时间
修改的地方非常少,故而提供修改截图

修改重点代码2
代码分析:通过之前的淘汰,我们确定了采用删除策略,来解决双写问题,当我们修改了数据之后,然后把缓存中的数据进行删除,查询时发现缓存中没有数据,则会从mysql中加载最新的数据,从而避免数据库和缓存不一致的问题
修改的地方非常少,故而提供修改截图


2.5 缓存穿透问题的解决思路
缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
| 解决方案 | 缓存空对象 | 布隆过滤 |
| 优点 | 实现简单,维护方便 | 内存占用较少,没有多余key |
| 缺点 | 额外的内存消耗;可能造成短期的不一致 | 实现复杂;存在误判可能 |
缓存空对象思路分析:当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了。
布隆过滤:布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中。假设布隆过滤器判断这个数据不存在,则直接返回。
这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突。

2.6 编码解决商品查询的缓存穿透问题
核心思路如下:
在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的
现在的逻辑中:如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。

修改的地方非常少,故而提供修改截图

2.7 缓存雪崩问题及解决思路
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存

2.8 缓存击穿问题及解决思路
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

逻辑分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
重建缓存触发场景缓存过期 :缓存数据设置了过期时间,当过期时间到达后,缓存失效,需要重建。比如在 Redis 中,设置了某个 key 的过期时间为 1 小时,1 小时后该 key 对应的缓存数据就会失效,需要重新从数据库获取数据并写入缓存。
方案一 互斥锁
使用互斥锁,确保同一时间只有一个线程重建缓存
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。
假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

方案二 逻辑过期
采用逻辑过期的方式,在缓存中存储数据时附加一个过期时间,当查询到缓存过期时,先返回旧数据,同时异步重建缓存
方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。
我们把过期时间设置在 redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1去获得互斥锁,那么其他线程会进行阻塞,获得了锁的线程他会开启一个 线程去进行 以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁, 而线程1直接进行返回,假设现在线程3过来访问,由于线程线程2持有着锁,所以线程3无法获得锁,线程3也直接返回数据,只有等到新开的线程2把重建数据构建完后,其他线程才能走返回正确的数据。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。

两者对比:
互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响
逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦

2.9 利用互斥锁解决缓存击穿问题
核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

操作锁的代码:
核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。
在ShopServiceImpl里添加:
private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); }定义一个缓存穿透方法,将queryId中代码剪切到该方法中,再做适当修改:

代码如下:
//缓存穿透 public Shop queryWithPassThrough(Long id){ //1.从redis查询商铺缓存 String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { //存在,返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } //判断命中是否为空值 if(shopJson!=null){ return null; } //不存在,根据id查询你数据库 Shop shop = getById(id); if (shop == null) { //将空值写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); //不存在,返回错误信息 return null; } //然后再写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); //返回 return shop; }复制一份该方法,命名为互斥锁(queryWithMutes),进行修改:
//互斥锁 public Shop queryWithMutex(Long id) { //1.从redis查询商铺缓存 String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { //存在,返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } //判断命中是否为空值 if (shopJson != null) { return null; } //实现缓存重建 //获取互斥锁 String lockKey = RedisConstants.LOCK_SHOP_KEY + id; Shop shop = null; try { boolean islock = trylock(lockKey); //失败,则休眠并重试 if (!islock) { Thread.sleep(50); return queryWithMutex(id); } //不存在,根据id查询你数据库 shop = getById(id); if (shop == null) { //将空值写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); //不存在,返回错误信息 return null; } //然后再写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { //释放互斥锁 unlock(lockKey); } //返回 return shop; }2.10 逻辑过期解决缓存击穿问题
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
思路分析:当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
如果封装数据:因为现在redis中存储的数据的value需要带上过期时间,此时要么你去修改原来的实体类,要么你新建一个实体类
步骤一、
我们采用第二个方案,这个方案,对原来代码没有侵入性。
@Data public class RedisData { private LocalDateTime expireTime; private Object data; }步骤二、
在ShopServiceImpl 新增此方法

public void saveShop2Redis(Long id,Long expiredSeconds){ //1.查询店铺数据 Shop shop=getById(id); //2.封装逻辑过期时间 RedisData redisData = new RedisData(); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expiredSeconds)); //3.写入redis stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData)); }步骤三:正式代码
ShopServiceImpl
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); public Shop queryWithLogicalExpire( Long id ) { String key = CACHE_SHOP_KEY + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isBlank(json)) { // 3.存在,直接返回 return null; } // 4.命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(json, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); // 5.判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { // 5.1.未过期,直接返回店铺信息 return shop; } // 5.2.已过期,需要缓存重建 // 6.缓存重建 // 6.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2.判断是否获取锁成功 if (isLock){ CACHE_REBUILD_EXECUTOR.submit( ()->{ try{ //重建缓存 this.saveShop2Redis(id,20L); }catch (Exception e){ throw new RuntimeException(e); }finally { unlock(lockKey); } }); } // 6.4.返回过期的商铺信息 return shop; }3. 优惠券秒杀
3.1 全局唯一ID
每个店铺都可以发布优惠券:

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

成部分:符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
3.2 Redis实现全局唯一ID
@Component public class RedisIdWorker { /** * 开始时间戳 */ private static final long BEGIN_TIMESTAMP = 1640995200L; /** * 序列号的位数 */ private static final int COUNT_BITS = 32; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public long nextId(String keyPrefix) { // 1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 2.生成序列号 // 2.1.获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 2.2.自增长 long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 3.拼接并返回 return timestamp << COUNT_BITS | count; } }测试类
知识小贴士:countdownlatch
countdownlatch:名为信号枪,主要的作用是同步协调在多线程的等待于唤醒问题
我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch
两个最重要的方法:
1、countDown
2、await
await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。
@Test void testIdWorker() throws InterruptedException { CountDownLatch latch = new CountDownLatch(300); Runnable task = () -> { for (int i = 0; i < 100; i++) { long id = redisIdWorker.nextId("order"); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }@Test导的是 import org.junit.jupiter.api.Test包
在执行测试类时需要打开虚拟机,在虚拟机上关闭防火墙,然后启动redis才可运行
systemctl stop firewalld cd /usr/local/src/redis-6.2.6 redis-server redis.conf3.3 添加优惠券
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
平价卷由于优惠力度并不是很大,所以是可以任意领取
而代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段
新增普通卷代码: VoucherController
@PostMapping public Result addVoucher(@RequestBody Voucher voucher) { voucherService.save(voucher); return Result.ok(voucher.getId()); }新增秒杀卷代码:
VoucherController
@PostMapping("seckill") public Result addSeckillVoucher(@RequestBody Voucher voucher) { voucherService.addSeckillVoucher(voucher); return Result.ok(voucher.getId()); }VoucherServiceImpl
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 保存秒杀库存到Redis中 stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }3.4 实现秒杀下单

下单核心思路:当我们点击抢购时,会触发右侧的请求,我们只需要编写对应的controller即可

秒杀下单应该思考的内容:
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。

VoucherOrderServiceImpl
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } //5,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); } //6.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 6.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 6.2.用户id Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); // 6.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }3.5 库存超卖问题分析
有关超卖问题分析:在我们原有代码中是这么写的
if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } //5,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); }假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

课程中使用的方法:
CAS法:
CAS法的核心思想是:在更新一个变量的值时,先比较当前值是否与预期值一致,如果一致则更新为新值,否则更新失败。
基本原理
CAS 操作需要三个参数:要更新的变量的内存地址、预期的旧值和要更新的新值。它执行以下步骤:比较旧值:将变量的当前值与预期的旧值进行比较。更新新值:如果当前值与预期的旧值一致,则将变量的值更新为新值。返回结果:如果更新成功,则返回true;否则,返回false。
拓展:
版本号法:
乐观锁中的版本号法是一种通过版本号来实现并发控制的方法。它的核心思想是在数据记录中增加一个版本号字段,每次更新数据时,都会检查版本号是否符合预期,如果符合则更新数据并增加版本号,否则认为发生了冲突,更新失败。
基本原理当读取数据时,会将数据中的版本号一同读取出来。当更新数据时,会带上读取到的版本号,只有当数据库中该条数据的版本号与读取到的版本号一致时,才执行更新操作,并将版本号增加 1。如果在更新数据时,发现数据库中该条数据的版本号与读取到的版本号不一致,则认为发生了冲突,更新操作失败。
两者区别:
版本号法:适用于业务层的复杂数据更新,依赖数据库或缓存中的版本号字段,适合读多写少的场景。CAS 法:适用于底层线程安全的简单变量更新,性能高,适合高并发场景,但需要解决 ABA 问题。(当一个变量的值在 CAS 操作过程中从 A 变为 B,然后又变回 A 时,CAS 操作会认为该变量的值没有发生变化,从而导致错误的更新)AtomicStampedReference :在 Java 中,AtomicStampedReference 通过引入一个版本号(时间戳)来解决 ABA 问题。每次更新变量时,不仅更新值,还更新版本号。

3.6 乐观锁解决超卖问题
方案一
优化一:VoucherOrderServiceImpl 在扣减库存时,改为:


以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败(原子性:同时成功或失败)
方案二
优化二:之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可
在方案二中,即使多个线程同时通过了库存大于 0 的检查,但由于数据库更新操作的原子性,只有第一个线程能够成功扣减库存。其他线程在尝试扣减库存时,会发现库存不足并返回失败,从而避免了超卖问题

知识小扩展:
针对cas中的自旋压力过大,我们可以使用Longaddr这个类去解决
Java8 提供的一个对AtomicLong改进后的一个类,LongAdder
大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好
所以利用这么一个类,LongAdder来进行优化
如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值

3.7 实现一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
现在的问题在于:
优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单
具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

VoucherOrderServiceImpl
初步代码:增加一人一单逻辑
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } // 5.一人一单逻辑 // 5.1.用户id Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } //6,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); } //7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }1. 并发请求
即使是一个用户,浏览器或客户端可能会因为网络延迟、页面刷新等原因发送多个请求。例如:用户点击一次按钮,但由于网络延迟或页面未及时响应,用户可能再次点击。浏览器在某些情况下可能会自动重发请求。
2. 网络延迟
在网络延迟较高的情况下,多个请求可能会几乎同时到达服务器。如果没有有效的并发控制机制,可能会导致多个请求都通过条件判断并执行下单操作。
3. 缓存问题
如果系统使用了缓存来存储用户的下单状态,但缓存未及时更新或失效,可能会导致多个请求都认为用户未下单,从而都执行了下单操作。
存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作
注意:,我们的初始方案是封装createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁(优化一)
@Transactional public synchronized Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 return Result.fail("库存不足!"); } // 7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 7.2.用户id voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 7.返回订单id return Result.ok(orderId); }什么是锁的粒度?
锁的粒度是指锁作用的范围或对象的大小。通常来说,锁的粒度可以分为以下几种:粗粒度锁:锁的范围较大,可能会锁住整个资源或对象。例如,对整个数据库表加锁,或者对整个对象加锁。粗粒度锁虽然简单,但在高并发场景下容易导致性能瓶颈,因为它限制了其他线程的访问。细粒度锁:锁的范围较小,只锁住特定的部分或特定的资源。例如,对数据库表中的某一行加锁,或者对对象中的某个特定字段加锁。细粒度锁可以提高并发性能,因为它允许更多的线程同时访问不同的部分。
修改代码:(优化二)
@Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()){ // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 return Result.fail("库存不足!"); } // 7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 7.2.用户id voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 7.返回订单id return Result.ok(orderId); } }但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:
(优化三)在seckillVoucher 中,添加以下代码,就能保证事务的特性,同时也控制了锁的粒度

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务
思考:
由于我们使用的是方法调用方法(锁的),而在相同类里方法调用方法使用的是this关键字,this代表当前类的对象(不是Spring的代理对象),而我们的事务生效是因为Spring对当前类实现了动态代理,是拿到了它的动态代理对象进行的事务管理,而现在的this调用是非代理对象不拥有事务功能(Spring事务失效的可能性之一),因此事务管理将会失效
解决:
既然是没有代理对象来调用方法,那么我们就使用代理对象来调用方法
(优化四)

最终代码
VoucherOrderServiceImpl
package com.hmdp.service.impl; import com.hmdp.dto.Result; import com.hmdp.entity.SeckillVoucher; import com.hmdp.entity.VoucherOrder; import com.hmdp.mapper.VoucherOrderMapper; import com.hmdp.service.ISeckillVoucherService; import com.hmdp.service.IVoucherOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisIdWorker; import com.hmdp.utils.UserHolder; import org.springframework.aop.framework.AopContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; /** * <p> * 服务实现类 * </p> * * @author chan * @since 2025-5-2 */ @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override public Result seckillVoucher(Long voucherId) { //1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 if(voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!"); } //3.判断秒杀是否结束 if(voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } //4.判断库存是否充足 if (voucher.getStock()<1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 return Result.fail("库存不足!"); } // 7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 7.2.用户id voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 7.返回订单id return Result.ok(orderId); } } pom.xml
<dependency> <groupId>aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.5.4</version> </dependency> <!--可加可不加--> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> </dependency> 启动类加注解
@EnableAspectJAutoProxy(exposeProxy = true)3.8 集群环境下的并发问题
有关锁失效原因分析
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

4.分布式锁
4.1 基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

那么分布式锁应该满足一些什么样的条件呢?
可见性:多个线程都能看到相同的结果
这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要有较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环

分布式锁:
1. 基于 MySQL 的分布式锁
实现原理互斥机制:利用 MySQL 的GET_LOCK和RELEASE_LOCK函数来实现互斥锁。GET_LOCK试图获取一个锁,如果获取成功则返回 1,否则返回 0。RELEASE_LOCK用于释放锁。
特点高可用性:依赖于 MySQL 数据库的高可用性,通常被认为是好的。高性能:由于数据库操作的开销相对较高,其性能一般。安全性:当客户端断开连接时,锁会自动释放,这提供了一定的安全性。
2. 基于 Redis 的分布式锁
实现原理互斥机制:利用 Redis 的SETNX命令来实现互斥锁。SETNX命令只有在键不存在时才设置键值,从而实现锁的获取。同时,可以使用EXPIRE命令为锁设置一个超时时间,以防止死锁。
特点高可用性:Redis 本身具有高可用性,通常被认为是好的。高性能:Redis 是内存数据库,操作速度快,性能好。安全性:通过设置锁的超时时间,可以自动释放锁,防止死锁。
3. 基于 Zookeeper 的分布式锁
实现原理互斥机制:Zookeeper 提供了分布式协调功能,可以利用其临时顺序节点来实现分布式锁。客户端在获取锁时创建一个临时顺序节点,只有当该节点是最小顺序节点时才认为获取锁成功。释放锁时删除该节点。
特点高可用性:Zookeeper 本身具有高可用性,通常被认为是好的。高性能:Zookeeper 的性能一般,因为它需要进行网络通信和协调多个节点。安全性:Zookeeper 的临时节点机制可以确保在客户端断开连接时自动释放锁。

4.2 Redis分布式锁的实现思路
实现分布式锁时需要实现的两个基本方法:
获取锁:互斥:确保只能有一个线程获取锁非阻塞:尝试一次,成功返回true,失败返回false释放锁:手动释放超时释放:获取锁时添加一个超时时间
核心思路:
我们利用redis 的setnx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的,等待一定时间后重试即可

4.3 实现分布式锁初级版本
锁的基本接口

SimpleRedisLock(优化五)
利用setnx方法进行加锁,同时增加过期时间,防止死锁,可保证加锁和增加过期时间具有原子性
private static final String KEY_PREFIX="lock:" @Override public boolean tryLock(long timeoutSec) { // 获取线程标示 String threadId = Thread.currentThread().getId() // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); }String threadId = ID_PREFIX + Thread.currentThread().getId();唯一标识:为每个线程生成一个唯一标识符,方便在日志、调试或分布式锁等场景中区分不同的线程。自动拆箱它是Java中的一种类型转换机制,指将包装类对象(如Integer、Boolean等)自动转换为对应的基本数据类型(如int、boolean等)的过程。与之相对的,将基本数据类型转换为包装类对象的过程称为自动装箱。自动拆箱时,如果包装类对象的值为null,则会引发空指针异常Boolean.TRUE.equals(isLock)
释放锁
public void unlock() { //通过del删除锁 stringRedisTemplate.delete(KEY_PREFIX + name); }修改业务代码
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象(新增代码) SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); //获取锁对象 boolean isLock = lock.tryLock(1200); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }4.4 Redis分布式锁误删情况说明
问题:
当线程1获取了锁后,发生业务阻塞,可能会导致业务尚未完成,锁超时释放。此时,线程2可能有机会获取这把锁并开始执行。如果原先阻塞的线程1恢复执行并试图删除锁,可能会误删线程2持有的锁,导致锁的状态混乱。
解决方案:
如果发生业务阻塞,可能会导致业务尚未完成,锁超时释放,此时其他线程可能会成功获取这把锁。当阻塞的线程1恢复执行时,它可能会尝试删除锁。如果不加判断,线程1可能会误删线程2持有的锁。为避免这种情况,可以在删除锁之前,检查锁是否仍然属于自己。例如,线程1在恢复后检查锁的归属,发现锁已不属于它,于是不执行删除操作。而线程2在执行完毕后,确认锁属于自己,再进行删除。这样可以防止误删他人锁的情况发生。

4.5 解决Redis分布式锁误删问题
需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
解决方案:


代码如下:(优化六)
import cn.hutool.core.lang.UUID; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{ private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock(Long time) { //1.设置key String key = KEY_PREFIX + name; //2.存入Redis,返回 //获取当前线程id String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, time, TimeUnit.SECONDS); return Boolean.TRUE.equals(isLock); } @Override public void unLock() { //1.设置key String key = KEY_PREFIX + name; //2.获取标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); //3.获取Redis中的标识 String id = stringRedisTemplate.opsForValue().get(key); if(threadId.equals(id)){ //标识相同,释放锁 //4.删除 stringRedisTemplate.delete(key); } } }此处的UUID是用来区别不同的JVM的,而ThreaId是用来区别同一个JVM中的不同线程的
4.6 分布式锁的原子性问题
更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的。

锁名称一样,但是锁的线程标识不一样。线程1判断后发现锁的线程标识和当前线程一样,于是根据锁名释放锁,但是业务阻塞,导致自己的锁超时释放,此时线程二开始执行,线程2拿到同样名称的锁,开始执行业务,此时线程1的阻塞解决后,立刻根据锁名称把线程2的锁误删了(可以删的原因是因为已经判断过判断一致后,未能来的及释放锁,就遭遇了阻塞,所以在阻塞立刻解决,就会滞后地执行释放锁的行为)
4.7 Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:Lua 教程 | 菜鸟教程,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。
这里重点介绍Redis提供的调用函数,语法如下:

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

用Lua脚本就会简化很多:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示 -- 获取锁中的标示,判断是否与当前线程标示一致 if (redis.call('GET', KEYS[1]) == ARGV[1]) then -- 一致,则删除锁 return redis.call('DEL', KEYS[1]) end -- 不一致,则直接返回 return 04.8 调用Lua脚本改造分布式锁
lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。
我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图

Java代码(优化七)
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } public void unlock() { // 调用lua脚本 stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } 经过以上代码改造后,我们就能够实现 拿锁比锁删锁的原子性动作了~小总结:
基于Redis的分布式锁实现思路:利用set nx ex获取锁,并设置过期时间,保存线程标示释放锁时先判断线程标示是否与自己一致,一致则删除锁特性:利用set nx满足互斥性利用set ex保证故障时锁依然能释放,避免死锁,提高安全性利用Redis集群保证高可用和高并发特性
老师总结:
我们利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题。
但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission
测试逻辑:
第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。
5.分布式锁-redission
5.1 基于setnx的问题
基于setnx实现的分布式锁存在下面的问题:
- 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
- 不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
- 超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
- 主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

5.2 Redission介绍

5.3 Redission入门
引入依赖:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>配置Redisson客户端:
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.150.101:6379") .setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); } } 如何使用Redission的分布式锁
@Resource private RedissionClient redissonClient; @Test void testRedisson() throws Exception{ //获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS); //判断获取锁成功 if(isLock){ try{ System.out.println("执行业务"); }finally{ //释放锁 lock.unlock(); } } }在 VoucherOrderServiceImpl注入RedissonClient(优化八)
@Resource private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁 //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); RLock lock = redissonClient.getLock("lock:order:" + userId); //获取锁对象 boolean isLock = lock.tryLock(); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }注意:下面皆为Redisson原理讲解部分
5.4 redission可重入锁原理
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
在redission中,我们的也支持支持可重入锁
在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式
这个地方一共有3个参数
KEYS[1] : 锁名称
ARGV[1]: 锁失效时间
ARGV[2]: id + ":" + threadId; 锁的小key
exists: 判断数据是否存在 name:是lock是否存在,如果==0,就表示当前这把锁不存在
redis.call('hset', KEYS[1], ARGV[2], 1);此时他就开始往redis里边去写数据 ,写成一个hash结构
Lock{id + ":" + threadId : 1}如果当前这把锁存在,则第一个条件不满足,再判断
redis.call('hexists', KEYS[1], ARGV[2]) == 1此时需要通过大key+小key判断当前这把锁是否是属于自己的,如果是自己的,则进行
redis.call('hincrby', KEYS[1], ARGV[2], 1)将当前这个锁的value进行+1 ,redis.call('pexpire', KEYS[1], ARGV[1]); 然后再对其设置过期时间,如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回pttl,即为当前这把锁的失效时间
如果小伙帮们看了前边的源码, 你会发现他会去判断当前这个方法的返回值是否为null,如果是null,则对应则前两个if对应的条件,退出抢锁逻辑,如果返回的不是null,即走了第三个分支,在源码处会进行while(true)的自旋抢锁。
"if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"
5.5 redission锁重试和WatchDog机制
说明:由于课程中已经说明了有关tryLock的源码解析以及其看门狗原理,所以笔者在这里给大家分析lock()方法的源码解析,希望大家在学习过程中,能够掌握更多的知识
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同
1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null
2、判断当前这把锁是否是属于当前线程,如果是,则返回null
所以如果返回是null,则代表着当前这哥们已经抢锁完毕,或者可重入完毕,但是如果以上两个条件都不满足,则进入到第三个条件,返回的是锁的失效时间,同学们可以自行往下翻一点点,你能发现有个while( true) 再次进行tryAcquire进行抢锁
long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; }接下来会有一个条件分支,因为lock方法有重载方法,一个是带参数,一个是不带参数,如果带带参数传入的值是-1,如果传入参数,则leaseTime是他本身,所以如果传入了参数,此时leaseTime != -1 则会进去抢锁,抢锁的逻辑就是之前说的那三个逻辑
if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
ttlRemainingFuture.onComplete((ttlRemaining, e) 这句话相当于对以上抢锁进行了监听,也就是说当上边抢锁完毕后,此方法会被调用,具体调用的逻辑就是去后台开启一个线程,进行续约逻辑,也就是看门狗线程
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture;此逻辑就是续约逻辑,注意看commandExecutor.getConnectionManager().newTimeout() 此方法
Method( new TimerTask() {},参数2 ,参数3 )
指的是:通过参数2,参数3 去描述什么时候去做参数1的事情,现在的情况是:10s之后去做参数一的事情
因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约
那么大家可以想一想,假设我们的线程出现了宕机他还会续约吗?当然不会,因为没有人再去调用renewExpiration这个方法,所以等到时间之后自然就释放了。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }5.6 redission锁的MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明
当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.

6. 秒杀优化
6.1 异步秒杀思路
我们来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤:
- 查询优惠卷
- 判断秒杀库存是否足够
- 查询订单
- 校验是否是一人一单
- 扣减库存
- 创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?

下单时,调用 Lua 脚本在 Redis 原子性判断库存是否充足(value>0)且用户能否下单(Set 集合无对应记录),若均满足则存 userId 和优惠卷并返回 0。若返回结果为 0,将信息存入队列,通过异步线程下单,前端依返回订单 id 判断下单是否成功。

6.2 Redis完成秒杀资格判断
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

VoucherServiceImpl
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 保存秒杀库存到Redis中 //SECKILL_STOCK_KEY 这个变量定义在RedisConstans中 //private static final String SECKILL_STOCK_KEY ="seckill:stock:" stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }完整lua表达式
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0还需要判断在查看一人一单是否要加锁吗?
在判断库存充足和用户能否下单时,使用 Lua 脚本可以确保操作的原子性,因此无需额外加锁。Lua 脚本在 Redis 中以原子方式执行,避免了因并发访问导致的一致性问题,确保库存判断和用户下单判断的准确性。
在 Redis 中,原子性操作意味着整个脚本的执行是不可分割的,其他客户端无法在脚本执行中途插入自己的操作。通过 Lua 脚本实现原子性,可以有效地保证库存的准确性以及一人一单的公平性。
在下单整个流程中,一旦 Lua 脚本判断库存充足且用户可以下单,会将订单信息存入队列,并返回成功的信号,再由异步线程处理后续的下单操作。前端根据返回的订单 ID 判断下单是否成功,从而为用户提供人生动的反馈。
如果库存不足,Lua 脚本会如何处理?
在使用 Lua 脚本判断库存和下单逻辑时,如果库存不足,Lua 脚本会直接结束操作并返回相应的结果,例如返回一个表示库存不足的特定值(如 1 或其他标识符)。前端或后端服务根据返回值判断下单是否成功。
VoucherOrderServiceImpl
@Override public Result seckillVoucher(Long voucherId) { //获取用户 Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } //TODO 保存阻塞队列 // 3.返回订单id return Result.ok(orderId); }6.3 基于阻塞队列实现秒杀优化
VoucherOrderServiceImpl
现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑存到队列中,然后异步执行。
package com.hmdp.service.impl; import com.hmdp.dto.Result; import com.hmdp.entity.VoucherOrder; import com.hmdp.mapper.VoucherOrderMapper; import com.hmdp.service.ISeckillVoucherService; import com.hmdp.service.IVoucherOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisIdWorker; import com.hmdp.utils.UserHolder; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.aop.framework.AopContext; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Resource private StringRedisTemplate stringRedisTemplate; private IVoucherOrderService proxy; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //异步处理线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } // 用于线程池处理的任务 // 当初始化完毕后,就会去从对列中去拿信息 private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 2.创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常", e); } } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { //1.获取用户 Long userId = voucherOrder.getUserId(); // 2.创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 3.尝试获取锁 boolean isLock = redisLock.tryLock(); // 4.判断是否获得锁成功 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效 proxy.createVoucherOrder(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } VoucherOrder voucherOrder = new VoucherOrder(); // 2.3.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4.用户id voucherOrder.setUserId(userId); // 2.5.代金券id voucherOrder.setVoucherId(voucherId); // 2.6.放入阻塞队列 orderTasks.add(voucherOrder); //3.获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); //4.返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); save(voucherOrder); } } 小总结:
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
为什么异步快?同步与异步的区别
同步(Synchronous)
- 定义 :同步操作是指调用方发起一个操作后,需要等待该操作完成才能继续执行后续代码。在这种模式下,任务是按顺序、阻塞式地执行的。
- 特点 :
- 阻塞式 :调用方在等待操作结果期间会被阻塞,无法进行其他操作,这可能导致资源浪费,尤其是在执行耗时操作时。
- 顺序执行 :任务按照代码的编写顺序依次执行,前一个任务不完成,后一个任务就不能开始。
异步(Asynchronous)
- 定义 :异步操作是指调用方发起一个操作后,不需要等待该操作完成,可以继续执行其他代码。操作完成后,系统会通过某种方式(如回调函数、事件、消息队列等)通知调用方。
- 特点 :
- 非阻塞式 :调用方在发起操作后可以继续执行其他任务,不会被阻塞,从而提高了资源利用率和系统的并发处理能力。
- 并发执行 :多个任务可以同时进行,不需要严格按照顺序等待前一个任务完成,这使得系统能够更高效地处理多个请求。
- 复杂性较高 :由于任务的执行顺序不确定,代码的逻辑会变得相对复杂,增加了理解和调试的难度。需要使用回调函数、Promise、async/await 等机制来处理操作完成后的结果。
| 对比维度 | 同步 | 异步 |
|---|---|---|
| 执行方式 | 阻塞式,按顺序依次执行 | 非阻塞式,可并发执行 |
| 资源利用率 | 较低,调用方等待期间资源闲置 | 较高,能充分利用系统资源 |
| 代码复杂度 | 简单直观,易于理解和调试 | 复杂,需额外处理异步逻辑 |
| 适用场景 | 实时性要求高、任务执行时间短的场景 | 耗时操作,如网络请求、文件操作等 |
| 响应速度 | 较慢,因需等待每个操作完成 | 较快,调用方无需等待操作完成即可响应用户 |
7. Redis消息队列
7.1 认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

使用队列的好处在于 解耦:
用户下单后,先用 Redis 校验下单条件,若满足,就将下单请求以消息形式发送到队列。而后启动独立线程消费队列中的消息,完成后续下单流程。这样,下单请求的接收与处理不再紧密耦合,前端无需等待整个下单流程结束即可返回响应,从而加速响应速度。
7.2 基于List实现消息队列
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

基于List的消息队列有哪些优缺点? 优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
无法避免消息丢失消息在消费过程中丢失 :在消费者从 List 中取出消息并开始处理时,如果消费者出现故障(如进程挂掉、网络中断等),而此时消息尚未被确认消费完成(如 Redis 的阻塞弹出命令如 BRPOP 不支持消息确认机制),那么这条消息就会丢失,因为 Redis 认为已经成功交付给消费者了,不会再保留这条消息。Redis 主从复制场景下的丢失风险 :在使用 Redis 主从复制架构时,如果主节点出现故障,从节点进行故障转移成为新的主节点。在这个过程中,可能存在主节点上已写入但未同步到从节点的消息丢失的情况,导致 List 中的消息不完整。
只支持单消费者并发消费问题 :当有多个消费者同时尝试从 Redis List 中消费消息时,Redis 无法像一些专业的消息队列(如 RabbitMQ 的订阅模式)那样将消息公平地分配给不同的消费者。此时,多个消费者会竞争消费同一条消息,容易出现消息被重复消费或者部分消费者长时间消费不到消息的情况,无法实现高效的并发消费,不能很好地满足需要多个消费者共同处理大量消息的场景
7.3 基于PubSub的消息队列
在 Redis 2.0 版本中引入的 Pub/Sub(发布订阅)模型,为消息传递提供了一种高效且灵活的机制。该模型允许消费者订阅一个或多个频道(channel),而生产者向指定频道发送消息时,所有订阅了该频道的消费者都能及时收到对应消息。
以下是 Pub/Sub 模型的常用操作命令:订阅频道 :通过SUBSCRIBE channel [channel ...]命令,消费者可以订阅一个或多个频道。一旦订阅完成,消费者将开始接收这些频道上的所有消息。发布消息 :生产者使用PUBLISH channel message命令,向指定的频道发送消息。消息会广播给所有订阅了该频道的消费者,从而实现一对多的消息传播。模式匹配订阅 :消费者还可以借助PSUBSCRIBE pattern [pattern ...]命令,订阅符合特定模式的频道。这样,对于符合模式的多个频道,消费者都能接收其消息,进一步增强了订阅的灵活性。

基于PubSub的消息队列有哪些优缺点? 优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
不支持数据持久化Redis 的 Pub/Sub 消息队列本身不支持消息的持久化存储。一旦 Redis 服务重启或出现故障,之前发布的消息将丢失,无法在服务恢复后重新获取和处理这些消息,这对需要保证消息可靠性的应用不太友好。无法避免消息丢失消费者在订阅频道后,若在消息发布期间出现故障(如网络中断、进程崩溃等),可能会错过部分消息。而且,如果消费者在未订阅频道时,生产者向该频道发送消息,消费者也无法获取这些消息,导致消息丢失。消息堆积有上限,超出时数据丢失由于 Redis 的 Pub/Sub 模型主要设计用于实时消息传递,它没有内置的消息堆积和存储机制。当消息生产速度远大于消费速度时,未被及时消费的消息无法在 Redis 中堆积存储,超出处理能力的消息将被丢弃,无法保证消息的完整性和可靠性。
7.4 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,代码如下:
while (true) { // 尝试读取队列中的消息,最多阻塞2秒 Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $"); if (msg == null) { continue; } // 处理消息 handleMessage(msg); }XREAD 命令的特点消息可回溯 :Redis Stream 可以保留消息的历史记录,通过指定不同的起始 ID,可以回溯读取之前的消息。例如可以指定一个较早的消息 ID 来重新消费历史消息。一个消息可以被多个消费者读取 :与传统的队列不同,在 Redis Stream 中,消息不会因为一个消费者读取而消失,其他消费者仍然可以读取同一条消息。这适用于需要多个消费者对消息进行不同处理的场景。可以阻塞读取 :使用BLOCK参数可以让读取操作在消息队列为空时阻塞,等待一段时间直到有新消息到达。这种特性可以减少轮询的频率,提高资源利用率。有消息漏读的风险 :当起始 ID 设置为$时,每次读取最新消息。如果在处理某条消息的过程中,又有超过一条以上的新消息到达队列,下一次读取时,只能获取到最新的一条消息,中间的其他消息可能被漏读。例如,假设队列中有消息 A、B、C,先读取到 A,正在处理 A 时,B 和 C 到达队列。当再次读取时,由于起始 ID 是$,会直接读取到 C,而 B 就会被漏读。
需要注意的是,Redis Stream 提供了更多的功能来应对消息漏读等问题,比如使用消费者组可以更好地管理消息的消费过程,包括消息的确认、重试等机制。
7.5 消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具备下列特点:



消费者监听消息的基本思路:
while (true) { // 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒 Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >"); if (msg == null) { // null 说明没有消息,继续下一次 continue; } try { // 处理消息,完成后一定要 ACK handleMessage(msg); } catch (Exception e) { while (true) { Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0"); if (msg == null) { // null 说明没有异常消息,所有消息都已确认,结束循环 break; } try { // 说明有异常消息,再次处理 handleMessage(msg); } catch (Exception e) { // 再次出现异常,记录日志,继续循环 continue; } } } }STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
最后我们来个小对比

7.6 消息队列实现异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
修改lua表达式,新增--3.6

VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); //处理异常消息 handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有异常消息,结束循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理pendding订单异常", e); try{ Thread.sleep(20); }catch(Exception e){ e.printStackTrace(); } } } } } 8. 秒杀优化
8.1 异步秒杀思路
我们来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤:
- 查询优惠卷
- 判断秒杀库存是否足够
- 查询订单
- 校验是否是一人一单
- 扣减库存
- 创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?

下单时,调用 Lua 脚本在 Redis 原子性判断库存是否充足(value>0)且用户能否下单(Set 集合无对应记录),若均满足则存 userId 和优惠卷并返回 0。若返回结果为 0,将信息存入队列,通过异步线程下单,前端依返回订单 id 判断下单是否成功。

8.2 Redis完成秒杀资格判断
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

VoucherServiceImpl
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 保存秒杀库存到Redis中 //SECKILL_STOCK_KEY 这个变量定义在RedisConstans中 //private static final String SECKILL_STOCK_KEY ="seckill:stock:" stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }完整lua表达式
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0还需要判断在查看一人一单是否要加锁吗?
在判断库存充足和用户能否下单时,使用 Lua 脚本可以确保操作的原子性,因此无需额外加锁。Lua 脚本在 Redis 中以原子方式执行,避免了因并发访问导致的一致性问题,确保库存判断和用户下单判断的准确性。
在 Redis 中,原子性操作意味着整个脚本的执行是不可分割的,其他客户端无法在脚本执行中途插入自己的操作。通过 Lua 脚本实现原子性,可以有效地保证库存的准确性以及一人一单的公平性。
在下单整个流程中,一旦 Lua 脚本判断库存充足且用户可以下单,会将订单信息存入队列,并返回成功的信号,再由异步线程处理后续的下单操作。前端根据返回的订单 ID 判断下单是否成功,从而为用户提供人生动的反馈。
如果库存不足,Lua 脚本会如何处理?
在使用 Lua 脚本判断库存和下单逻辑时,如果库存不足,Lua 脚本会直接结束操作并返回相应的结果,例如返回一个表示库存不足的特定值(如 1 或其他标识符)。前端或后端服务根据返回值判断下单是否成功。
VoucherOrderServiceImpl
@Override public Result seckillVoucher(Long voucherId) { //获取用户 Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } //TODO 保存阻塞队列 // 3.返回订单id return Result.ok(orderId); }8.3 基于阻塞队列实现秒杀优化
VoucherOrderServiceImpl
现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑存到队列中,然后异步执行。
package com.hmdp.service.impl; import com.hmdp.dto.Result; import com.hmdp.entity.VoucherOrder; import com.hmdp.mapper.VoucherOrderMapper; import com.hmdp.service.ISeckillVoucherService; import com.hmdp.service.IVoucherOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisIdWorker; import com.hmdp.utils.UserHolder; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.aop.framework.AopContext; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Resource private StringRedisTemplate stringRedisTemplate; private IVoucherOrderService proxy; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //异步处理线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } // 用于线程池处理的任务 // 当初始化完毕后,就会去从对列中去拿信息 private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 2.创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常", e); } } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { //1.获取用户 Long userId = voucherOrder.getUserId(); // 2.创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 3.尝试获取锁 boolean isLock = redisLock.tryLock(); // 4.判断是否获得锁成功 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效 proxy.createVoucherOrder(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } VoucherOrder voucherOrder = new VoucherOrder(); // 2.3.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4.用户id voucherOrder.setUserId(userId); // 2.5.代金券id voucherOrder.setVoucherId(voucherId); // 2.6.放入阻塞队列 orderTasks.add(voucherOrder); //3.获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); //4.返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); save(voucherOrder); } } 小总结:
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
为什么异步快?同步与异步的区别
同步(Synchronous)
- 定义 :同步操作是指调用方发起一个操作后,需要等待该操作完成才能继续执行后续代码。在这种模式下,任务是按顺序、阻塞式地执行的。
- 特点 :
- 阻塞式 :调用方在等待操作结果期间会被阻塞,无法进行其他操作,这可能导致资源浪费,尤其是在执行耗时操作时。
- 顺序执行 :任务按照代码的编写顺序依次执行,前一个任务不完成,后一个任务就不能开始。
异步(Asynchronous)
- 定义 :异步操作是指调用方发起一个操作后,不需要等待该操作完成,可以继续执行其他代码。操作完成后,系统会通过某种方式(如回调函数、事件、消息队列等)通知调用方。
- 特点 :
- 非阻塞式 :调用方在发起操作后可以继续执行其他任务,不会被阻塞,从而提高了资源利用率和系统的并发处理能力。
- 并发执行 :多个任务可以同时进行,不需要严格按照顺序等待前一个任务完成,这使得系统能够更高效地处理多个请求。
- 复杂性较高 :由于任务的执行顺序不确定,代码的逻辑会变得相对复杂,增加了理解和调试的难度。需要使用回调函数、Promise、async/await 等机制来处理操作完成后的结果。
| 对比维度 | 同步 | 异步 |
|---|---|---|
| 执行方式 | 阻塞式,按顺序依次执行 | 非阻塞式,可并发执行 |
| 资源利用率 | 较低,调用方等待期间资源闲置 | 较高,能充分利用系统资源 |
| 代码复杂度 | 简单直观,易于理解和调试 | 复杂,需额外处理异步逻辑 |
| 适用场景 | 实时性要求高、任务执行时间短的场景 | 耗时操作,如网络请求、文件操作等 |
| 响应速度 | 较慢,因需等待每个操作完成 | 较快,调用方无需等待操作完成即可响应用户 |
9. Redis消息队列
9.1 认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

使用队列的好处在于 解耦:
用户下单后,先用 Redis 校验下单条件,若满足,就将下单请求以消息形式发送到队列。而后启动独立线程消费队列中的消息,完成后续下单流程。这样,下单请求的接收与处理不再紧密耦合,前端无需等待整个下单流程结束即可返回响应,从而加速响应速度。
9.2 基于List实现消息队列
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

基于List的消息队列有哪些优缺点? 优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
无法避免消息丢失消息在消费过程中丢失 :在消费者从 List 中取出消息并开始处理时,如果消费者出现故障(如进程挂掉、网络中断等),而此时消息尚未被确认消费完成(如 Redis 的阻塞弹出命令如 BRPOP 不支持消息确认机制),那么这条消息就会丢失,因为 Redis 认为已经成功交付给消费者了,不会再保留这条消息。Redis 主从复制场景下的丢失风险 :在使用 Redis 主从复制架构时,如果主节点出现故障,从节点进行故障转移成为新的主节点。在这个过程中,可能存在主节点上已写入但未同步到从节点的消息丢失的情况,导致 List 中的消息不完整。
只支持单消费者并发消费问题 :当有多个消费者同时尝试从 Redis List 中消费消息时,Redis 无法像一些专业的消息队列(如 RabbitMQ 的订阅模式)那样将消息公平地分配给不同的消费者。此时,多个消费者会竞争消费同一条消息,容易出现消息被重复消费或者部分消费者长时间消费不到消息的情况,无法实现高效的并发消费,不能很好地满足需要多个消费者共同处理大量消息的场景
9.3 基于PubSub的消息队列
在 Redis 2.0 版本中引入的 Pub/Sub(发布订阅)模型,为消息传递提供了一种高效且灵活的机制。该模型允许消费者订阅一个或多个频道(channel),而生产者向指定频道发送消息时,所有订阅了该频道的消费者都能及时收到对应消息。
以下是 Pub/Sub 模型的常用操作命令:订阅频道 :通过SUBSCRIBE channel [channel ...]命令,消费者可以订阅一个或多个频道。一旦订阅完成,消费者将开始接收这些频道上的所有消息。发布消息 :生产者使用PUBLISH channel message命令,向指定的频道发送消息。消息会广播给所有订阅了该频道的消费者,从而实现一对多的消息传播。模式匹配订阅 :消费者还可以借助PSUBSCRIBE pattern [pattern ...]命令,订阅符合特定模式的频道。这样,对于符合模式的多个频道,消费者都能接收其消息,进一步增强了订阅的灵活性。

基于PubSub的消息队列有哪些优缺点? 优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
不支持数据持久化Redis 的 Pub/Sub 消息队列本身不支持消息的持久化存储。一旦 Redis 服务重启或出现故障,之前发布的消息将丢失,无法在服务恢复后重新获取和处理这些消息,这对需要保证消息可靠性的应用不太友好。无法避免消息丢失消费者在订阅频道后,若在消息发布期间出现故障(如网络中断、进程崩溃等),可能会错过部分消息。而且,如果消费者在未订阅频道时,生产者向该频道发送消息,消费者也无法获取这些消息,导致消息丢失。消息堆积有上限,超出时数据丢失由于 Redis 的 Pub/Sub 模型主要设计用于实时消息传递,它没有内置的消息堆积和存储机制。当消息生产速度远大于消费速度时,未被及时消费的消息无法在 Redis 中堆积存储,超出处理能力的消息将被丢弃,无法保证消息的完整性和可靠性。
9.4 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,代码如下:
while (true) { // 尝试读取队列中的消息,最多阻塞2秒 Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $"); if (msg == null) { continue; } // 处理消息 handleMessage(msg); }XREAD 命令的特点消息可回溯 :Redis Stream 可以保留消息的历史记录,通过指定不同的起始 ID,可以回溯读取之前的消息。例如可以指定一个较早的消息 ID 来重新消费历史消息。一个消息可以被多个消费者读取 :与传统的队列不同,在 Redis Stream 中,消息不会因为一个消费者读取而消失,其他消费者仍然可以读取同一条消息。这适用于需要多个消费者对消息进行不同处理的场景。可以阻塞读取 :使用BLOCK参数可以让读取操作在消息队列为空时阻塞,等待一段时间直到有新消息到达。这种特性可以减少轮询的频率,提高资源利用率。有消息漏读的风险 :当起始 ID 设置为$时,每次读取最新消息。如果在处理某条消息的过程中,又有超过一条以上的新消息到达队列,下一次读取时,只能获取到最新的一条消息,中间的其他消息可能被漏读。例如,假设队列中有消息 A、B、C,先读取到 A,正在处理 A 时,B 和 C 到达队列。当再次读取时,由于起始 ID 是$,会直接读取到 C,而 B 就会被漏读。
需要注意的是,Redis Stream 提供了更多的功能来应对消息漏读等问题,比如使用消费者组可以更好地管理消息的消费过程,包括消息的确认、重试等机制。
9.5 消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具备下列特点:



消费者监听消息的基本思路:
while (true) { // 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒 Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >"); if (msg == null) { // null 说明没有消息,继续下一次 continue; } try { // 处理消息,完成后一定要 ACK handleMessage(msg); } catch (Exception e) { while (true) { Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0"); if (msg == null) { // null 说明没有异常消息,所有消息都已确认,结束循环 break; } try { // 说明有异常消息,再次处理 handleMessage(msg); } catch (Exception e) { // 再次出现异常,记录日志,继续循环 continue; } } } }STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
最后我们来个小对比

9.6 消息队列实现异步秒杀
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
修改lua表达式,新增--3.6

VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); //处理异常消息 handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有异常消息,结束循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理pendding订单异常", e); try{ Thread.sleep(20); }catch(Exception e){ e.printStackTrace(); } } } } } 10. Redis最佳实践
10.1 Redis键值设计
优雅的key结构
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
例如:我们的登录业务,保存用户信息,其key可以设计成如下格式:

这样设计的好处:
- 可读性强
- 避免key冲突
- 方便管理
- 更节省内存
Redis 的 String 类型键值,其底层根据值的大小和内容采用三种编码格式:int、embstr 和 raw。
embstr 编码: 当字符串值长度小于等于 44 字节时使用。其最大优势在于 Redis Object (robj) 和对应的 SDS (Simple Dynamic String) 结构体被分配在一块连续的内存空间中。这种布局不仅内存占用更小(减少了单独分配 SDS 头的开销),而且得益于内存连续性,数据访问效率更高。
raw 编码: 当字符串值长度超过 44 字节时,会自动转换为 raw 编码。在此编码下,Redis Object 和 SDS 结构体会被分配在两块独立的内存空间中。Redis Object 内部仅存储一个指向 SDS 内存块的指针。这种非连续存储方式带来了两方面影响:
- 访问性能降低: 读取数据需要额外的指针跳转(两次内存访问),不如
embstr直接。 - 潜在内存碎片: SDS 空间单独分配,增大了内存分配器的管理负担,易产生内存碎片。


拒绝BigKey
BigKey通常以Key的大小和Key中成员的数量来综合判定,例如:
- Key本身的数据量过大:一个String类型的Key,它的值为5 MB
- Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个
- Key中成员的数据量过大:一个Hash类型的Key,其成员的Value(值)总大小为100 MB
推荐值:
- 单个key的value小于10KB
- 对于集合类型的key,建议元素数量小于1000

BigKey危害

如何发现BigKey

①redis-cli --bigkeys
利用redis-cli提供的--bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的big key
#命令 redis-cli -a 密码 --bigkeys
②scan扫描
自己编程,利用scan扫描Redis中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)


import com.heima.jedis.util.JedisConnectionFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.ScanResult; import java.util.HashMap; import java.util.List; import java.util.Map; public class JedisTest { private Jedis jedis; @BeforeEach void setUp() { // 1.建立连接 // jedis = new Jedis("192.168.150.101", 6379); jedis = JedisConnectionFactory.getJedis(); // 2.设置密码 jedis.auth("123321"); // 3.选择库 jedis.select(0); } final static int STR_MAX_LEN = 10 * 1024; final static int HASH_MAX_LEN = 500; @Test void testScan() { int maxLen = 0; long len = 0; String cursor = "0"; do { // 扫描并获取一部分key ScanResult<String> result = jedis.scan(cursor); // 记录cursor cursor = result.getCursor(); List<String> list = result.getResult(); if (list == null || list.isEmpty()) { break; } // 遍历 for (String key : list) { // 判断key的类型 String type = jedis.type(key); switch (type) { case "string": len = jedis.strlen(key); maxLen = STR_MAX_LEN; break; case "hash": len = jedis.hlen(key); maxLen = HASH_MAX_LEN; break; case "list": len = jedis.llen(key); maxLen = HASH_MAX_LEN; break; case "set": len = jedis.scard(key); maxLen = HASH_MAX_LEN; break; case "zset": len = jedis.zcard(key); maxLen = HASH_MAX_LEN; break; default: break; } //打印BigKey if (len >= maxLen) { System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len); } } } while (!cursor.equals("0")); } @AfterEach void tearDown() { if (jedis != null) { jedis.close(); } } }③第三方工具
- 利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况
- https://github.com/sripathikrishnan/redis-rdb-tools
④网络监控
- 自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
- 一般阿里云搭建的云服务器就有相关监控页面
如何删除BigKey
BigKey内存占用较多,即便时删除这样的key也需要耗费很长时间,导致Redis主线程阻塞,引发一系列问题。
- redis 3.0 及以下版本:
- 如果是集合类型,则遍历BigKey的元素,先逐个删除子元素,最后删除BigKey
- Redis 4.0以后:
- Redis在4.0后提供了异步删除的命令:unlink
恰当的数据类型
例1:比如存储一个User对象,我们有三种存储方式:
①方式一:json字符串
{user:1 | {"name": "Jack", "age": 21}}优点:实现简单粗暴
缺点:数据耦合,不够灵活
②方式二:字段打散
| user:1:name | Jack |
|---|---|
| user:1:age | 21 |
优点:可以灵活访问对象任意字段
缺点:占用空间大、没办法做统一控制
③方式三:hash(推荐)
| user:1 | name | jack |
|---|---|---|
| age | 21 |
优点:底层使用ziplist,空间占用小,可以灵活访问对象的任意字段
缺点:代码相对复杂
例2:假如有hash类型的key,其中有100万对field和value,field是自增id,这个key存在什么问题?如何优化?
| key | field | value |
|---|---|---|
| someKey | id:0 | value0 |
| ..... | ..... | |
| id:999999 | value999999 |
存在的问题:
- hash的entry数量超过500时,会使用哈希表而不是ZipList,内存占用较多
- 可以通过hash-max-ziplist-entries配置entry上限,但是如果entry过多就会导致BigKey问题

方案一:拆分为string类型
| key | value |
|---|---|
| id:0 | value0 |
| ..... | ..... |
| id:999999 | value999999 |
存在的问题:
- string结构底层没有太多内存优化,内存占用较多
- 想要批量获取这些数据比较麻烦
方案二:拆分为小的hash,将 id / 100 作为key, 将id % 100 作为field
| key | field | value |
|---|---|---|
| key:0 | id:00 | value0 |
| ..... | ..... | |
| id:99 | value99 | |
| key:1 | id:00 | value100 |
| ..... | ..... | |
| id:99 | value199 | |
| .... | ||
| key:9999 | id:00 | value999900 |
| ..... | ..... | |
| id:99 | value999999 |
代码实现:
package com.heima.test; import com.heima.jedis.util.JedisConnectionFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import redis.clients.jedis.ScanResult; import java.util.HashMap; import java.util.List; import java.util.Map; public class JedisTest { private Jedis jedis; @BeforeEach void setUp() { // 1.建立连接 // jedis = new Jedis("192.168.150.101", 6379); jedis = JedisConnectionFactory.getJedis(); // 2.设置密码 jedis.auth("123321"); // 3.选择库 jedis.select(0); } @Test void testSetBigKey() { Map<String, String> map = new HashMap<>(); for (int i = 1; i <= 650; i++) { map.put("hello_" + i, "world!"); } jedis.hmset("m2", map); } @Test void testBigHash() { Map<String, String> map = new HashMap<>(); for (int i = 1; i <= 100000; i++) { map.put("key_" + i, "value_" + i); } jedis.hmset("test:big:hash", map); } @Test void testBigString() { for (int i = 1; i <= 100000; i++) { jedis.set("test:str:key_" + i, "value_" + i); } } @Test void testSmallHash() { int hashSize = 100; Map<String, String> map = new HashMap<>(hashSize); for (int i = 1; i <= 100000; i++) { int k = (i - 1) / hashSize; int v = i % hashSize; map.put("key_" + v, "value_" + v); if (v == 0) { jedis.hmset("test:small:hash_" + k, map); } } } @AfterEach void tearDown() { if (jedis != null) { jedis.close(); } } }10.2 批处理优化
Pipeline
单个命令的执行流程

N条命令的执行流程

多条指令批量的传输流程


MSet
利用mset批量插入10万条数据
@Test void testMxx() { String[] arr = new String[2000]; int j; long b = System.currentTimeMillis(); for (int i = 1; i <= 100000; i++) { j = (i % 1000) << 1; arr[j] = "test:key_" + i; arr[j + 1] = "value_" + i; if (j == 0) { jedis.mset(arr); } } long e = System.currentTimeMillis(); System.out.println("time: " + (e - b)); }注意:不要在一次处理中传输太多命令,否则单次命令占用宽带过多,会导致网络阻塞
Pipeline
MSET虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用Pipeline
@Test void testPipeline() { // 创建管道 Pipeline pipeline = jedis.pipelined(); long b = System.currentTimeMillis(); for (int i = 1; i <= 100000; i++) { // 放入命令到管道 pipeline.set("test:key_" + i, "value_" + i); if (i % 1000 == 0) { // 每放入1000条命令,批量执行 pipeline.sync(); } } long e = System.currentTimeMillis(); System.out.println("time: " + (e - b)); }集群下的批处理
Redis 集群要求批处理命令(如 MSET 或包含多 Key 的 Pipeline)中的所有 Key 必须位于同一个哈希槽 (Slot),否则会执行失败。这在实际应用中是个挑战,因为批处理的数据 Key 天然可能分布在不同的 Slot 和节点上。
针对此限制,主要有四种解决方案:

- 串行单命令执行:
- 做法: 将批处理拆分成独立的单条命令,依次发送执行。
- 优点: 实现极其简单。
- 缺点: 网络往返次数最多,耗时最长,性能最差,通常没有实用价值。
- 按 Slot 分组串行执行:
- 做法: 客户端预先计算每个 Key 所属的 Slot。将相同 Slot 的 Key 分组。对每个分组分别执行 Pipeline 批处理。各组命令串行执行(即一组执行完再执行下一组)。
- 优点: 比方案 1 大幅减少网络往返次数(每组内 Pipeline 优化),耗时显著降低。
- 缺点: 实现比方案 1 复杂。不同 Slot 组的命令仍需等待,存在串行等待时间,非最优。
- 按 Slot 分组并行执行:
- 做法: 同样是预先按 Slot 分组 Key。关键区别在于,不同 Slot 组的 Pipeline 命令是并行发送执行的(例如利用多线程或异步 I/O)。
- 优点: 在方案 2 的基础上,消除了不同 Slot 组间的串行等待时间,耗时最短(仅受最慢的那个 Slot 组执行时间影响)。
- 缺点: 实现最复杂,需要客户端管理并发和结果聚合。
- 使用 Hash Tag:
- 做法: 设计 Key 时加入
{hashtag}。Redis 集群仅根据{和}之间的字符串计算 Slot。确保相关 Key 使用相同的{hashtag}部分,它们就会落在同一个 Slot。 - 优点: 实现相对简单(主要在 Key 设计)。一次 Pipeline 即可完成所有操作,网络开销最小,耗时最低。
- 缺点:强依赖 Key 设计。如果大量 Key 使用相同的
{hashtag},会导致数据分布不均(数据倾斜/热点问题),影响集群扩展性和性能。
- 做法: 设计 Key 时加入
总结与推荐:
- 方案 4 (Hash Tag) 性能最优且实现较简单,是首选方案,但务必谨慎设计
{hashtag}以避免严重的数据倾斜(例如,使用用户 ID 后几位作为 tag 的一部分)。 - 方案 3 (按 Slot 分组并行执行) 在无法使用 Hash Tag 或必须避免潜在倾斜风险时,是实现高性能批处理的推荐方案。它克服了方案 2 的串行瓶颈,性能接近方案 4。(最为推荐)
- 方案 2 (按 Slot 分组串行执行) 是方案 3 的简化版,在并行实现困难时可作为过渡方案,但其串行特性限制了性能上限。
- 方案 1 (串行单命令) 性能过低,不推荐。
串行化执行代码实践
public class JedisClusterTest { private JedisCluster jedisCluster; @BeforeEach void setUp() { // 配置连接池 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(8); poolConfig.setMaxIdle(8); poolConfig.setMinIdle(0); poolConfig.setMaxWaitMillis(1000); HashSet<HostAndPort> nodes = new HashSet<>(); nodes.add(new HostAndPort("192.168.150.101", 7001)); nodes.add(new HostAndPort("192.168.150.101", 7002)); nodes.add(new HostAndPort("192.168.150.101", 7003)); nodes.add(new HostAndPort("192.168.150.101", 8001)); nodes.add(new HostAndPort("192.168.150.101", 8002)); nodes.add(new HostAndPort("192.168.150.101", 8003)); jedisCluster = new JedisCluster(nodes, poolConfig); } @Test void testMSet() { jedisCluster.mset("name", "Jack", "age", "21", "sex", "male"); } @Test void testMSet2() { Map<String, String> map = new HashMap<>(3); map.put("name", "Jack"); map.put("age", "21"); map.put("sex", "Male"); //对Map数据进行分组。根据相同的slot放在一个分组 //key就是slot,value就是一个组 Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet() .stream() .collect(Collectors.groupingBy( entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())) ); //串行的去执行mset的逻辑 for (List<Map.Entry<String, String>> list : result.values()) { String[] arr = new String[list.size() * 2]; int j = 0; for (int i = 0; i < list.size(); i++) { j = i<<2; Map.Entry<String, String> e = list.get(0); arr[j] = e.getKey(); arr[j + 1] = e.getValue(); } jedisCluster.mset(arr); } } @AfterEach void tearDown() { if (jedisCluster != null) { jedisCluster.close(); } } }Spring集群环境下批处理代码
@Test void testMSetInCluster() { Map<String, String> map = new HashMap<>(3); map.put("name", "Rose"); map.put("age", "21"); map.put("sex", "Female"); stringRedisTemplate.opsForValue().multiSet(map); List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex")); strings.forEach(System.out::println); }原理分析
在 RedisAdvancedClusterAsyncCommandsImpl 类中的核心流程:
- 按 Slot 分组数据 根据 key 的哈希槽(slot)将数据分区到
Map<Integer, ...>中:- Key → 目标 slot 值
- Value → 相同 slot 的所有键值对集合
- 异步批量操作 通过
RedisFuture<String> mset = super.mset(op)将分组后的数据异步发送到对应 slot 所在的集群节点执行。
10.3 服务器端优化-持久化配置


编辑
10.4 服务器端优化-慢查询优化
什么是慢查询

慢查询的危害:由于Redis是单线程的,所以当客户端发出指令后,他们都会进入到redis底层的queue来执行,如果此时有一些慢查询的数据,就会导致大量请求阻塞,从而引起报错,所以我们需要解决慢查询问题。
慢查询的阈值可以通过配置指定:
slowlog-log-slower-than:慢查询阈值,单位是微秒。默认是10000,建议1000
slowlog-max-len:慢查询日志(本质是一个队列)的长度。默认是128,建议1000
慢查询会被放入慢查询日志中,日志的长度有上限,可以通过配置指定:

如何查看慢查询
知道了以上内容之后,那么咱们如何去查看慢查询日志列表呢:
- slowlog len:查询慢查询日志长度
- slowlog get [n]:读取n条慢查询日志
- slowlog reset:清空慢查询列表

也可使用redis客户端进行查看

10.5 服务器端优化-命令及安全配置
安全可以说是服务器端一个非常重要的话题,如果安全出现了问题,那么一旦这个漏洞被一些坏人知道了之后,并且进行攻击,那么这就会给咱们的系统带来很多的损失,所以我们这节课就来解决这个问题。



10.6 服务器端优化-Redis内存划分和内存配置



接下来我们看到了这些配置,最关键的缓存区内存如何定位和解决呢?

10.7 服务器端集群优化-集群还是主从

问题1:集群完整性问题


问题2:集群带宽问题

那我们到底是集群还是主从
单体Redis(主从Redis)已经能达到万级别的QPS,并且也具备很强的高可用特性。如果主从能满足业务需求的情况下,所以如果不是在万不得已的情况下,尽量不搭建Redis集群
11 面试答案合集
【紫色加粗字体为重点】 【红色加粗字体为易错点】
实验室基地面试题答案
1. Redis 为什么这么快?【实验室基地面试题第1题 和 追势科技公司Redis相关面试题第15题】
基于内存存储:Redis 的数据主要存储在内存中,内存读写速度远快于磁盘。(核心原因)
单线程:避免多线程切换带来的上下文开销,同时也避免线程安全问题,减少了锁竞争等额外消耗
高效的数据结构:针对不同场景设计优化的数据结构(如哈希表用 HASH ),操作效率高。
IO 多路复用:采用 epoll 等 IO 多路复用技术,能高效处理大量并发连接,提升 IO 处理能力。
精简的命令处理:命令处理流程简洁,避免了复杂的逻辑开销,且大部分命令操作都是原子性的。
2. Redisson 实现的分布式锁是通过什么机制防止锁超时的?
Redisson 通过看门狗机制(Watch Dog) 防止锁超时。当线程获取锁后,若业务未执行完毕,看门狗会定期(默认每隔 30 秒)自动延长锁的过期时间(默认过期时间 30 秒),确保锁不会因业务执行时间过长而提前释放,避免其他线程误获取锁。
3. Redisson 中 "看门狗机制" 的原理、作用及实现方式
原理:线程获取锁成功后,Redisson 会启动后台定时任务(看门狗)。若锁未被释放且未超时,看门狗会每隔一定时间(默认 10 秒,为锁过期时间的 1/3)重置锁过期时间,使其保持有效。
作用:防止因业务执行时间超过锁的初始过期时间,导致锁提前释放而引发的并发问题,保证锁的持有与业务执行同步。
实现方式:通过org.redisson.RedissonLock中的定时任务实现,获取锁时会判断是否使用看门狗(默认启用),并在锁释放前持续更新过期时间。核心是基于 Redis 的pexpire命令延长锁有效期。
易错点:
这里的 "30 秒" 和 "10 秒" 是两个不同的概念,对应看门狗机制的两个核心参数,作用不同:默认锁过期时间(30 秒):当调用RLock.lock()未指定超时时间时,Redisson 会给锁设置一个初始过期时间(默认 30 秒)。这个时间是锁的 “基础有效期”,防止线程获取锁后崩溃(无法释放锁),导致锁永久占用。看门狗续期间隔(10 秒):为了防止业务执行时间超过 30 秒导致锁过期,Redisson 会启动看门狗线程,每隔 锁过期时间的 1/3(即 30 秒的 1/3=10 秒)检查一次锁的状态。如果线程仍持有锁(未释放),就会将锁的过期时间重新设置为 30 秒(续期)。
为什么这样设计?续期间隔设置为过期时间的 1/3,是为了预留足够的 “容错窗口”:假设锁过期时间 30 秒,续期间隔 10 秒,即使某次续期因网络延迟等原因失败,后续 1-2 次续期仍有机会补救(不会立即导致锁过期)。例如:第 10 秒续期失败,第 20 秒会再次尝试续期,若成功,锁的有效期会延长到 50 秒(20+30),仍能覆盖业务执行时间。
简单说:30 秒是锁的 "有效期",10 秒是 "续命频率",两者配合确保锁不会在业务执行中提前失效。
4. 在优惠券秒杀中怎么用乐观锁、悲观锁解决一人多单超卖问题?
- 悲观锁:秒杀时通过加锁(如 Redis 分布式锁、数据库行锁)确保同一用户只能有一个线程执行下单逻辑。例如,使用 Redisson 的分布式锁,用户 ID 作为锁键,确保同一用户同时只能发起一次下单请求,直接阻止并发下单,避免超卖和一人多单。
- 乐观锁:不直接加锁,而是在更新库存时通过版本号或条件判断实现。例如,在数据库中为商品库存记录添加版本号字段,下单时检查库存是否充足且版本号匹配,更新时同时递增版本号。若版本号不匹配,说明库存已被修改,下单失败。对于一人多单,可通过查询用户已下单数量,在更新时添加 "用户未下单" 的条件判断。
5. 集群环境下,如何基于分布式锁设计方案解决商品超卖问题?
锁粒度设计:以商品 ID 为锁键,确保同一商品的秒杀操作互斥,不同商品可并行处理。
使用 Redisson 分布式锁:利用其可重入、自动续期(看门狗),避免集群环境下的锁失效问题。
加锁流程:
- 用户下单时,先尝试获取该商品的分布式锁。
- 获锁成功后,检查库存是否充足,不足则释放锁并返回失败。
- 库存充足时,扣减库存、创建订单,最后释放锁。
- 获锁失败时,提示用户稍候重试。
兜底措施:结合数据库事务和库存预扣减,确保锁机制失效时仍能通过数据库约束(如唯一索引、库存非负检查)防止超卖。
6. 如何保证分布式锁的原子性(如加锁、解锁过程不被中断)?
加锁原子性:使用 Redis 的SET key value NX PX timeout命令(Redisson 中封装了该命令),该命令是原子操作,确保 "判断键是否存在" 和 "设置键值及过期时间" 一步完成,避免中间被中断导致的锁异常。
解锁原子性:通过 Lua 脚本实现解锁,脚本先判断锁是否属于当前线程(检查 value 是否匹配),再删除锁,确保这两个操作的原子性。例如 Redisson 的解锁脚本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end 避免了 "判断锁归属" 和 "删除锁" 之间被其他操作中断导致的误删问题。
7. 在点评中哪里用到了消息队列,为什么使用消息队列呢?
使用场景:在用户下单流程中,当 Redis 校验下单条件(如库存、限购)通过后,将下单请求作为消息发送到 Redis 消息队列(如基于 List 实现的队列),再由独立线程消费消息完成后续的数据库订单创建、库存扣减等操作。
使用原因:
- 解耦:将下单请求的接收与处理分离,前端无需等待后端完成即可返回,提升响应速度。
- 削峰填谷:秒杀场景下请求量激增,消息队列可缓冲请求,避免直接冲击数据库。
- 异步处理:非核心流程(如订单通知、日志记录)可异步执行,提高主流程效率。
8. 还了解其他的消息队列的实现方式吗?(RocketMQ、RabbitMQ)
- RocketMQ:阿里开源的分布式消息队列,支持高吞吐、低延迟,提供事务消息、定时消息等高级特性,适合大规模分布式系统,如电商订单、支付场景。
- RabbitMQ:基于 AMQP 协议,支持多种消息路由模式(direct、topic、fanout 等),灵活性高,适合复杂的消息分发场景,如日志收集、系统解耦。
- 其他:Kafka(高吞吐,适合日志等大数据场景)
9. 缓存穿透、缓存雪崩、缓存击穿的有效解决方案
缓存穿透:指查询不存在的数据,导致请求直接穿透缓存访问数据库,压垮数据库。解决方案:
- 缓存空值:对不存在的 key 缓存空值(设置较短过期时间),避免重复穿透。
- 布隆过滤器:预先将存在的 key 存入布隆过滤器,请求时先校验,不存在则直接返回。
- 接口限流与校验:对恶意高频请求限流,校验请求参数合法性。
缓存雪崩:指大量缓存 key 同时过期或 Redis 集群故障,导致请求集中冲击数据库。解决方案:
- 过期时间随机化:为不同 key 设置随机过期时间,避免同时失效。
- 缓存集群高可用:部署 Redis 主从 + 哨兵或集群模式,避免单点故障。
- 熔断降级:使用 Hystrix 组件,当缓存失效时限制数据库请求流量,返回默认值或错误提示。
- 预热缓存:提前加载热点数据到缓存,避免冷启动时的缓存缺失。
缓存击穿:核心是 “保护热点 key 的缓存重建过程,避免并发冲击”,常见两种方案:
- 互斥锁(分布式锁):缓存失效时,仅一个线程能获取分布式锁(如 Redisson 锁),该线程负责查询数据库并重建缓存,其他线程等待锁释放后直接查缓存,避免同时访问数据库。
- 逻辑过期:缓存存储时,为 value 添加 “逻辑过期时间”(不设置 Redis 的真实过期时间),查询时判断逻辑过期:若未过期:直接返回缓存数据;若已过期:先返回旧数据,同时异步启动线程重建缓存(重建时加分布式锁,避免重复重建)。
追势科技公司Redis相关面试题
14. Redis 怎么实现分布式锁的
核心结论:Redis 实现分布式锁核心依赖 SETNX 命令(互斥性)+ 过期时间(防死锁),结合 Lua 脚本保证原子性,进阶可用 Redisson 优化功能。
核心实现思路
- 获取锁:用
SET key 线程标识 NX EX 过期时间命令,仅当 key 不存在时创建(保证互斥),同时设置过期时间(避免死锁)。 - 释放锁:先判断锁的线程标识是否为当前线程,一致则删除锁,通过 Lua 脚本将 “判断 + 删除” 合并为原子操作(防误删)。
- 进阶优化:使用 Redisson 框架,支持可重入锁、锁重试、看门狗自动续期、主从一致性等高级特性
另一个公司Redis相关面试题
1. 项目中不使用 Redis 可行吗?实际场景中是否有替代方案?
核心结论:项目中不使用 Redis 可行,但需根据业务场景选择合适替代方案,核心是解决缓存、分布式锁、消息队列等 Redis 承担的功能。
关键替代方案
- 缓存功能替代:使用本地缓存(如 Caffeine、Guava Cache),适用于单机部署、数据量小且无需集群共享的场景;或使用 Memcached,支持分布式缓存,功能简洁但无持久化。
- 分布式锁替代:基于 MySQL 实现(利用
GET_LOCK/RELEASE_LOCK函数),适用于并发量低的场景;或使用 Zookeeper,基于临时顺序节点实现,可靠性高但性能略逊。 - 会话共享替代:使用 Tomcat 集群 session 复制(适用于小规模集群),或基于数据库存储会话信息(性能较差,需优化查询)。
- 消息队列替代:使用 RabbitMQ、Kafka 等专业消息队列,适用于高可靠性、高并发的消息传递场景。
2. 做过 JMeter 压力测试吗?若不添加缓存,系统性能能否满足业务需求?
核心结论:JMeter 是常用的压力测试工具,不添加缓存时,系统性能能否满足需求取决于业务并发量和数据库承载能力,高并发场景下通常无法满足。
JMeter 压力测试核心操作
- 配置线程组:设置并发用户数、循环次数、 Ramp-Up 时间(线程启动间隔);
- 添加采样器:如 HTTP 请求,指定接口地址、请求方式和参数;
- 添加监听器:如聚合报告(查看响应时间、吞吐量、错误率)、响应时间曲线;
- 执行测试:逐步增加并发量,观察系统性能瓶颈。
无缓存时的性能瓶颈
所有请求直接命中数据库,数据库连接池易耗尽,导致响应超时;
磁盘 IO 读写速度远低于内存,高并发下查询延迟显著增加(如单表查询从毫秒级变为秒级);
业务极限并发量大幅降低(如无缓存时支持 100 QPS,添加 Redis 后可支持 10000+ QPS)。
结论
低并发场景(日均访问量10万以下):无缓存可能满足需求,但需优化数据库索引和连接池配置;
中高并发场景(如秒杀、热点数据查询):必须添加缓存,否则系统会因数据库压力过大而崩溃。
6. 采用 "先更新数据库再删缓存" 的策略时,若缓存删除失败导致数据不一致,该如何处理?
结论:可通过重试机制、过期时间兜底等方式解决,核心是确保缓存最终与数据库一致。
解决方案
- 重试删除缓存:缓存删除失败时,通过定时任务或消息队列重试删除,重试次数限制为 3-5 次,避免无限重试;
- 设置缓存过期时间:为缓存数据设置合理的 TTL(如 5-10 分钟),即使删除失败,过期后缓存会自动失效,后续查询会从数据库加载最新数据并重建缓存;
- 异步补偿机制:更新数据库后,将缓存删除操作发送到消息队列(如 Redis Stream),异步线程监听并执行删除,失败时可重试;
- 数据库 binlog 同步:通过监听数据库 binlog(如使用 Canal),当检测到数据更新时,主动删除对应缓存,确保最终一致性。
注意事项
1. 重试时需避免对 Redis 造成过大压力,可添加时间间隔(如 1 秒后重试);
2. 过期时间需根据业务数据更新频率调整,平衡一致性和性能。
7. 金融等场景对数据一致性要求极高,若出现旧数据残留问题,有哪些解决方案?
核心结论:需通过强一致性设计、数据校验、同步更新等方案解决,确保数据无残留、无不一致。
解决方案
- 采用 “先删缓存再更新数据库” + 分布式锁:更新数据前,先删除缓存并获取分布式锁,防止并发查询导致旧数据写入缓存;更新数据库后,释放锁,确保后续查询加载最新数据(需处理锁超时和失败回滚)。
- 缓存与数据库同步更新(双写):更新数据库后,同步更新缓存,使用事务确保两者操作原子性(单机场景);分布式场景下,使用 TCC 或 SAGA 分布式事务方案。
- 数据校验机制:读取数据时,同时对比缓存和数据库的版本号或更新时间戳,若不一致则以数据库为准,并更新缓存;
- 定时对账任务:定期扫描缓存和数据库数据,发现不一致时自动修复(如删除旧缓存、更新为数据库最新数据)。
关键原则
- 金融场景优先保证数据一致性,可牺牲部分性能;
- 避免使用异步删除 / 更新缓存,必要时采用同步机制 + 重试兜底。
8. 项目中如何使用日志?如何快速定位 “缓存未删除成功” 这类特定场景的日志?
核心结论:项目中使用日志框架(如 SLF4J + Logback)记录不同级别日志,通过结构化日志 + 关键词过滤快速定位特定场景问题。
日志使用核心规范
- 日志级别划分:ERROR(记录异常错误,如缓存删除失败、数据库连接异常);WARN(记录警告,如缓存过期、数据格式不规范);INFO(记录核心流程,如缓存删除操作、接口调用成功);DEBUG(记录调试信息,如缓存键值、数据库查询参数)。
- 结构化日志输出:使用 JSON 格式记录日志,包含关键字段(如
scene: "cache_delete"、cacheKey: "shop:1001"、success: false、errorMsg: "Redis connection timeout")。 - 日志存储与检索:将日志输出到文件并按日期滚动,高并发场景下使用 ELK 栈(Elasticsearch + Logstash + Kibana)或 Loki 存储和检索日志。
快速定位 “缓存未删除成功” 的日志
- 日志中包含固定关键词:如
cache_delete_failed、cacheKey、errorMsg; - 在日志检索工具中执行过滤条件:
scene:cache_delete AND success:false,快速筛选所有缓存删除失败的记录; - 关联上下文日志:通过请求 ID(Trace ID)关联同一请求的日志,查看缓存删除前的更新操作、Redis 连接状态等,定位失败原因(如 Redis 宕机、权限不足、键不存在)。
9. 项目中如何通过乐观锁实现超卖防护的?具体实现逻辑和注意事项是什么?
核心结论:通过数据库乐观锁(基于版本号或 CAS 思想)实现超卖防护,核心是确保库存扣减的原子性和一致性。
具体实现逻辑
1.数据库表设计:在库存表(如 tb_seckill_voucher)中添加版本号字段(version),或直接基于库存字段做 CAS 校验;
2.扣减库存 SQL:采用 CAS 思想,更新时校验库存是否满足条件,确保原子性:
UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE voucher_id = ? AND stock > 0; -- 库存大于 0 才扣减,避免超卖 或基于版本号:
UPDATE tb_seckill_voucher SET stock = stock - 1, version = version + 1 WHERE voucher_id = ? AND version = ? AND stock > 0; 业务逻辑:
- 查询优惠券库存和版本号(或直接依赖 SQL 校验);
- 执行上述更新 SQL,判断影响行数;
- 若影响行数 > 0,说明扣减成功,继续创建订单;若为 0,说明库存不足或已被其他线程扣减,返回超卖提示。
注意事项
- 避免先查询库存再扣减(非原子操作),必须通过 SQL 条件确保扣减的原子性;
- 基于版本号的乐观锁,需处理版本号不一致导致的更新失败,可重试 1-2 次(避免频繁重试造成自旋压力);
- 高并发场景下,可结合 Redis 预扣减库存(先扣 Redis 库存,再异步同步到数据库),进一步提升性能。
12 黑马课程配套Redis面经
什么是缓存穿透? 怎么解决?

候选人:
缓存穿透是指查询一个一定不存在的数据,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到 DB 去查询,可能导致 DB 挂掉。这种情况大概率是遭到了攻击。
解决方案的话,我们通常都会用布隆过滤器来解决它
介绍一下布隆过滤器



候选人:
布隆过滤器主要是用于检索一个元素是否在一个集合中。我们当时使用的是redisson实现的布隆过滤器。
它的底层主要是先去初始化一个比较大数组,里面存放的二进制0或1。在一开始都是0,当一个key来了之后经过3次hash计算,模于数组长度找到数据的下标然后把数组中原来的0改为1,这样的话,三个数组的位置就能标明一个key的存在。查找的过程也是一样的。
当然是有缺点的,布隆过滤器有可能会产生一定的误判,我们一般可以设置这个误判率,大概不会超过5%,其实这个误判是必然存在的,要不就得增加数组的长度,其实已经算是很划分了,5%以内的误判率一般的项目也能接受,不至于高并发下压倒数据库。
什么是缓存击穿? 怎么解决?


候选人:
缓存击穿的意思是对于设置了过期时间的key,缓存在某个时间点过期的时候,恰好这时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把 DB 压垮。
解决方案有两种方式:
第一可以使用互斥锁:当缓存失效时,不立即去load db,先使用如 Redis 的 setnx 去设置一个互斥锁,当操作成功返回时再进行 load db的操作并回设缓存,否则重试get缓存的方法
第二种方案可以设置当前key逻辑过期,大概是思路如下:
①:在设置key的时候,设置一个过期时间字段一块存入缓存中,不给当前key设置过期时间
②:当查询的时候,从redis取出数据后判断时间是否过期
③:如果过期则开通另外一个线程进行数据同步,当前线程正常返回数据,这个数据不是最新
当然两种方案各有利弊:
如果选择数据的强一致性,建议使用分布式锁的方案,性能上可能没那么高,锁需要等,也有可能产生死锁的问题
如果选择key的逻辑删除,则优先考虑的高可用性,性能比较高,但是数据同步这块做不到强一致。
什么是缓存雪崩? 怎么解决?

候选人:
缓存雪崩意思是设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB 瞬时压力过重雪崩。与缓存击穿的区别:雪崩是很多key,击穿是某一个key缓存。
解决方案主要是可以将缓存失效时间分散开,比如可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
总结:

redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)


候选人:最近做的这个项目,里面有xxxx(根据自己的简历上写)的功能,需要让数据库与redis高度保持一致,因为要求时效性比较高,我们当时采用的读写锁保证的强一致性。
我们采用的是redisson实现的读写锁,在读的时候添加共享锁,可以保证读读不互斥,读写互斥。当我们更新数据的时候,添加排他锁,它是读写,读读都互斥,这样就能保证在写数据的同时是不会让其他线程读数据的,避免了脏数据。这里面需要注意的是读方法和写方法上需要使用同一把锁才行。
那这个排他锁是如何保证读写、读读互斥的呢?



候选人:其实排他锁底层使用也是setnx,保证了同时只能有一个线程操作锁住的方法
面试官:你听说过延时双删吗?为什么不用它呢?
候选人:延迟双删,如果是写操作,我们先把缓存中的数据删除,然后更新数据库,最后再延时删除缓存中的数据,其中这个延时多久不太好确定,在延时的过程中可能会出现脏数据,并不能保证强一致性,所以没有采用它。
redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)

候选人:就说我最近做的这个项目,里面有xxxx(根据自己的简历上写)的功能,数据同步可以有一定的延时(符合大部分业务)
我们当时采用的阿里的canal组件实现数据同步:不需要更改业务代码,部署一个canal服务。canal服务把自己伪装成mysql的一个从节点,当mysql数据更新以后,canal会读取binlog数据,然后在通过canal的客户端获取到数据,更新缓存即可。
redis如何实现数据的持久化?两者区别?






候选人:在Redis中提供了两种数据持久化的方式:RDB 和 AOF
RDB是一个快照文件,它是把redis内存存储的数据写到磁盘上,当redis实例宕机恢复数据的时候,方便从RDB的快照文件中恢复数据。
AOF的含义是追加文件,当redis操作写命令的时候,都会存储这个文件中,当redis实例宕机恢复数据的时候,会从这个文件中再次执行一遍命令来恢复数据
这两种方式,哪种恢复的比较快呢?

候选人:RDB因为是二进制文件,在保存的时候体积也是比较小的,它恢复的比较快,但是它有可能会丢数据,我们通常在项目中也会使用AOF来恢复数据,虽然AOF恢复的速度慢一些,但是它丢数据的风险要小很多,在AOF文件中可以设置刷盘策略,我们当时设置的就是每秒批量写入一次命令
Redis的数据过期策略有哪些?


候选人:
在redis中提供了两种数据过期删除策略
第一种是惰性删除,在设置该key过期时间后,我们不去管它,当需要该key时,我们在检查其是否过期,如果过期,我们就删掉它,反之返回该key。
第二种是 定期删除,就是说每隔一段时间,我们就对一些key进行检查,删除里面过期的key
定期清理的两种模式:SLOW模式是定时任务,执行频率默认为10hz,每次不超过25ms,以通过修改配置文件redis.conf 的 hz 选项来调整这个次数FAST模式执行频率不固定,每次事件循环会尝试执行,但两次间隔不低于2ms,每次耗时不超过1ms
Redis的过期删除策略:惰性删除 + 定期删除两种策略进行配合使用。
Redis的数据淘汰策略有哪些?


候选人:
嗯,这个在redis中提供了很多种,默认是noeviction,不删除任何数据,内部不足直接报错
是可以在redis的配置文件中进行设置的,里面有两个非常重要的概念,一个是LRU,另外一个是LFU
LRU的意思就是最少最近使用。用当前时间减去最后一次访问时间,这个值越大则淘汰优先级越高。
LFU的意思是最少频率使用。会统计每个key的访问频率,值越小淘汰优先级越高
我们在项目设置的allkeys-lru,挑选最近最少使用的数据淘汰,把一些经常访问的key留在redis中
数据库有1000万数据 ,Redis只能缓存20w数据, 如何保证Redis中的数据都是热点数据?
候选人:
可以使用 allkeys-lru (挑选最近最少使用的数据淘汰)淘汰策略,那留下来的都是经常访问的热点数据
Redis的内存用完了会发生什么?
候选人:
这个要看redis的数据淘汰策略是什么,如果是默认的配置,redis内存用完以后则直接报错。我们当时设置的 allkeys-lru 策略。把最近最常访问的数据留在缓存中。

Redis分布式锁如何实现?


候选人:在redis中提供了一个命令setnx(SET if not exists)
由于redis的单线程的,用了命令之后,只能有一个客户端对某一个key设置值,在没有过期或删除key的时候是其他客户端是不能设置这个key的
如何控制Redis实现分布式锁有效时长呢?


候选人:redis的setnx指令不好控制这个问题,我们当时采用的redisson实现的。
在redisson中需要手动加锁,并且可以控制锁的失效时间和等待时间,当锁住的一个业务还没有执行完成的时候,在redisson中引入了一个看门狗机制,就是说每隔一段时间就检查当前业务是否还持有锁,如果持有就增加加锁的持有时间,当业务执行完成之后需要使用释放锁就可以了
还有一个好处就是,在高并发下,一个业务有可能会执行很快,先客户1持有锁的时候,客户2来了以后并不会马上拒绝,它会自旋不断尝试获取锁,如果客户1释放之后,客户2就可以马上持有锁,性能也得到了提升。
redisson实现的分布式锁是可重入的吗?
候选人:是可以重入的。这样做是为了避免死锁的产生。这个重入其实在内部就是判断是否是当前线程持有的锁,如果是当前线程持有的锁就会计数,如果释放锁就会在计算上减一。在存储数据的时候采用的hash结构,大key可以按照自己的业务进行定制,其中小key是当前线程的唯一标识,value是当前线程重入的次数
redisson实现的分布式锁能解决主从一致性的问题吗

候选人:这个是不能的,比如,当线程1加锁成功后,master节点数据会异步复制到slave节点,此时当前持有Redis锁的master节点宕机,slave节点被提升为新的master节点,假如现在来了一个线程2,再次加锁,会在新的master节点上加锁成功,这个时候就会出现两个节点同时持有一把锁的问题。
我们可以利用redisson提供的红锁来解决这个问题,它的主要作用是,不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,并且要求在大多数redis节点上都成功创建锁,红锁中要求是redis的节点数量要过半。这样就能避免线程1加锁成功后master节点宕机导致线程2成功加锁到新的master节点上的问题了。
但是,如果使用了红锁,因为需要同时在多个节点上都添加锁,性能就变的很低了,并且运维维护成本也非常高,所以,我们一般在项目中也不会直接使用红锁,并且官方也暂时废弃了这个红锁
如果业务非要保证数据的强一致性,这个该怎么解决呢?
候选人:redis本身就是支持高可用的,做到强一致性,就非常影响性能,所以,如果有强一致性要求高的业务,建议使用zookeeper实现的分布式锁,它是可以保证强一致性的。
Redis集群有哪些方案?
候选人:在Redis中提供的集群方案总共有三种:主从复制、哨兵模式、Redis分片集群
那你来介绍一下主从同步
候选人:单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,可以搭建主从集群,实现读写分离。一般都是一主多从,主节点负责写数据,从节点负责读数据,主节点写入数据之后,需要把数据同步到从节点中
主从同步数据的流程


候选人:主从同步分为了两个阶段,一个是全量同步,一个是增量同步
全量同步是指从节点第一次与主节点建立连接的时候使用全量同步,流程是这样的:
第一:从节点请求主节点同步数据,其中从节点会携带自己的replication id和offset偏移量。
第二:主节点判断是否是第一次请求,主要判断的依据就是,主节点与从节点是否是同一个replication id,如果不是,就说明是第一次同步,那主节点就会把自己的replication id和offset发送给从节点,让从节点与主节点的信息保持一致。
第三:在同时主节点会执行bgsave,生成rdb文件后,发送给从节点去执行,从节点先把自己的数据清空,然后执行主节点发送过来的rdb文件,这样就保持了一致
当然,如果在rdb生成执行期间,依然有请求到了主节点,而主节点会以命令的方式记录到缓冲区,缓冲区是一个日志文件,最后把这个日志文件发送给从节点,这样就能保证主节点与从节点完全一致了,后期再同步数据的时候,都是依赖于这个日志文件,这个就是全量同步
增量同步指的是,当从节点服务重启之后,数据就不一致了,所以这个时候,从节点会请求主节点同步数据,主节点还是判断不是第一次请求,不是第一次就获取从节点的offset值,然后主节点从命令日志中获取offset值之后的数据,发送给从节点进行数据同步

怎么保证Redis的高并发高可用


候选人:首先可以搭建主从集群,再加上使用redis中的哨兵模式,哨兵模式可以实现主从集群的自动故障恢复,里面就包含了对主从服务的监控、自动故障恢复、通知;如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主;同时Sentinel也充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端,所以一般项目都会采用哨兵的模式来保证redis的高并发高可用
你们使用redis是单点还是集群,采用什么集群方式
候选人:我们当时使用的是主从(1主1从)加哨兵。一般单节点不超过10G内存,如果Redis内存不足则可以给不同服务分配独立的Redis主从节点。尽量不做分片集群。因为集群维护起来比较麻烦,并且集群之间的心跳检测和数据通信会消耗大量的网络带宽,也没有办法使用lua脚本和事务
redis集群脑裂,该怎么解决呢?

候选人:我们现在用的是redis的哨兵模式集群的,有的时候由于网络等原因可能会出现脑裂的情况,就是说,由于redis master节点和redis salve节点和sentinel处于不同的网络分区,使得sentinel没有能够心跳感知到master,所以通过选举的方式提升了一个salve为master,这样就存在了两个master,就像大脑分裂了一样,这样会导致客户端还在old master那里写入数据,新节点无法同步数据,当网络恢复后,sentinel会将old master降为salve,这时再从新master同步数据,这会导致old master中的大量数据丢失。
关于解决的话,我记得在redis的配置中可以设置:第一可以设置最少的salve节点个数,比如设置至少要有一个从节点才能同步数据,第二个可以设置主从数据复制和同步的延迟时间,达不到要求就拒绝请求,就可以避免大量的数据丢失
redis的分片集群有什么作用
候选人:分片集群主要解决的是,海量数据存储的问题,集群中有多个master,每个master保存不同数据,并且还可以给每个master设置多个slave节点,就可以继续增大集群的高并发能力。同时每个master之间通过ping监测彼此健康状态,就类似于哨兵模式了。当客户端请求可以访问集群任意节点,最终都会被转发到正确节点
Redis分片集群中数据是怎么存储和读取的?
候选人:
嗯~,在redis集群中是这样的
Redis 集群引入了哈希槽的概念,有 16384 个哈希槽,集群中每个主节点绑定了一定范围的哈希槽范围, key通过 CRC16 校验后对 16384 取模来决定放置哪个槽,通过槽找到对应的节点进行存储。
取值的逻辑是一样的
Redis是单线程的,但是为什么还那么快?


候选人:完全基于内存的,C语言编写采用单线程,避免不必要的上下文切换可竞争条件使用多路I/O复用模型,非阻塞IO
如:bgsave 和 bgrewriteaof 都是在后台执行操作,不影响主线程的正常使用,不会产生阻塞
解释一下I/O多路复用模型?




候选人:I/O多路复用是指利用单个线程来同时监听多个Socket ,并在某个Socket可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。目前的I/O多路复用都是采用的epoll模式实现,它会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间,不需要挨个遍历Socket来判断是否就绪,提升了性能。
其中Redis的网络模型就是使用I/O多路复用结合事件的处理器来应对多个Socket请求,比如,提供了连接应答处理器、命令回复处理器,命令请求处理器;
在Redis6.0之后,为了提升更好的性能,在命令回复处理器使用了多线程来处理回复事件,在命令请求处理器中,将命令的转换使用了多线程,增加命令转换速度,在命令执行的时候,依然是单线程
感谢大家的阅读!🌹
黑马Redis全套资料百度网盘链接
https://pan.baidu.com/s/1LyW0zKQ_4o4FnTQBCA15Sg
提取码: wind