Redis学习笔记--消息队列和达人探店

Redis消息队列

认识消息队列

  • 什么是消息队列?字面意思就是存放消息的队列,最简单的消息队列模型包括3个角色
    1. 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
    2. 生产者:发送消息到消息队列
    3. 消费者:从消息队列获取消息并处理消息
  • 使用队列的好处在于解耦:举个例子,快递员(生产者)吧快递放到驿站/快递柜里去(Message Queue)去,我们(消费者)从快递柜/驿站去拿快递,这就是一个异步,如果耦合,那么快递员必须亲自上楼把快递递到你手里,服务当然好,但是万一我不在家,快递员就得一直等我,浪费了快递员的时间。所以解耦还是非常有必要的
  • 那么在这种场景下我们的秒杀就变成了:在我们下单之后,利用Redis去进行校验下单的结果,然后在通过队列把消息发送出去,然后在启动一个线程去拿到这个消息,完成解耦,同时也加快我们的响应速度
  • 这里我们可以直接使用一些现成的(MQ)消息队列,如kafka,rabbitmq等,但是如果没有安装MQ,我们也可以使用Redis提供的MQ方案(学完Redis我就去学微服务)

基于List实现消息队列

  • 基于List结构模拟消息队列
  • 消息队列(Message Queue),字面意思就是存放消息的队列,而Redis的list数据结构是一个双向链表,很容易模拟出队列的效果
  • 队列的入口和出口不在同一边,所以我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现消息队列。
  • 不过需要注意的是,当队列中没有消息时,RPOP和LPOP操作会返回NULL,而不像JVM阻塞队列那样会阻塞,并等待消息,所以我们这里应该使用BRPOP或者BLPOP来实现阻塞效果
  • 基于List的消息队列有哪些优缺点?
    • 优点
      1. 利用Redis存储,不受限于JVM内存上限
      2. 基于Redis的持久化机制,数据安全性有保障
      3. 可以满足消息有序性
    • 缺点
      1. 无法避免消息丢失(经典服务器宕机)
      2. 只支持单消费者(一个消费者把消息拿走了,其他消费者就看不到这条消息了)

基于PubSub的消息队列

  • PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费和可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

  • SUBSCRIBE channel [channel]:订阅一个或多个频道

  • PUBLISH channel msg:向一个频道发送消息

  • PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

    Subscribes the client to the given patterns.
    Supported glob-style patterns:

    • h?flo subscribes to hello, hallo and hxllo
    • h*llo subscribes to hllo and heeeello
    • h[ae]llo subscribes to hello and hallo, but not hillo

    Use \ to escape special characters if you want to match them verbatim.

  • 基于PubSub的消息队列有哪些优缺点

    • 优点:
      1. 采用发布订阅模型,支持多生产,多消费
    • 缺点:
      1. 不支持数据持久化
      2. 无法避免消息丢失(如果向频道发送了消息,却没有人订阅该频道,那发送的这条消息就丢失了)
      3. 消息堆积有上限,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息发的太快)

基于Stream的消息队列

  • Stream是Redis 5.0引入的一种新数据类型,可以时间一个功能非常完善的消息队列

  • 发送消息的命令

    1
    XADD key [NOMKSTREAM] [MAXLEN|MINID [=!~] threshold [LIMIT count]] *|ID field value [field value ...]
    • NOMKSTREAM
      • 如果队列不存在,是否自动创建队列,默认是自动创建
    • [MAXLEN|MINID [=!~] threshold [LIMIT count]]
      • 设置消息队列的最大消息数量,不设置则无上限
    • *|ID
      • 消息的唯一id,*代表由Redis自动生成。格式是”时间戳-递增数字”,例如”114514114514-0”
    • field value [field value …]
      • 发送到队列中的消息,称为Entry。格式就是多个key-value键值对
  • 举例

    1
    2
    # 创建名为users的队列,并向其中发送一个消息,内容是{name=jack, age=21},并且使用Redis自动生成ID
    XADD users * name jack age 21
  • 读取消息的方式之一:XREAD

    1
    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    • [COUNT count]
      • 每次读取消息的最大数量
    • [BLOCK milliseconds]
      • 当没有消息时,是否阻塞,阻塞时长
    • STREAMS key [key …]
      • 要从哪个队列读取消息,key就是队列名
    • ID [ID …]
      • 起始ID,只返回大于该ID的消息
        • 0:表示从第一个消息开始
        • $:表示从最新的消息开始
  • 例如:使用XREAD读取第一个消息

    1
    2
    3
    4
    5
    6
    7
    云服务器:0>XREAD COUNT 1 STREAMS users 0
    1) 1) "users"
    2) 1) 1) "1667119621804-0"
    2) 1) "name"
    2) "jack"
    3) "age"
    4) "21"
  • 例如:XREAD阻塞方式,读取最新消息

    1
    XREAD COUNT 2 BLOCK 10000 STREAMS users $
  • 在业务开发中,我们可以使用循环调用的XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    while (true){
    //尝试读取队列中的消息,最多阻塞2秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    //没读取到,跳过下面的逻辑
    if(msg == null){
    continue;
    }
    //处理消息
    handleMessage(msg);
    }

    注意:当我们指定其实ID为$时,代表只能读取到最新消息,如果当我们在处理一条消息的过程中,又有超过1条以上的消息到达队列,那么下次获取的时候,也只能获取到最新的一条,会出现漏读消息的问题

  • STREAM类型消息队列的XREAD命令特点

    1. 消息可回溯
    2. 一个消息可以被多个消费者读取
    3. 可以阻塞读取
    4. 有漏读消息的风险

