若文章内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系博主删除。


  • 资料链接https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA提取码eh11
  • 在线视频https://www.bilibili.com/video/BV1cr4y1671t视频合集共 175P总时长42:45:37

  • 项目源码地址https://gitee.com/huyi612/hm-dianping
  • 这个是视频作者的代码地址

  • 写该系列博客的目的旨在学习巩固知识,方便个人在线阅览。(记录学习时刻)
  • 博客的内容主要来自黑马程序员官方免费公开提供的资料里的文档笔记中的 PPT

我这篇博客是没有多少代码记录的主要是理清思路和知识点

  • 对于视频中需要注意的地方会提一下。(比如代码错误,在测试高并发业务前需要进行的前置操作等)
  • 但是代码中也有很多知识点,这点只能结合这视频看了。

这里推荐两篇博客对于视频中内容记录的十分详细有具体代码和具体分析

  • Redis 笔记_基础篇_实战篇_黑马点评项目https://blog.csdn.net/weixin_45033015/article/details/127545710
  • Redis 实战https://blog.csdn.net/weixin_43424325/article/details/127223497

0.系列文章目录


  • 《Redis 学习笔记①基础篇_Redis 快速入门》
  • 《Redis 学习笔记②实战篇_黑马点评项目》



1.搭建黑马点评项目


  1. 导入课前资料提供的 SQL 文件hmdp.sql
  • 注意事项MySQL 的版本采用 5.7 及以上版本

创建一个数据库 hmdp

CREATE DATABASE IF NOT EXISTS hmdp DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;

该库涉及到的表有:

  • tb_user:用户表
  • tb_user_info:用户详情表
  • tb_shop:商户信息表
  • tb_shop_type:商户类型表
  • tb_blog:用户日记表(达人探店日记)
  • tb_follow:用户关注表
  • tb_voucher:优惠券表
  • tb_voucher_order:优惠券的订单表

  1. 项目架构

这里不采用微服务架构的模式,因为这里我们关注的是 Redis,所以这里采用的项目是单体项目。

不过这里我们这里的项目是前后端分离的,后端部署在 Tomcat 上,前端部署在 Nginx 服务器上。

尽管黑马点评项目是一个单体项目,但是将来我们还是会考虑到该项目的并发能力的,所以必须要保证项目的水平拓展能力(集群)。


  1. 导入后端项目

在资料中提供了一个项目源码:hm-dianping

将其该文件包复制到你的 idea 工作空间,然后再用 idea 打开即可

启动项目后,在浏览器访问:http://localhost:8081/shop-type/list,如果可以看到数据则证明运行没有问题

  • 注意事项:不要忘记修改 application.yaml 文件中的 MySQL、Redis 的地址信息

  1. 导入前端项目

在资料中提供了一个 nginx 文件夹将其复制到任意目录,要确保该目录不包含中文、特殊字符和空格

运行前端项目:在 nginx 所在目录下打开一个 CMD 窗口,输入命令:start nginx.exe

然后访问 http://127.0.0.1:8080 ,即可看到页面:


2.短信登录


短信验证码的登录注册功能】【Redis 解决 Session 共享问题

  • 搭建项目 的内容被我放在了上一章节

2.1.基于 Session 的短信登录


基于 session 实现登录的流程图


  1. 发送短信验证码


  1. 短信验证码登录和注册功能


  1. 登录验证功能

上图中的请求是携带了 cookie 的,cookie 里是包含了 JSESSIONID 的。

服务端可以基于 JSESSIONID 来获得 session ,再从 session 里取出用户,进而来判断该用户是否存在。

但是这个流程里有一个问题,我们需要在每一个 controller 里来写这些业务逻辑。

我们可以加一个拦截器(由 SpringMVC 提供)来统一判断信息,决定是否放行。

此外,session 以后要做分布式 session,考虑到系统负担和安全,我们可以在拦截器拦截到之后,将 session 中的用户信息保存到 ThreadLocal 中。每一个进入 Tomcat 的请求都是一个独立的线程,那么将来 ThreadLocal 就会在线程内开启一个独立的空间去保存这些请求(请求中携带了对应的用户信息)。这样一来,不同的用户访问 controller,都是一个独立的线程,每一个线程都有自己的用户信息,相互独立不干扰,controller 从 ThreadLocal 中取出用户信息。


官方给的网盘文件里的前端资料有点小问题

我们需要将 nginx-1.18.0\html\hmdp\index.html 文件内的 **location.href = "/index.html"**改为 location.href = "/info.html"(位于 methods 内的 login 方法的 axios.post().then() 中,第 87 行)。此外我们还需要将 nginx-1.18.0\html\hmdp 文件中的 location.href = "login.html" 改为 location.href = "info.html"(位于 methods 中的 query 方法内的 axios.get().then.catch() 中,第 164 行)。


2.2.集群下的 session 共享问题


session 共享问题:多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 tomcat 服务时导致数据丢失的问题。

session 的替代方案 应该满足:数据共享;内存存储;key、value 结构(Redis 恰好就满足这些情况)


2.3.Redis 实现共享 session


Redis 代替 session 需要考虑的问题:

  • 选择合适的数据结构
  • 选择合适的 key
  • 选择合适的存储粒度

保存登录的用户信息,可以使用 String 结构,以 JSON 字符串来保存,比较直观

KEYVALUE
heima:user:1{name:“Jack”, age:21}
heima:user:2{name:“Rose”, age:18}

Hash 结构可以将对象中的每个字段独立存储,可以针对单个字段做 CRUD,并且内存占用更少

KEYVALUE
fieldvalue
heima:user:1nameJack
age21
heima:user:2nameRose
age18



2.4.Redis 解决 session 的刷新的问题


对于一些不会被拦截器拦截的路径(比如用户一直访问不需要登录的首页),拦截器就不会生效,它就不会刷新。

那么过了有效时间后,尽管用户一直在访问,但用户的登录状态也就消失了。


我们可以在原有的拦截器上新增一个拦截器,拦截一切路径。