基于Stream的消息队列—消费者组

  • 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列,具备以下特点

    1. 消息分流
      • 队列中的消息会分留给组内的不同消费者,而不是重复消费者,从而加快消息处理的速度
    2. 消息标识
      • 消费者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费
    3. 消息确认
      • 消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完成后,需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除
  • 创建消费者组

    1
    XGROUP CREATE key groupName ID [MKSTREAM]
    • key
      • 队列名称
    • groupName
      • 消费者组名称
    • ID
      • 起始ID标识,$代表队列中的最后一个消息,0代表队列中的第一个消息
    • MKSTREAM
      • 队列不存在时自动创建队列
  • 其他常见命令

    • 删除指定的消费者组

      1
      XGROUP DESTORY key groupName
    • 给指定的消费者组添加消费者

      1
      XGROUP CREATECONSUMER key groupName consumerName
    • 删除消费者组中指定的消费者

      1
      XGROUP DELCONSUMER key groupName consumerName
  • 从消费者组中读取消息

    1
    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [keys ...] ID [ID ...]
    • group
      • 消费者组名称
    • consumer
      • 消费者名,如果消费者不存在,会自动创建一个消费者
    • count
      • 本次查询的最大数量
    • BLOCK milliseconds
      • 当前没有消息时的最大等待时间
    • NOACK
      • 无需手动ACK,获取到消息后自动确认(一般不用,我们都是手动确认)
    • STREAMS key
      • 指定队列名称
    • ID
      • 获取消息的起始ID
        • >:从下一个未消费的消息开始(pending-list中)
        • 其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
  • 消费者监听消息的基本思路

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    while(true){
    // 尝试监听队列,使用阻塞模式,最大等待时长为2000ms
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
    if(msg == null){
    // 没监听到消息,重试
    continue;
    }
    try{
    //处理消息,完成后要手动确认ACK,ACK代码在handleMessage中编写
    handleMessage(msg);
    } catch(Exception e){
    while(true){
    //0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
    if(msg == null){
    //null表示没有异常消息,所有消息均已确认,结束循环
    break;
    }
    try{
    //说明有异常消息,再次处理
    handleMessage(msg);
    } catch(Exception e){
    //再次出现异常,记录日志,继续循环
    log.error("..");
    continue;
    }
    }
    }
    }
  • STREAM类型消息队列的XREADGROUP命令的特点

    1. 消息可回溯
    2. 可以多消费者争抢消息,加快消费速度
    3. 可以阻塞读取
    4. 没有消息漏读风险
    5. 有消息确认机制,保证消息至少被消费一次
List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间, 可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度, 可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