3.商户查询缓存


商家查询的缓存功能】【Redis 的缓存实战方案


3.1.认识缓存


缓存 就是数据交换的缓冲区(称作 Cache [ kæʃ ]),是存贮数据的临时地方,一般读写性能较高。

  • 缓存的作用:降低后端负载;提高读写效率,降低响应时间
  • 缓存的成本:数据一致性成本;代码维护成本;运维成本


3.2.添加 Redis 缓存


3.2.1.根据 ID 查询商铺


  • 根据 id 查询商铺的业务添加缓存


3.2.2.给店铺类型查询业务添加缓存


练习给店铺类型查询业务添加缓存


店铺类型在首页和其它多个页面都会用到且不会经常发生改动,这种类型的数据适合存储在缓存中。


需求修改 ShopTypeController 中的 queryTypeList 方法添加查询缓存


  • 相关 URL:http://localhost:8080/api/shop-type/listGET

  • src/main/java/com/hmdp/controller/ShopController.java


  • 具体代码实现见https://blog.csdn.net/yanzhaohanwei/article/details/127810364

3.3.缓存更新策略


3.3.1.三种更新策略


内存淘汰超时剔除主动更新
说明不用自己维护。
利用 Redis 的内存淘汰机制:
当内存不足时自动淘汰部分数据。
下次查询时更新缓存。
给缓存数据添加 TTL 时间,到期后自动删除缓存。
下次查询时更新缓存。
编写业务逻辑,在修改数据库的同时,更新缓存。
一致性一般
维护成本

业务场景

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存。

3.3.2.主动更新策略


主动更新策略

  1. Cache Aside Pattern
    • 由缓存的调用者,在更新数据库的提示更新缓存。
    • 特点:一致性良好、实现难度一般。
  2. Read/Write Through Pattern
    • 缓存与数据库整合为一个服务,由服务来维护一致性。
    • 调用者调用该服务,无需关心缓存一致性问题。
    • 特点:一致性优秀、实现复杂、性能一般。
  3. Write Behind Caching Pattern
    • 调用者只操作缓存,由其他线程异步的将缓存数据持久化到数据库,保存最终一致。
    • 特点:一致性差、性能好、实现复杂。

第 2 、3 种的方案维护起来比较复杂,也很难找到合适的第三方组件,且第 3 种方案很难保证一致性和可靠性。

综上所述,可控性较高的是第 1 种方案,企业也大多采用这种方案。


操作缓存和数据库时有三个问题需要考虑:

  1. 删除缓存还是更新缓存?

    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多,存在较大的线程安全问题。

    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存,本质是延迟更新。

  2. 如何保证缓存与数据库的操作的同时成功或失败?

    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用 TCC 等分布式事务方案
  3. 先操作缓存还是先操作数据库?

    • 先删除缓存,再更新数据库(在满足原子性的情况下,安全问题概率较低)
    • 先更新数据库,再删除缓存(安全问题概率较高)

先操作缓存?还是先操作数据库?

假设数据库和缓存里的数据是 v = 10

  • 第一种方案先删除缓存,再输出数据库

    异常情况介绍:在线程 1 删除缓存后,完成对数据库的更新(目标是更新为 v = 20)前。线程 2 恰好此时也查询了缓存,但是这时的缓存已经被线程 1 删除了,所以线程 1 它又直接去查询了数据库,并将数据库中的数据(v = 10)写入了缓存。在线程 2 进行完了上述的操作后,线程 1 才终于完成了对数据库中的数据的更新(v = 20)。此时,缓存中的数据为 v = 10,数据库中的数据为 v = 20,此时数据库和缓存中的数据不一致。

  • 第二种方案先操作数据库,再删除缓存

    异常情况介绍:由于某种原因(不如过期时间到了),缓存此时恰好失效了,线程 1 查询不到缓存,线程 1 它需要再去数据库中查询数据后再写入缓存。但是就在线程 1 完成写入缓存的操作前,恰好此时线程 2 来更新数据库的数据(更新 v = 20),之后线程 2 又删除了缓存(此时缓存是空的,所以这里相当于删除了个寂寞)。在线程 2 完成这些操作后,线程 1 才终于将数据库中的旧数据写入了缓存(v = 10)。此时数据库中的数据(v = 20)和缓存中的数据(v = 10)不一致。

虽然上述两种方案都有安全问题,但是第二种方案的出现问题的概率是相对来说更低一些,因为缓存中更新比磁盘中的更新要快。

此外,还可以给缓存中的数据加超时时间,以应对异常情况的发生。


3.3.3.小结


缓存更新策略的最佳实践方案:

  1. 低一致性需求:使用 Redis 自带的内存淘汰机制
  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案
  • 读操作:
    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:
    • 先写数据库,然后再删除缓存
      • 要确保数据库与缓存操作的原子性

3.3.4.主动更新的案例


案例给查询商铺的缓存添加超时剔除和主动更新的策略

  • 修改 ShopController 中的业务逻辑,满足下面的需求
    • ① 根据 id 查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
    • ② 根据 id 修改店铺时,先修改数据库,再删除缓存


3.4.缓存穿透



3.4.1.概念


缓存穿透 是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

  • 常见的解决方案有两种:
    • 缓存空对象
      • 优点:实现简单,维护方便
      • 缺点:
        • 额外的内存消耗
        • 可能造成短期的不一致
    • 布隆过滤
      • 优点:内存占用较少,没有多余 key
        • 缺点:
          • 实现复杂
          • 存在误判可能

3.4.2.缓存空对象


缓存空对象的业务流程(解决商铺查询的缓存穿透问题)


3.4.3.小结


缓存穿透产生的原因是什么?

  • 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

  • 缓存 null 值
  • 布隆过滤
  • 增强 id 的复杂度,避免被猜测 id 规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

3.5.缓存雪崩



缓存雪崩 是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

  • 给不同的 Key 的 TTL 添加随机值
  • 利用 Redis 集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存


3.6.缓存击穿



3.6.1.概念和解决方案


缓存击穿问题 也叫热点 Key 问题

  • 就是一个被 高并发访问 并且 缓存重建业务较复杂 的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。


常见的解决方案有两种:1. 逻辑锁;2. 逻辑过期


解决方案优点缺点
互斥锁没有额外的内存消耗
保证一致性
实现简单
线程需要等待,性能受影响
可能有死锁风险
逻辑过期线程无需等待,性能较好不保证一致性
有额外内存消耗
实现复杂

3.6.2.解决缓存击穿的案例


基于 互斥锁 方式解决缓存击穿问题

  • 需求:修改根据 id 查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
  • 自定义互斥锁(Redis 中的 setnx 就可以办到这点)


基于逻辑过期方式解决缓存击穿问题

  • 需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题


3.7.缓存工具封装


基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:

  • 方法 1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
  • 方法 2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法 3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法 4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

4.秒杀场景概览


Redis 在秒杀场景下的应用



全局唯一 ID实现优惠券秒杀下单超卖问题一人一单分布式锁Redis 优化秒杀Redis 消息队列实现异步秒杀


5.秒杀优惠劵功能


5.1.全局 ID 生成器


每个店铺都可以发布优惠券

当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中

而订单表如果使用数据库自增 ID 就存在一些问题:

  • id 的规律性太明显
  • 受单表数据量的限制

全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性


为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息

ID 的组成部分:

  • 符号位:1 bit,永远为 0
  • 时间戳:31 bit,以秒为单位,可以使用 69 年
  • 序列号:32 bit,秒内的计数器,支持每秒产生 2 322^{32} 232 个不同 ID

全局唯一 ID 生成策略

  • UUID
  • Redis 自增
  • snowflake 算法
  • 数据库自增

Redis 自增 ID 策略

  • 每天一个 key,方便统计订单量
  • ID 构造是 时间戳 + 计数器

5.2.秒杀优惠券功能


5.2.1.秒杀优惠券的基本实现


每个店铺都可以发布优惠券,分为 平价券 和 特价券。平价券 可以任意购买,而 特价券 需要秒杀抢购。

表关系如下:

  • tb_voucher:优惠券的基本信息,优惠金额、使用规则等
  • tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息

秒杀优惠券表,与优惠券是一对一关系


在 VoucherController 中提供了一个接口,可以添加秒杀优惠券

  • 这个是要发送存储的数据,需要注意的地方是 beginTime 和 endTime 的前后顺序,以及不能早于你当前的时间。
{"shopId": 1,"title": "100元代金券","subTitle": "周一至周五均可使用","rules": "全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食","payValue": 8000,"actualValue": 10000,"type": 1,"stock": 100,"beginTime": "2022-11-14T17:17:17","endTime": "2022-12-31T12:12:12"}

用户可以在店铺页面中抢购这些优惠券


实现优惠券秒杀的下单功能的业务流程

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单


5.2.2.超卖问题


此处会用到 JMeter 测试,需要注意的地方是要修改两个地方,保证 Jmeter 中的值与数据库和 Redis 中的数值一致。




正常情况下,线程 1 与线程 2 互不干扰。例如库存为 1,线程 1 查询库存后扣减;线程 2 查询库存发现为 0,故报错(不扣减)


异常情况下,线程 1 和线程 2 都查询了库存为 1(二者都未进行判断并扣减),之后二者都扣减了。

如果库存为 1,则最终结果是 -1。这也就是超卖问题的由来。(其实就是线程并发安全问题)


5.2.3.加锁


加锁的两种方式

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

  • 悲观锁

    • 认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
    • 例如 SynchronizedLock 都属于悲观锁
  • 乐观锁

    • 认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。

      • 如果没有修改则认为是安全的,自己才更新数据。

      • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常


乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的处理方式有两种:版本号CAS


  • 版本号方式

给数据添加一个 version,当该数据被修改时,version 数值就会被加一。

比如下图的情况:线程一修改过数据,version 已经变成了 2;线程二再去查找 version,发现已经不为 1 了,不会再修改数据了。


  • CAS 方式Compare And Swap

这里用库存值代替了上面的 version。


5.2.4.小结


超卖这样的线程安全问题,解决方案有哪些?

  • 悲观锁:添加同步锁,让线程串行执行

    • 优点:简单粗暴
    • 缺点:性能一般
  • 乐观锁:不加锁,在更新时判断是否有其它线程在修改

    • 优点:性能好
    • 缺点:存在成功率低的问题

6.秒杀的一人一单的限制功能


6.1.单机情况下实现一人一单


需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

通过加锁可以解决在单机情况下的一人一单安全问题。


6.2.集群下的并发安全问题


通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

  1. 我们将服务启动两份,端口分别为 8081 和 8082:

  1. 然后修改 nginx 的 conf 目录下的 nginx.conf 文件,配置反向代理和负载均衡

现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。



集群部署的时候(例如有两套),两套 JVM 都有自己的锁监视器,锁监视器只能检测到当前自己 JVM 内部的锁。

当多套 JVM 同时运行时,就会再次出现线程安全问题。


6.3.分布式锁



6.3.1.分布式锁的原理


分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。


分布式锁的特点多进程可见互斥高可用高性能(高并发)、安全性 … …


分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种

MySQLRedisZookeeper
互斥利用 MySQL 本身的互斥锁机制利用 setnx 这样的互斥命令利用节点的唯一性和有序性实现互斥
高可用
高性能一般一般
安全性断开连接,自动释放锁利用锁超时时间,到期释放临时节点,断开连接自动释放

6.3.2.基于 Redis 的分布式锁