Stream消息队列实现异步秒杀下单

  • 需求:

    1. 创建一个Stream类型的消息队列,名为stream.orders
    2. 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
    3. 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
  • 步骤一:创建一个Stream类型的消息队列,名为stream.orders

    1
    XGROUP CREATE stream.orders g1 0 MKSTREAM
  • 步骤二:修改Lua脚本,新增orderId参数,并将订单信息加入到消息队列中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    -- 订单id
    local voucherId = ARGV[1]
    -- 用户id
    local userId = ARGV[2]
    -- 新增orderId,但是变量名用id就好,因为VoucherOrder实体类中的orderId就是用id表示的
    local id = ARGV[3]
    -- 优惠券key
    local stockKey = 'seckill:stock:' .. voucherId
    -- 订单key
    local orderKey = 'seckill:order:' .. voucherId
    -- 判断库存是否充足
    if (tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
    end
    -- 判断用户是否下单
    if (redis.call('sismember', orderKey, userId) == 1) then
    return 2
    end
    -- 扣减库存
    redis.call('incrby', stockKey, -1)
    -- 将userId存入当前优惠券的set集合
    redis.call('sadd', orderKey, userId)
    -- 将下单数据保存到消息队列中
    redis.call("sadd", 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', id)
    return 0
  • 步骤三:修改秒杀逻辑

由于将下单数据加入到消息队列的功能,我们在Lua脚本中实现了,所以这里就不需要将下单数据加入到JVM的阻塞队列中去了,同时Lua脚本中我们新增了一个参数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    @Override
public Result seckillVoucher(Long voucherId) {
+ long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(), voucherId.toString(),
+ UserHolder.getUser().getId().toString(), String.valueOf(orderId));
if (result.intValue() != 0) {
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下 单");
}
- long orderId = redisIdWorker.nextId("order");
- //封装到voucherOrder中
- VoucherOrder voucherOrder = new VoucherOrder();
- voucherOrder.setVoucherId(voucherId);
- voucherOrder.setUserId(UserHolder.getUser().getId());
- voucherOrder.setId(orderId);
- //加入到阻塞队列
- orderTasks.add(voucherOrder);
//主线程获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}

修改后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public Result seckillVoucher(Long voucherId) {
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(), voucherId.toString(),
UserHolder.getUser().getId().toString(), String.valueOf(orderId));
if (result.intValue() != 0) {
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}
//主线程获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}

根据伪代码来修改我们的VoucherOrderHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while(true){
// 尝试监听队列,使用阻塞模式,最大等待时长为2000ms
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
if(msg == null){
// 没监听到消息,重试
continue;
}
try{
//处理消息,完成后要手动确认ACK
handleMessage(msg);
} catch(Exception e){
while(true){
//0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){
//null表示没有异常消息,所有消息均已确认,结束循环
break;
}
try{
//说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e){
//再次出现异常,记录日志,继续循环
log.error("..");
continue;
}
}
}
}

修改后的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
String queueName = "stream.orders";

private class VoucherOrderHandler implements Runnable {

@Override
public void run() {
while (true) {
try {
//1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
//ReadOffset.lastConsumed()底层就是 '>'
StreamOffset.create(queueName, ReadOffset.lastConsumed()));
//2. 判断消息是否获取成功
if (records == null || records.isEmpty()) {
continue;
}
//3. 消息获取成功之后,我们需要将其转为对象
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4. 获取成功,执行下单逻辑,将数据保存到数据库中
handleVoucherOrder(voucherOrder);
//5. 手动ACK,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常", e);
//订单异常的处理方式我们封装成一个函数,避免代码太臃肿
handlePendingList();
}
}
}
}

private void handlePendingList() {
while (true) {
try {
//1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0")));
//2. 判断pending-list中是否有未处理消息
if (records == null || records.isEmpty()) {
//如果没有就说明没有异常消息,直接结束循环
break;
}
//3. 消息获取成功之后,我们需要将其转为对象
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4. 获取成功,执行下单逻辑,将数据保存到数据库中
handleVoucherOrder(voucherOrder);
//5. 手动ACK,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.info("处理pending-list异常");
//如果怕异常多次出现,可以在这里休眠一会儿
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}

达人探店

发布探店笔记

这部分代码已经提供好了,我们来看看对应的数据表

  • tb_blog

探店店笔记表,包含笔记中的标题、文字、图片等

Field Type Collation Null Key Default Extra Comment
id bigint unsigned (NULL) NO PRI (NULL) auto_increment 主键
shop_id bigint (NULL) NO (NULL) 商户id
user_id bigint unsigned (NULL) NO (NULL) 用户id
title varchar(255) utf8mb4_unicode_ci NO (NULL) 标题
images varchar(2048) utf8mb4_general_ci NO (NULL) 探店的照片,最多9张,多张以”,”隔开
content varchar(2048) utf8mb4_unicode_ci NO (NULL) 探店的文字描述
liked int unsigned (NULL) YES 0 点赞数量
comments int unsigned (NULL) YES (NULL) 评论数量
create_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED 创建时间
update_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED on update CURRENT_TIMESTAMP 更新时间
  • tb_blog_comments
  • 其他用户对探店笔记的评价
Field Type Collation Null Key Default Extra Comment
id bigint unsigned (NULL) NO PRI (NULL) auto_increment 主键
user_id bigint unsigned (NULL) NO (NULL) 用户id
blog_id bigint unsigned (NULL) NO (NULL) 探店id
parent_id bigint unsigned (NULL) NO (NULL) 关联的1级评论id,如果是一级评论,则值为0
answer_id bigint unsigned (NULL) NO (NULL) 回复的评论id
content varchar(255) utf8mb4_general_ci NO (NULL) 回复的内容
liked int unsigned (NULL) YES (NULL) 点赞数
status tinyint unsigned (NULL) YES (NULL) 状态,0:正常,1:被举报,2:禁止查看
create_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED 创建时间
update_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED on update CURRENT_TIMESTAMP 更新时间
  • 对应的实体类,数据表中并没有用户头像和用户昵称,但是对应的实体类里却有,这是因为使用了

    1
    @TableField(exist = false)

    用来解决实体类中有的属性但是数据表中没有的字段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    @Data
    @EqualsAndHashCode(callSuper = false)
    @Accessors(chain = true)
    @TableName("tb_blog")
    public class Blog implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
    * 主键
    */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    /**
    * 商户id
    */
    private Long shopId;
    /**
    * 用户id
    */
    private Long userId;
    /**
    * 用户图标
    */
    @TableField(exist = false)
    private String icon;
    /**
    * 用户姓名
    */
    @TableField(exist = false)
    private String name;
    /**
    * 是否点赞过了
    */
    @TableField(exist = false)
    private Boolean isLike;

    /**
    * 标题
    */
    private String title;

    /**
    * 探店的照片,最多9张,多张以","隔开
    */
    private String images;

    /**
    * 探店的文字描述
    */
    private String content;

    /**
    * 点赞数量
    */
    private Integer liked;

    /**
    * 评论数量
    */
    private Integer comments;

    /**
    * 创建时间
    */
    private LocalDateTime createTime;

    /**
    * 更新时间
    */
    private LocalDateTime updateTime;
    }
  • 效果图如下
    img

  • 对应的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 保存探店博文
    blogService.save(blog);
    // 返回id
    return Result.ok(blog.getId());
    }
  • 上传图片的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @PostMapping("blog")
    public Result uploadImage(@RequestParam("file") MultipartFile image) {
    try {
    // 获取原始文件名称
    String originalFilename = image.getOriginalFilename();
    // 生成新文件名
    String fileName = createNewFileName(originalFilename);
    // 保存文件
    image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
    // 返回结果
    log.debug("文件上传成功,{}", fileName);
    return Result.ok(fileName);
    } catch (IOException e) {
    throw new RuntimeException("文件上传失败", e);
    }
    }

    注意:这里我们需要修改SystemConstants.IMAGE_UPLOAD_DIR 为自己图片所在的地址,在实际开发中图片一般会放在nginx上或者是云存储上。

查看探店笔记

  • 需求:点击首页的探店笔记,会进入详情页面,我们现在需要实现页面的查询接口
    img

  • 随便点击一张图片,查看发送的请求

    请求网址: http://localhost:8080/api/blog/6
    请求方法: GET

  • 看样子是BlogController下的方法,请求方式为GET,那我们直接来编写对应的方法

Controller层

业务逻辑我们要写在Service层,Controller层只调用

1
2
3
4
@GetMapping("/{id}")
public Result queryById(@PathVariable Integer id){
return blogService.queryById(id);
}