实现分布式锁时需要实现的两个基本方法:

  • 获取锁

    • 互斥:确保只能有一个线程获取锁

      # 添加锁,利用 setnx 的互斥特性SETNX key value
      # 添加锁的过期时间,避免服务宕机引起的死机EXPIRE key 时间数字,单位是秒

      如果 Redis 宕机发生在添加锁完成之后,尚未添加锁的过期时间之间,此时如何保证其的原子性?

      将上述俩指令合并为一条命令就可以了

      所以,最终的操作是 set lock thread01 ex 10 nx

      SET key value EX 超时时时间的数字 NX
  • 释放锁

    • 手动释放

    • 超时释放(获取锁时添加一个超时时间)

      # 释放锁,删除即可Del key
  • 非阻塞式获取锁

    在获取锁失败后有两种机制,一种是阻塞时获取锁,另一种是非阻塞式获取锁

    这里我们使用非阻塞式获取锁


需求:定义一个类,实现下面接口,利用 Redis 实现分布式锁功能。

package com.hmdp.utils;public interface ILock {/** * 尝试获取锁 * * @param timeoutSec 所持有的超时时间,过期后自动释放 * @return [true]代表获取锁成功;[false]代表获取锁失败 */boolean tryLock(long timeoutSec);/** * 释放锁 */void unLock();}

6.3.3.锁的误删问题


存在一种极端情况,线程一获取锁之后因为其他原因导致业务阻塞,以至于超时释放了锁。

之后线程二拿到了锁,恰好在此时,线程一阻塞结束且业务完成了,线程一就直接释放锁(我们这里的释放锁的处理就是 Del key)。

此时,线程二的锁被释放了(因为删除的就是它的锁)。

恰好此时线程三也去获取锁,因为锁已经被删除了,它也可以执行业务了。

这样就造成两个线程在同时执行业务了。这样也就没办法保证一人一票的业务了。

类似于你解自行车锁,解半天解不开气的直接砸了锁,砸完后锁开了,发现不是自己的自行车的这种情况。


解决办法就是在线程尝试获取锁的同时存入线程标识,在释放锁之前判断是否是自己的线程标识


  • 案例:改进 Redis 的分布式锁
    • 需求:修改之前的分布式锁实现,满足:
      • 在获取锁时存入线程标示(可以用 UUID 表示)
      • 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
        • 如果一致则释放锁
        • 如果不一致则不释放锁

6.3.4.锁的原子操作问题


线程 1 执行业务完成后,成功判断了 “当前 Redis 中的线程标识 和 获取锁时存入 Redis 的线程标识”,发现是两标识是相同的,去执行释放锁的操作的时候发生了阻塞(比如 JVM 中的垃圾回收)。

又因为阻塞时长太长的缘故,锁自行释放了(超时释放锁)。

此时线程 2 去拿到了锁,并执行业务。

在执行业务的过程中,线程 1 阻塞结束,因为之前已经进行过判断了,它已经确认锁是自己的锁了,故去释放锁,但是这个锁实际上是线程 2 的。(此处 key 值是唯一的,我们之前加的标识在 value 中,但线程 1 已经判断过不会再确认 value 中的线程标识了,所以线程 1 可以成功删除 key,即释放锁)

结果就是线程 3 也可以获取到锁,然后执行业务了。此时,就再一次出现了两个线程在同时执行业务的情况。

显然,这是判断标识操作和释放操作是两个动作造成的,要想避免这种现象,就必须要确保这俩个动作是也原子性的操作,必须同时执行,不可以有间隔。


Lua 脚本解决多条命令原子性的问题

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。

Lua是一种编程语言,它的基本语法大家可以参考网站https://www.runoob.com/lua/lua-tutorial.html

这里重点介绍 Redis 提供的调用函数,语法如下:

  • 执行 Redis 命令
redis.call('命令名称', 'key', '其他参数', ...)
  • 例如,我们要执行 set name Jack,其脚本语言如下
redis.call('set', 'name', 'Jack')
  • 例如,我们要先执行 set name Rose,再执行 get name,则脚本如下

    • 先执行 set name Rose

      redis.call('set', 'name', 'Rose')
    • 再执行 get name

      local name = redis.call('get', 'name')
    • 最后返回

      return name
  • 写好脚本以后,需要用 Redis 命令来调用脚本,调用脚本的常见命令如下

    help @scripting

  • 例如,我们要执行 redis.call('set', 'name', 'Jack') 这个脚本,语法如下

    • 调用脚本

      EVAL "return redis.call('set', 'name', 'Jack')" 0

    • 如果脚本中的 key、value 不想写固定,可以作为参数传递。

      key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数:

      EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name2 Rose2


释放锁的业务流程是这样的:

  1. 获取锁中的线程标示

  2. 判断是否与指定的标示(当前线程标示)一致

  3. 如果一致则释放锁(删除)

  4. 如果不一致则什么都不做

如果用 Lua 脚本来表示则是这样

-- 这里的 KEYS[1] 就是锁的 key,这里的 ARGV[1] 就是当前线程标识-- 获取锁中的线程标识 get keylocal id = redis.call('get', KEYS[1]);-- 比较线程标识与锁中的标识是否一致if (id == ARGV[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1])endreturn 0

案例:基于 Lua 脚本实现分布式锁的释放锁逻辑

提示:RedisTmeplate 调用 Lua 脚本的 API 如下


小结

基于 Redis 的分布式锁实现思路

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用 set nx 满足互斥性

  • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性

  • 利用 Redis 集群保证高可用和高并发特性


6.3.5.Redisson 分布式锁


基于 setnx 实现的分布式锁存在下面的问题

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回 false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果 Redis 提供了主从集群,主从延同步在延迟,当主机宕机时,如果从机同步主机中的数据,则会出现锁失效。

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。

它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

分布式锁Lock和同步器Synchronizer

  • 可重入锁Reentrant Lock
  • 公平锁Fair Lock
  • 联锁MultiLock
  • 红锁RedLock
  • 读写锁ReadWriteLock
  • 信号量Semaphore
  • 可过期性信号量PermitExpirableSemaphore
  • 闭锁CountDownLatch
  • 官网地址https://redisson.org
  • GitHub 地址https://github.com/redisson/redisson