ServiceImpl

  • 在Service类中创建对应方法之后,在Impl类中实现,我们查看用户探店笔记的时候,需要额外设置用户名和其头像,由于设置用户信息这个操作比较通用,所以这里封装成了一个方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Override
    public Result queryById(Integer id) {
    Blog blog = getById(id);
    if (blog == null) {
    return Result.fail("评价不存在或已被删除");
    }
    queryBlogUser(blog);
    return Result.ok(blog);
    }

    private void queryBlogUser(Blog blog) {
    Long userId = blog.getUserId();
    User user = userService.getById(userId);
    blog.setName(user.getNickName());
    blog.setIcon(user.getIcon());
    }
  • 我们顺手将queryHotBlog也修改一下,原始代码将业务逻辑写到了Controller中,修改后的完整代码如下

BlogController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
// 修改点赞数量
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}

@GetMapping("/of/me")
public Result queryMyBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
// 根据用户查询
Page<Blog> page = blogService.query()
.eq("user_id", user.getId()).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}

@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}

@GetMapping("/{id}")
public Result queryById(@PathVariable Integer id){
return blogService.queryById(id);
}
}

BlogServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;

@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(this::queryBlogUser);
return Result.ok(records);
}


@Override
public Result queryById(Integer id) {
Blog blog = getById(id);
if (blog == null) {
return Result.fail("评价不存在或已被删除");
}
queryBlogUser(blog);
return Result.ok(blog);
}

private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
}

点赞功能

  • 点击点赞按钮,查看发送的请求

    请求网址: http://localhost:8080/api/blog/like/4
    请求方法: PUT

  • 看样子是BlogController中的like方法,源码如下

    1
    2
    3
    4
    5
    6
    7
    JAVA
    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
    // 修改点赞数量
    blogService.update().setSql("liked = liked + 1").eq("id", id).update();
    return Result.ok();
    }
  • 问题分析:这种方式会导致一个用户无限点赞,明显是不合理的

  • 造成这个问题的原因是,我们现在的逻辑,发起请求只是给数据库+1,所以才会出现这个问题

  • 需求

    1. 同一个用户只能对同一篇笔记点赞一次,再次点击则取消点赞
    2. 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
  • 实现步骤

    1. 修改点赞功能,利用Redis中的set集合来判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1
    2. 修改根据id查询的业务,判断当前登录用户是否点赞过,赋值给isLike字段
    3. 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
  • 具体实现

Controller层

业务逻辑卸载Service层

1
2
3
4
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}

BlogServiceImpl

在BlogService接口中创建对应方法,在Impl中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Result likeBlog(Long id) {
//1. 获取当前用户信息
Long userId = UserHolder.getUser().getId();
//2. 如果当前用户未点赞,则点赞数 +1,同时将用户加入set集合
String key = BLOG_LIKED_KEY + id;
Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if (BooleanUtil.isFalse(isLiked)) {
//点赞数 +1
boolean success = update().setSql("liked = liked + 1").eq("id", id).update();
//将用户加入set集合
if (success) {
stringRedisTemplate.opsForSet().add(key, userId.toString());
}
//3. 如果当前用户已点赞,则取消点赞,将用户从set集合中移除
}else {
//点赞数 -1
boolean success = update().setSql("liked = liked - 1").eq("id", id).update();
if (success){
//从set集合移除
stringRedisTemplate.opsForSet().remove(key, userId.toString());
}
}
return Result.ok();
}
  • 修改完毕之后,页面上还不能立即显示点赞完毕的后果,我们还需要修改查询Blog业务,判断Blog是否被当前用户点赞过

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    @Override
    public Result queryHotBlog(Integer current) {
    // 根据用户查询
    Page<Blog> page = query()
    .orderByDesc("liked")
    .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
    // 获取当前页数据
    List<Blog> records = page.getRecords();
    // 查询用户
    records.forEach(blog -> {
    queryBlogUser(blog);
    //追加判断blog是否被当前用户点赞,逻辑封装到isBlogLiked方法中
    isBlogLiked(blog);
    });
    return Result.ok(records);
    }

    @Override
    public Result queryById(Integer id) {
    Blog blog = getById(id);
    if (blog == null) {
    return Result.fail("评价不存在或已被删除");
    }
    queryBlogUser(blog);
    //追加判断blog是否被当前用户点赞,逻辑封装到isBlogLiked方法中
    isBlogLiked(blog);
    return Result.ok(blog);
    }

    private void isBlogLiked(Blog blog) {
    //1. 获取当前用户信息
    Long userId = UserHolder.getUser().getId();
    //2. 判断当前用户是否点赞
    String key = BLOG_LIKED_KEY + blog.getId();
    Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    //3. 如果点赞了,则将isLike设置为true
    blog.setIsLike(BooleanUtil.isTrue(isMember));
    }

点赞排行榜

  • 当我们点击探店笔记详情页面时,应该按点赞顺序展示点赞用户,比如显示最早点赞的TOP5,形成点赞排行榜,就跟QQ空间发的说说一样,可以看到有哪些人点了赞
  • 之前的点赞是放到Set集合中,但是Set集合又不能排序,所以这个时候,我们就可以改用SortedSet(Zset)
  • 那我们这里顺便就来对比一下这些集合的区别
List Set SortedSet
排序方式 按添加顺序排序 无法排序 根据score值排序
唯一性 不唯一 唯一 唯一
查找方式 按索引查找或首尾查找 根据元素查找 根据元素查找
  • 修改BlogServiceImpl

    由于ZSet没有isMember方法,所以这里只能通过查询score来判断集合中是否有该元素,如果有该元素,则返回值是对应的score,如果没有该元素,则返回值为null

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Override
    public Result likeBlog(Long id) {
    //1. 获取当前用户信息
    Long userId = UserHolder.getUser().getId();
    //2. 如果当前用户未点赞,则点赞数 +1,同时将用户加入set集合
    String key = BLOG_LIKED_KEY + id;
    //尝试获取score
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    //为null,则表示集合中没有该用户
    if (score == null) {
    //点赞数 +1
    boolean success = update().setSql("liked = liked + 1").eq("id", id).update();
    //将用户加入set集合
    if (success) {
    stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
    }
    //3. 如果当前用户已点赞,则取消点赞,将用户从set集合中移除
    } else {
    //点赞数 -1
    boolean success = update().setSql("liked = liked - 1").eq("id", id).update();
    if (success) {
    //从set集合移除
    stringRedisTemplate.opsForZSet().remove(key, userId.toString());
    }
    }
    return Result.ok();
    }
  • 同时修改isBlogLiked方法,在原有逻辑上,判断用户是否已登录,登录状态下才会继续判断用户是否点赞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void isBlogLiked(Blog blog) {
    //1. 获取当前用户信息
    UserDTO userDTO = UserHolder.getUser();
    //当用户未登录时,就不判断了,直接return结束逻辑
    if (userDTO == null) {
    return;
    }
    //2. 判断当前用户是否点赞
    String key = BLOG_LIKED_KEY + blog.getId();
    Double score = stringRedisTemplate.opsForZSet().score(key, userDTO.getId().toString());
    blog.setIsLike(score != null);
    }
  • 那我们继续来完善显示点赞列表功能,查看浏览器请求,这个请求目前应该是404的,因为我们还没有写,他需要一个list返回值,显示top5点赞的用户

    请求网址: http://localhost:8080/api/blog/likes/4
    请求方法: GET

  • 在Controller层中编写对应的方法,点赞查询列表,具体逻辑写到BlogServiceImpl中

    1
    2
    3
    4
    @GetMapping("/likes/{id}")
    public Result queryBlogLikes(@PathVariable Integer id){
    return blogService.queryBlogLikes(id);
    }
  • 具体逻辑如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Override
    public Result queryBlogLikes(Integer id) {
    String key = BLOG_LIKED_KEY + id;
    //zrange key 0 4 查询zset中前5个元素
    Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    //如果是空的(可能没人点赞),直接返回一个空集合
    if (top5 == null || top5.isEmpty()) {
    return Result.ok(Collections.emptyList());
    }
    List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
    //将ids使用`,`拼接,SQL语句查询出来的结果并不是按照我们期望的方式进行排
    //所以我们需要用order by field来指定排序方式,期望的排序方式就是按照查询出来的id进行排序
    String idsStr = StrUtil.join(",", ids);
    //select * from tb_user where id in (ids[0], ids[1] ...) order by field(id, ids[0], ids[1] ...)
    List<UserDTO> userDTOS = userService.query().in("id", ids)
    .last("order by field(id," + idsStr + ")")
    .list().stream()
    .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
    .collect(Collectors.toList());
    return Result.ok(userDTOS);
    }
  • 重启服务器,查看效果
    img


Redis学习笔记--消息队列和达人探店
https://yztldxdz.top/2023/03/10/Redis学习笔记--消息队列和达人探店/
发布于
2023年3月10日
许可协议