Redisson 快速入门

  1. 引入依赖

    <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
  2. 配置 Redisson 客户端

    @Configurationpublic class RedisConfig {@Beanpublic RedissonClient redissionClient() {// 配置类Config config = new Config();// 添加 Redis 地址,此处添加了单点的地址,也可以使用 config.useClusterServers() 添加集群地址config.useSingleServer().setAddress("redis://192.168.2.12:6379").setPassword("123321");// 创建客户端return Redisson.create(config);}}
  3. 使用 Redisson 的分布式锁

    @Resourceprivate RedissonClient redissonClient;@Testvoid testRedisson() throws InterruptedException {// 获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试过),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);// 判断锁是否获取成功if (isLock) {try {System.out.println("执行业务");} finally {//释放锁lock.unlock();}}}

Redisson 可重入锁原理

  • 获取锁的 Lua 脚本

    local key = KEYS[1]; -- 锁的 keylocal threadId = ARGV[1]; -- 线程的唯一标识local releaseTime = ARGV[2] -- 锁的自动释放时间-- 判断是否存在if (redis.call('exists', key) == 0) then-- 不存在,则获取锁redis.call('hset', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);return 1; --返回结果end-- 锁已经存在,判断 threadId 是否为自己(的线程)if (redis.call("hexists", key, threadId) == 1) then-- 是自己的线程。获取锁,重入次数 + 1redis.call('hincrby', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);end ;return 0; -- 代码走到这里,说明锁获取的不是自己,获取锁失败
  • 释放锁的脚本

    local key = KEYS[1]; -- 锁的 keylocal threadId = ARGV[1]; -- 线程的唯一标识local releaseTime = ARGV[2]; -- 锁的自动释放时间-- 判断当前锁是否还是被自己持有if (redis.call('HEXISTS', key, threadId) == 0) thenreturn nil;end ;-- 是自己的锁,则重入次数 - 1local count = redis.call('HINCRBY', key, threadId, -1);-- 判断是否重入次数是否为 0if (count > 0) then-- 大于 0 说明不能释放锁,重置有效期然后返回redis.call('EXPIRE', key, releaseTime);return nil;else-- 等于 0 说明可以释放锁,直接删除redis.call('DEL', key);return nil;end ;

视频链接Redisson 的锁重试和 WatchDog 机制


Redisson 分布式锁原理


小结:Redisson 分布式锁原理

  • 可重入:利用 hash 结构记录线程 id 和重入次数

  • 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制

  • 超时续约:利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间


Redisson 分布式锁主从一致性问题

  • MultiLock

为了提高 Redis 的可用性,往往都需要搭建一个主从集群。向集群写数据时,主机要要将数据同步给从机。

假设主机同步数据到从机的时候,突然宕机了。此时哨兵会发现异常,重新选举一个从机称为主机。

但是之前的数据尚未完成,这样一来就造成了数据丢失,很有可能就会造成锁失效的问题。

这里可以搭建主从,也可以不搭建主从。

  • 以下内容仅供参考

至于视频中搭建的三台服务器,可以使用 Docker 来解决这些问题(毕竟三台虚拟机占的磁盘空间还是蛮大的)

docker run \--name docker_redis1 \-p 6381:6379 \-v /usr/local/docker/volumes/redis1:/etc/redis/redis.conf\-v /usr/local/docker/volumes/redis1/data:/data \-d redis:6.2.6 \redis-server /etc/redis/redis.conf \--appendonly yes

上面的代码块是我创建的 docker_redis1 容器的命令(挂载了 redis.conf 配置文件)

配置文件可以从之前的 Redis 目录中复制即可,更改了三处地方。

pidfile /usr/local/docker/volumes/redis3/data/docker_redis1.pidlogfile "/usr/local/docker/volumes/redis3/data/docker_redis3.log"dir /usr/local/docker/volumes/redis3/data/

以此类推,即可搭建 docker_redis2docker_redis3 的 Docker 容器。


不过我使用的是之前的 redis ,以及 用 docker 创建的两个容器 docker_redis2docker_redis3

毕竟 application.yml 中配置的是 最初的 redis。

我在运行途中出现了如下错误:

java.lang.IllegalStateException: Failed to load ApplicationContext... ...Caused by: org.springframework.beans.factory.BeanCreationException... ...... Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6381 ...

报错:无法连接新建的这几个 Redis … … 最后发现是创建 Docker 容器时未指定密码的锅。

src/main/java/com/hmdp/config/RedissonConfig.java 中指定的 password 删除掉即可。


  1. 不可重入Redis 分布式锁
    • 原理:利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标示
    • 缺陷:不可重入、无法重试、锁超时失效
  2. 可重入的 Redis 分布式锁
    • 原理:利用 hash 结构,记录线程标示和重入次数;利用 watchDog 延续锁时间;利用信号量控制锁重试等待
    • 缺陷:Redis 宕机引起锁失效问题
  3. Redisson 的 multiLock
    • 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
    • 缺陷:运维成本高、实现复杂

7.Redis 实现秒杀优化


之前的秒杀优惠劵的下单功能中,有四步操作是串行的,直接面向数据库的,执行效率是很低的。

分离成两个线程,一个线程判断用户的购买资格,发现用户有购买资格后再开启一个独立的线程来处理耗时较久的减库存、下单的操作。可以将耗时较短的两步操作放到 Redis 中,在 Redis 中处理对应的秒杀资格的判断。Redis 的性能是比 MySQL 要好的。此外,还需要引入异步队列记录相关的信息。



案例:改进秒杀业务,提高并发性能

需求:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中
  2. 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  3. 如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

小结

秒杀业务的优化思路是什么

  • 先利用 Redis 完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题
  • 数据安全问题

8.秒杀的异步优化


8.1.概述


消息队列Message Queue),字面意思就是存放消息的队列。


最简单的消息队列模型包括 3 个角色

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息


Redis 提供了三种不同的方式来实现消息队列:

  • List 结构:基于 List 结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

8.2.基于 List 结构模拟消息队列


消息队列Message Queue),字面意思就是存放消息的队列。

而 Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现。

不过要注意的是,当队列中没有消息时 RPOPLPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。

因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。


基于 List 的消息队列有哪些优缺点?

  • 优点
    • 利用 Redis 存储,不受限于 JVM 内存上限
    • 基于 Redis 的持久化机制,数据安全性有保证
    • 可以满足消息有序性
  • 缺点
    • 无法避免消息丢失
    • 只支持单消费者

8.3.基于 PubSub 的消息队列


PubSub发布订阅) 是 Redis 2.0 版本引入的消息传递模型。

顾名思义,消费者可以订阅一个或多个channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与 pattern 格式匹配的所有频道
    • pattern
      • " />


        基于 PubSub 的消息队列有哪些优缺点?

        • 优点:采用发布订阅模型,支持多生产、多消费
        • 缺点:
          • 不支持数据持久化
          • 无法避免消息丢失
          • 消息堆积有上限,超出时数据丢失

        8.4.基于 Stream 的消息队列


        Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。


        8.4.1.单消费模式


        发送消息的命令

        XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
        • key:队列名称
        • [NOMKSTREAM]:如果队列不存在时,确定是否自动创建队列,默认自动创建
        • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量
        • *|ID:消息的唯一 ID,* 代表由 Redis 自动生成,格式是 ”时间戳-递增数字“,例如:”1666161469358-0“
        • field value [field value ...]:发送到队列中的消息,称为 Entry。格式为多个 Key-Value 键值对。

        例如:创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用 Redis 自动生成 ID

        127.0.0.1:6379> XADD users * name jack age 21 "1644805700523-0"

        读取消息的方式之一:XREAD

        XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
        • [COUNT count]:每次读取消息的最大数量;
        • [BLOCK milliseconds]:当没有消息时,确定是否阻塞,阻塞则添加具体的 milliseconds (阻塞时长)
        • STREAMS key [key ...]:从哪个队列读取消息,Key 就是队列名;
        • ID [ID ...]:起始 ID,只返回大于该 ID 的消息;0 代表从第一个消息开始,$ 代表从最新的消息开始。

        例如,使用 XREAD 读取第一个消息

        127.0.0.1:6379> XREAD COUNT 1 STREAMS users 01) 1) "queue" 2) 1) 1) "1666169070359-0" 2) 1) "name"2) "jack"3) "age"4) 20

        XREAD 阻塞方式,读取最新的消息

        XREAD COUNT 1 BLOCK STREAMS queue $

        在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

        • 注意
          • 当我们指定起始 ID 为 $ 时,代表读取最新的消息
          • 如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条
          • 如此便会出现漏读消息的问题

        STREAM 类型消息队列的 XREAD 命令特点:

        • 消息可回溯(消息永久的保存在消息队列中)
        • 一个消息可以被多个消费者读取
        • 可以阻塞读取
        • 有消息漏读的风险

        8.4.2.消费组模式


        消费者组Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

        其具备下列特点:

        • 消息分流:队列中的 消息会分流给组内不同的消费者,而不是重复消费,从而加快消息处理的速度。

        • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,即使消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费。(解决漏读问题)

        • 消息确认:消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list

          当处理完成后需要通过 XACK 命令来确认消息,标记消息为已处理,才会从 pending-list 中移除。(解决消息丢失问题)


        创建消费者组

        XGROUP CREATE key groupName ID [MKSTREAM]
        • key:队列名称

        • groupName:消费者组名称

        • ID:起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息

        • MKSTREAM:队列不存在时自动创建队列


        其它常见命令

        # 删除指定的消费者组XGROUP DESTORY key groupName# 给指定的消费者组添加消费者XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者XGROUP DELCONSUMER key groupname consumername

        从消费者组读取消息

        XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
        • group:消费组名称
        • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
        • count:本次查询的最大数量
        • BLOCK milliseconds:当没有消息时最长等待时间
        • NOACK:无需手动 ACK,获取到消息后自动确认
        • STREAMS key:指定队列名称
        • ID:获取消息的起始 ID:
          • ">":从下一个未消费的消息开始
          • 其它:根据指定 id 从 pending-list 中获取已消费但未确认的消息。
            例如 0,是从 pending-list 中的第一个消息开始

        确认消息

        XACK 消息队列名 消息组名 消息id

        pending-list 中获取消息(即未确认的消息)

        XPENDING key group [[IDLE min-idle-time]] start end count [consumer]]

        如:xpending s1 g1 - + 10


        消费者监听消息的基本思路(伪代码)


        STREAM 类型消息队列的 XREADGROUP 命令特点:

        • 消息可回溯
        • 可以多消费者争抢消息,加快消费速度
        • 可以阻塞读取
        • 没有消息漏读的风险
        • 有消息确认机制,保证消息至少被消费一次

        8.5.Redis 三种消息队列的对比


        ListPubSubStream
        消息持久化支持不支持支持
        阻塞读取支持支持支持
        消息堆积处理受限于内存空间,
        可以利用多消费者加快处理
        受限于消费者缓冲区受限于队列长度,
        可以利用消费者组提高消费速度,减少堆积
        消息确认机制不支持不支持支持
        消息回溯不支持不支持支持

        8.6.消息队列异步秒杀下单


        案例:基于 Redis 的 Stream 结构作为消息队列,实现异步秒杀下单

        需求:

        1. 创建一个 Stream 类型的消息队列,名为 stream.orders
        2. 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
        3. 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单
        XGROUP CREATE stream.orders g1 0 MKSTREAM
        -- 1.参数列表-- 1.1.优惠券 idlocal voucherId = ARGV[1]-- 1.2.用户 idlocal userId = ARGV[2]-- 1.3.订单 idlocal orderId = ARGV[3]-- 2.数据 key-- 2.1.库存 keylocal stockKey = 'seckill:stock:' .. voucherId-- 2.2.订单 keylocal orderKey = 'seckill:order:' .. voucherIdlocal stockKey_value = redis.call('get', stockKey)-- 3.脚本业务-- 3.1.判断库存是否充足 get stockKeyif (tonumber(stockKey_value) <= 0) then-- 3.2.库存不足,返回 1return 1end-- 3.2.判断用户是否下单 SISMEMBER orderKey userIdif (redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,则说明该用户是重复下单(这是不允许的),则返回 2return 2end-- 3.4.扣库存 incrby stockKey -1redis.call('incrby', stockKey, -1)-- 3.5.下单(保存用户) sadd orderKey userIdredis.call('sadd', orderKey, userId)-- 3.6.发送消息到队列中:XADD stream.orders * k1 v1 k2 v2 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0

        9.达人探店功能



        9.1.分享探店笔记


        探店笔记类似点评网站的评价,往往是图文结合。

        对应的表有两个:

        • tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
        • tb_blog_comments:其他用户对探店笔记的评价


        点击首页最下方菜单栏中的+按钮,即可发布探店图文:

        • 上传图片接口地址:http://localhost:8080/api/upload/blog
        • 发布笔记接口地址:http://localhost:8080/api/blog

        文件上传的设置


        9.2.查看探店笔记


        需求:点击首页的探店笔记,会进入详情页面,实现该页面的查询接口


        9.3.点赞功能


        在首页的探店笔记排行榜和探店图文详情页面都有点赞的功能


        案例:完善点赞功能
        需求

        • 同一个用户只能点赞一次,再次点击则取消点赞
        • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段 Blog 类的 isLike 属性)

        实现步骤

        1. 给 Blog 类中添加一个 isLike 字段,标示是否被当前用户点赞
        2. 修改点赞功能,利用 RedisSet 集合判断是否点赞过,未点赞过则点赞数 + 1,已点赞过则点赞数 - 1
        3. 修改根据 id 查询 Blog 的业务,判断当前登录用户是否点赞过,赋值给 isLike 字段
        4. 修改分页查询 Blog 业务,判断当前登录用户是否点赞过,赋值给 isLike 字段

        9.4.点赞排行榜


        在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的 TOP5,形成点赞排行榜


        案例:实现查询点赞排行榜的接口

        需求:按照点赞时间先后排序,返回 Top5 的用户

        Redis 中的几种常用的数据结构的比较

        ListSetSortedSet
        排序方式按添加顺序排序无法排序根据 score 值排序
        唯一性不唯一唯一唯一
        查找方式按索引查找
        或首尾查找
        根据元素查找根据元素查找

        10.关注列表



        10.1.关注和取关


        在探店图文的详情页面中,可以关注发布笔记的作者


        案例:实现关注和取关功能

        需求:基于该表数据结构,实现两个接口

        1. 关注和取关接口
        2. 判断是否关注的接口

        关注是 User 之间的关系,是博主与粉丝的关系,数据库中有一张 tb_follow 表来表示

        这里为了简化开发,将主键 id 设置为了自增长


        10.2.共同关注


        点击博主头像,可以进入博主首页


        博主个人首页依赖两个接口:

        1. 根据 id 查询 user 信息
        @GetMapping("/{id}")public Result queryUserById(@PathVariable("id") Long userId) {// 查询详情User user = userService.getById(userId);if (user == null) {return Result.ok();}UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);// 返回return Result.ok(userDTO);}
        1. 根据 id 查询博主的探店笔记
        @GetMapping("/of/user")public Result queryBlogByUserId(@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam("id") Long id) {// 根据用户查询Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();return Result.ok(records);}

        案例:实现共同关注的功能

        需求:利用 Redis 中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友


        10.3.关注推送


        10.3.1.Feed 流分析


        关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供 “沉浸式” 的体验,通过无限下拉刷新获取新的信息。


        Feed 流产品有两种常见模式:

        • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。

          例如朋友圈

          • 优点:信息全面,不会有缺失。并且实现也相对简单
          • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
        • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。

          推送用户感兴趣信息来吸引用户

          • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
          • 缺点:如果算法不精准,可能起到反作用

        本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。

        该模式的实现方案有三种:拉模式、推模式、推拉结合


        拉模式:也叫做读扩散


        推模式:也叫做写扩散。


        推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。


        Feed 流的实现方案

        拉模式推模式推拉结合
        写比例
        读比例
        用户读取延迟
        实现难度复杂简单很复杂
        使用场景很少使用用户量少、没有大V过千万的用户量,有大V

        10.3.2.推送到粉丝收件箱


        案例:基于推模式实现关注推送功能

        需求

        1. 修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱
        2. 收件箱满足可以根据时间戳排序,必须用 Redis 的数据结构实现
        3. 查询收件箱数据时,可以实现分页查询
        @PostMappingpublic Result saveBlog(@RequestBody Blog blog) {// 获取登录用户UserDTO user = UserHolder.getUser();blog.setUserId(user.getId()); // 保存探店笔记blogService.save(blog);return Result.ok();}

        Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。


        Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。


        满足这种条件的 Redis 中的数据结构就是 SortedSet


        10.3.3.滚动分页查询


        案例:实现关注推送页面的分页查询

        需求:需求:在个人主页的 “关注” 卡片中,查询并展示推送的 Blog 信息


        11.GEO 附近搜索



        11.1.GEO 数据结构


        GEO 数据结构

        GEO 就是 Geolocation 的简写形式,代表地理坐标。

        Redis 在 3.2 版本中加入了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。

        常见的命令有:

        • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)

        • GEODIST:计算指定的两个点之间的距离并返回

        • GEOHASH:将指定 member 的坐标转为 hash 字符串形式并返回

        • GEOPOS:返回指定member的坐标

        • GEORADIUS:指定圆心、半径,找到该圆内包含的所有 member,并按照与圆心之间的距离排序后返回。6.2 以后已废弃

        • GEOSEARCH:在指定范围内搜索 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2 新功能

        • GEOSEARCHSTORE:与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 key。 6.2.新功能


        练习 Redis 的 GEO 功能

        需求:

        1. 添加下面几条数据:

          • 北京南站( 116.378248 39.865275 )
          • 北京站( 116.42803 39.903738 )
          • 北京西站( 116.322287 39.893729 )
        2. 计算北京西站到北京站的距离

        3. 搜索天安门( 116.397904 39.909005 )附近 10 km 内的所有火车站,并按照距离升序排序

        GEOADD g1 116.378248 39.865275 "BeiJingNan" 116.42803 39.903738 "BeiJing" 116.322287 39.893729 "BeiJingXi"
        GEODIST g1 "BeiJing" "BeiJingXi" km
        GEOSEARCH g1 FROMLONLAT 116.397904 39.909005 BYRADIUS 10 km WITHDIST

        返回 Hash 值的命令:GEOHASH g1 "BeiJingXi",输出结果:"wx4dyyrmcd0"


        11.2.实现查找附近商铺功能


        在首页中点击某个频道,即可看到频道下的商户


        按照商户类型做分组,类型相同的商户作为同一组,以 typeIdkey商家地址value 存入同一个 GEO 集合中即可


        SpringDataRedis 的 2.3.9 版本并不支持 Redis 6.2 提供的 GEOSEARCH 命令

        因此我们需要提示其版本,修改自己的 pom.xml,内容如下

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId></exclusion><exclusion><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.6.2</version></dependency><dependency><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId><version>6.1.6.RELEASE</version></dependency>

        12.签到功能



        12.1.BitMap


        假如我们用一张表来存储用户签到信息,其结构应该如下

        假如有 1000 万用户,平均每人每年签到次数为 10 次,则这张表一年的数据量为 1 亿条

        每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共 22 字节的内存,一个月则最多需要 600 多字节


        我们按月来统计用户签到信息,签到记录为 1,未签到则记录为 0

        把每一个 bit 位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图BitMap

        Redis 中 是利用 string 类型数据结构实现 BitMap,因此最大上限是 512M,转换为 bit 则是 2 322^{32} 232 个 bit 位。


        BitMap 的操作命令有:

        • SETBIT:向指定位置(offset)存入一个 0 或 1

        • GETBIT :获取指定位置(offset)的 bit 值

        • BITCOUNT :统计 BitMap 中值为 1 的 bit 位的数量

        • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值

        • BITFIELD_RO :获取 BitMap 中 bit 数组,并以十进制形式返回

        • BITOP :将多个 BitMap 的结果做位运算(与 、或、异或)

        • BITPOS :查找 bit 数组中指定范围内第一个 0 或 1 出现的位置

        存储的数据为:11100111(二进制)

        127.0.0.1:6379> SETBIT bm1 0 1(integer) 0127.0.0.1:6379> SETBIT bm1 1 1(integer) 0127.0.0.1:6379> SETBIT bm1 2 1(integer) 0127.0.0.1:6379> SETBIT bm1 5 1(integer) 0127.0.0.1:6379> SETBIT bm1 6 1(integer) 0127.0.0.1:6379> SETBIT bm1 7 1(integer) 0127.0.0.1:6379> GETBIT bm1 2(integer) 1127.0.0.1:6379> BITCOUNT bm1(integer) 6127.0.0.1:6379> BITFIELD bm1 GET u2 0 1) (integer) 3127.0.0.1:6379> BITFIELD bm1 GET u3 0 1) (integer) 7127.0.0.1:6379> BITFIELD bm1 GET u4 0 1) (integer) 14127.0.0.1:6379> BITPOS bm1 0(integer) 3

        12.2.签到功能


        需求:实现签到接口,将当前用户当天签到信息保存到 Redis 中

        说明
        请求方式Post
        请求路径/user/sign
        请求参数
        返回值

        提示:因为 BitMap 底层是基于 String 数据结构,因此其操作也都封装在字符串相关操作中了。


        12.3.签到统计


        • 问题 1:什么叫做连续签到天数?

        从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

        • 问题 2:如何得到本月到今天为止的所有签到数据?

        BITFIELD key GET u[dayOfMonth] 0

        • 问题 3:如何从后向前遍历每个 bit 位?

        与 1 做与运算,就能得到最后一个 bit 位。

        随后右移 1 位,下一个 bit 位就成为了最后一个 bit 位。


        案例:实现签到统计功能

        需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

        说明
        请求方式GET
        请求路径/user/sign/count
        请求参数
        返回值连续签到天数

        13.UV统计功能



        首先我们搞懂两个概念:

        • UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。

          1 天内同一个用户多次访问该网站,只记录1次。

        • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录 1 次PV,用户多次打开页面,则记录多次PV。

          往往用来衡量网站的流量。

        UV 统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。

        但是如果每个访问的用户都保存到 Redis 中,数据量会非常恐怖。


        Hyperloglog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

        相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0

        Redis 中的 HLL 是基于 string 结构实现的,单个 HLL 的内存永远小于 16 kb,内存占用低的令人发指!

        作为代价,其测量结果是概率性的,有小于 0.81% 的误差。不过对于 UV 统计来说,这完全可以忽略。

        127.0.0.1:6379> PFADD hl1 e1 e2 e3 e4 e5(integer) 1127.0.0.1:6379> pfcount hl1(integer) 5127.0.0.1:6379> PFADD hl1 e1 e2 e3 e4 e5(integer) 0127.0.0.1:6379> pfcount hl1(integer) 5

        我们直接利用单元测试,向 HyperLogLog 中添加 100 万条数据,看看内存占用和统计效果如何

        @Testvoid testHyperLogLog() {String[] values = new String[1000];int j = 0;for (int i = 0; i < 1000000; i++) {j = i % 1000;values[j] = "user_" + i;if (j == 999) {// 发送到 RedisstringRedisTemplate.opsForHyperLogLog().add("hl2", values);}}//统计数量Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");System.out.println("count = " + count); }

        • 小结

        HyperLogLog 的作用:做海量数据的统计工作

        HyperLogLog 的优点:内存占用极低、性能非常好

        HyperLogLog 的缺点:有一定的误差