Redis学习笔记--分布式锁和秒杀优化

分布式锁-Redisson

  • 基于SETNX实现的分布式锁存在以下问题
    1. 重入问题
      • 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
    2. 不可重试
      • 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
    3. 超时释放
      • 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
    4. 主从一致性
      • 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
  • 那么什么是Redisson呢
    • Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
  • Redis提供了分布式锁的多种多样功能
    1. 可重入锁(Reentrant Lock)
    2. 公平锁(Fair Lock)
    3. 联锁(MultiLock)
    4. 红锁(RedLock)
    5. 读写锁(ReadWriteLock)
    6. 信号量(Semaphore)
    7. 可过期性信号量(PermitExpirableSemaphore)
    8. 闭锁(CountDownLatch)

Redisson入门

  1. 导入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
    </dependency>
  2. 配置Redisson客户端,在config包下新建RedissonConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    config.useSingleServer()
    .setAddress("redis://101.XXX.XXX.160:6379")
    .setPassword("root");
    return Redisson.create(config);
    }
    }
  3. 使用Redisson的分布式锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Resource
    private RedissonClient redissonClient;

    @Test
    void testRedisson() throws InterruptedException {
    //获取可重入锁
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
    boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
    //判断获取锁成功
    if (success) {
    try {
    System.out.println("执行业务");
    } finally {
    //释放锁
    lock.unlock();
    }
    }
    }
  4. 替换我们之前自己写的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
+   @Resource
+ private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
- SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
+ RLock redisLock = redissonClient.getLock("order:" + userId);
- boolean isLock = redisLock.tryLock(120);
+ boolean isLock = redisLock.tryLock();
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
}

修改后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
RLock redisLock = redissonClient.getLock("order:" + userId);
boolean isLock = redisLock.tryLock();
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
}
  • 使用Jmeter进行压力测试,依旧是只能抢到一张优惠券,满足我们的需求

Redisson可重入锁原理

  • 在Lock锁中,他是借助于等曾的一个voaltile的一个state变量来记录重入的状态的

    • 如果当前没有人持有这把锁,那么state = 0
    • 如果有人持有这把锁,那么state = 1
      • 如果持有者把锁的人再次持有这把锁,那么state会+1
    • 如果对于synchronize而言,他在c语言代码中会有一个count
    • 原理与state类似,也是重入一次就+1,释放一次就-1,直至减到0,表示这把锁没有被人持有
  • 在redisson中,我们也支持可重入锁

    • 在分布式锁中,它采用hash结构来存储锁,其中外层key表示这把锁是否存在,内层key则记录当前这把锁被哪个线程持有
  • method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    @Resource
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach
    void setUp() {
    lock = redissonClient.getLock("lock");
    }

    @Test
    void method1() {
    boolean success = lock.tryLock();
    if (!success) {
    log.error("获取锁失败,1");
    return;
    }
    try {
    log.info("获取锁成功");
    method2();
    } finally {
    log.info("释放锁,1");
    lock.unlock();
    }
    }

    void method2() {
    RLock lock = redissonClient.getLock("lock");
    boolean success = lock.tryLock();
    if (!success) {
    log.error("获取锁失败,2");
    return;
    }
    try {
    log.info("获取锁成功,2");
    } finally {
    log.info("释放锁,2");
    lock.unlock();
    }
    }
  • 所以我们需要额外判断,method1和method2是否处于同一线程,如果是同一个线程,则可以拿到锁,但是state会+1,之后执行method2中的方法,释放锁,释放锁的时候也只是将state进行-1,只有减至0,才会真正释放锁

  • 由于我们需要额外存储一个state,所以用字符串型SET NX EX是不行的,需要用到Hash结构,但是Hash结构又没有NX这种方法,所以我们需要将原有的逻辑拆开,进行手动判断

img

  • 为了保证原子性,所以流程图中的业务逻辑也是需要我们用Lua来实现的

    • 获取锁的逻辑

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      local key = KEYS[1]; -- 锁的key
      local threadId = ARGV[1]; -- 线程唯一标识
      local releaseTime = ARGV[2]; -- 锁的自动释放时间
      -- 锁不存在
      if (redis.call('exists', key) == 0) then
      -- 获取锁并添加线程标识,state设为1
      redis.call('hset', key, threadId, '1');
      -- 设置锁有效期
      redis.call('expire', key, releaseTime);
      return 1; -- 返回结果
      end;
      -- 锁存在,判断threadId是否为自己
      if (redis.call('hexists', key, threadId) == 1) then
      -- 锁存在,重入次数 +1,这里用的是hash结构的incrby增长
      redis.call('hincrby', key, thread, 1);
      -- 设置锁的有效期
      redis.call('expire', key, releaseTime);
      return 1; -- 返回结果
      end;
      return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
    • 释放锁的逻辑

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      local key = KEYS[1];
      local threadId = ARGV[1];
      local releaseTime = ARGV[2];
      -- 如果锁不是自己的
      if (redis.call('HEXISTS', key, threadId) == 0) then
      return nil; -- 直接返回
      end;
      -- 锁是自己的,锁计数-1,还是用hincrby,不过自增长的值为-1
      local count = redis.call('hincrby', key, threadId, -1);
      -- 判断重入次数为多少
      if (count > 0) then
      -- 大于0,重置有效期
      redis.call('expire', key, releaseTime);
      return nil;
      else
      -- 否则直接释放锁
      redis.call('del', key);
      return nil;
      end;

Redisson锁重试和WatchDog机制

  • 前面我们分析的是空参的tryLock方法,现在我们来分析一下这个带参数的

    1
    2
    3
    4
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
  • 源码分析

  • tryAcquireAsync

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {

    if (leaseTime != -1L) {
    return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
    // 如果没有指定释放时间时间,则指定默认释放时间为getLockWatchdogTimeout,底层源码显示是30*1000ms,也就是30秒
    RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e == null) {
    if (ttlRemaining == null) {
    this.scheduleExpirationRenewal(threadId);
    }

    }
    });
    return ttlRemainingFuture;
    }
    }
  • tryLock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    //判断ttl是否为null
    if (ttl == null) {
    return true;
    } else {
    //计算当前时间与获取锁时间的差值,让等待时间减去这个值
    time -= System.currentTimeMillis() - current;
    //如果消耗时间太长了,直接返回false,获取锁失败
    if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    return false;
    } else {
    //等待时间还有剩余,再次获取当前时间
    current = System.currentTimeMillis();
    //订阅别人释放锁的信号
    RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
    //在剩余时间内,等待这个信号
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
    subscribeFuture.onComplete((res, e) -> {
    if (e == null) {
    //取消订阅
    this.unsubscribe(subscribeFuture, threadId);
    }

    });
    }
    //剩余时间内没等到,返回false
    this.acquireFailed(waitTime, unit, threadId);
    return false;
    } else {
    try {
    //如果剩余时间内等到了别人释放锁的信号,再次计算当前剩余最大等待时间
    time -= System.currentTimeMillis() - current;
    if (time <= 0L) {
    //如果剩余时间为负数,则直接返回false
    this.acquireFailed(waitTime, unit, threadId);
    boolean var20 = false;
    return var20;
    } else {
    boolean var16;
    do {
    //如果剩余时间等到了,dowhile循环重试获取锁
    long currentTime = System.currentTimeMillis();
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
    var16 = true;
    return var16;
    }

    time -= System.currentTimeMillis() - currentTime;
    if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    var16 = false;
    return var16;
    }

    currentTime = System.currentTimeMillis();
    if (ttl >= 0L && ttl < time) {
    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= System.currentTimeMillis() - currentTime;
    } while(time > 0L);

    this.acquireFailed(waitTime, unit, threadId);
    var16 = false;
    return var16;
    }
    } finally {
    this.unsubscribe(subscribeFuture, threadId);
    }
    }
    }
    }
    }
  • scheduleExpirationRenewal

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    //不存在,才put,表明是第一次进入,不是重入
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
    oldEntry.addThreadId(threadId);
    } else {
    //如果是第一次进入,则跟新有效期
    entry.addThreadId(threadId);
    this.renewExpiration();
    }
    }
  • renewExpiration

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private void renewExpiration() {
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
    //Timeout是一个定时任务
    Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    public void run(Timeout timeout) throws Exception {
    ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
    if (ent != null) {
    Long threadId = ent.getFirstThreadId();
    if (threadId != null) {
    //重置有效期
    RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
    future.onComplete((res, e) -> {
    if (e != null) {
    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
    } else {
    if (res) {
    //然后调用自己,递归重置有效期
    RedissonLock.this.renewExpiration();
    }

    }
    });
    }
    }
    }
    //internalLockLeaseTime是之前WatchDog默认有效期30秒,那这里就是 30 / 3 = 10秒之后,才会执行
    }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
    }
    }
  • renewExpirationAsync
    重点看lua脚本,先判断锁是不是自己的,然后更新有效时间

    1
    2
    3
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
  • 那么之前的重置有效期的行为该怎么终止呢?当然是释放锁的时候会终止

  • cancelExpirationRenewal

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void cancelExpirationRenewal(Long threadId) {
    //将之前的线程终止掉
    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (task != null) {
    if (threadId != null) {
    task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
    //获取之前的定时任务
    Timeout timeout = task.getTimeout();
    if (timeout != null) {
    //取消
    timeout.cancel();
    }

    EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
    }

    }
    }

img

Redisson锁的MutiLock原理

  • 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例

  • 此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了

  • 哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题

  • 为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性

  • 我们先使用虚拟机额外搭建两个Redis节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Configuration
    public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.137.130:6379")
    .setPassword("root");
    return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://92.168.137.131:6379")
    .setPassword("root");
    return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://92.168.137.132:6379")
    .setPassword("root");
    return Redisson.create(config);
    }
    }
  • 使用联锁,我们首先要注入三个RedissonClient对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private RedissonClient redissonClient2;
    @Resource
    private RedissonClient redissonClient3;

    private RLock lock;

    @BeforeEach
    void setUp() {
    RLock lock1 = redissonClient.getLock("lock");
    RLock lock2 = redissonClient2.getLock("lock");
    RLock lock3 = redissonClient3.getLock("lock");
    lock = redissonClient.getMultiLock(lock1, lock2, lock3);
    }

    @Test
    void method1() {
    boolean success = lock.tryLock();
    redissonClient.getMultiLock();
    if (!success) {
    log.error("获取锁失败,1");
    return;
    }
    try {
    log.info("获取锁成功");
    method2();
    } finally {
    log.info("释放锁,1");
    lock.unlock();
    }
    }

    void method2() {
    RLock lock = redissonClient.getLock("lock");
    boolean success = lock.tryLock();
    if (!success) {
    log.error("获取锁失败,2");
    return;
    }
    try {
    log.info("获取锁成功,2");
    } finally {
    log.info("释放锁,2");
    lock.unlock();
    }
    }
  • 源码分析

  • 当我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合

    1
    2
    3
    4
    5
    6
    7
    public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
    throw new IllegalArgumentException("Lock objects are not defined");
    } else {
    this.locks.addAll(Arrays.asList(locks));
    }
    }
  • 联锁的tryLock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1L;
    //如果传入了释放时间
    if (leaseTime != -1L) {
    //再判断一下是否有等待时间
    if (waitTime == -1L) {
    //如果没传等待时间,不重试,则只获得一次
    newLeaseTime = unit.toMillis(leaseTime);
    } else {
    //想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二
    newLeaseTime = unit.toMillis(waitTime) * 2L;
    }
    }
    //获取当前时间
    long time = System.currentTimeMillis();
    //剩余等待时间
    long remainTime = -1L;
    if (waitTime != -1L) {
    remainTime = unit.toMillis(waitTime);
    }
    //锁等待时间,与剩余等待时间一样
    long lockWaitTime = this.calcLockWaitTime(remainTime);
    //锁失败的限制,源码返回是的0
    int failedLocksLimit = this.failedLocksLimit();
    //已经获取成功的锁
    List<RLock> acquiredLocks = new ArrayList(this.locks.size());
    //迭代器,用于遍历
    ListIterator<RLock> iterator = this.locks.listIterator();

    while(iterator.hasNext()) {
    RLock lock = (RLock)iterator.next();

    boolean lockAcquired;
    try {
    //没有等待时间和释放时间,调用空参的tryLock
    if (waitTime == -1L && leaseTime == -1L) {
    lockAcquired = lock.tryLock();
    } else {
    //否则调用带参的tryLock
    long awaitTime = Math.min(lockWaitTime, remainTime);
    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
    }
    } catch (RedisResponseTimeoutException var21) {
    this.unlockInner(Arrays.asList(lock));
    lockAcquired = false;
    } catch (Exception var22) {
    lockAcquired = false;
    }
    //判断获取锁是否成功
    if (lockAcquired) {
    //成功则将锁放入成功锁的集合
    acquiredLocks.add(lock);
    } else {
    //如果获取锁失败
    //判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环
    if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
    break;
    }
    //否则将拿到的锁都释放掉
    if (failedLocksLimit == 0) {
    this.unlockInner(acquiredLocks);
    //如果等待时间为-1,则不想重试,直接返回false
    if (waitTime == -1L) {
    return false;
    }

    failedLocksLimit = this.failedLocksLimit();
    //将已经拿到的锁都清空
    acquiredLocks.clear();
    //将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁
    while(iterator.hasPrevious()) {
    iterator.previous();
    }
    } else {
    --failedLocksLimit;
    }
    }
    //如果剩余时间不为-1,很充足
    if (remainTime != -1L) {
    //计算现在剩余时间
    remainTime -= System.currentTimeMillis() - time;
    time = System.currentTimeMillis();
    //如果剩余时间为负数,则获取锁超时了
    if (remainTime <= 0L) {
    //将之前已经获取到的锁释放掉,并返回false
    this.unlockInner(acquiredLocks);
    //联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败
    return false;
    }
    }
    }
    //如果设置了锁的有效期
    if (leaseTime != -1L) {
    List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
    //迭代器用于遍历已经获取成功的锁
    Iterator var24 = acquiredLocks.iterator();

    while(var24.hasNext()) {
    RLock rLock = (RLock)var24.next();
    //设置每一把锁的有效期
    RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
    futures.add(future);
    }

    var24 = futures.iterator();

    while(var24.hasNext()) {
    RFuture<Boolean> rFuture = (RFuture)var24.next();
    rFuture.syncUninterruptibly();
    }
    }
    //但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期
    return true;
    }

小结

  1. 不可重入Redis分布式锁
    • 原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
    • 缺陷:不可重入、无法重试、锁超时失效
  2. 可重入Redis分布式锁
    • 原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
    • 缺陷:Redis宕机引起锁失效问题
  3. Redisson的multiLock
    • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功

秒杀优化

异步秒杀思路

  • 我们先来回顾一下下单流程
  • 当用户发起请求,此时会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序,会进行串行操作,分为如下几个步骤
    1. 查询优惠券
    2. 判断秒杀库存是否足够
    3. 查询订单
    4. 校验是否一人一单
    5. 扣减库存
    6. 创建订单
  • 在这六个步骤中,有很多操作都是要去操作数据库的,而且还是一个线程串行执行,这样就会导致我们的程序执行很慢,所以我们需要异步程序执行,那么如何加速呢?
  • 优化方案:我们将耗时较短的逻辑判断放到Redis中,例如:库存是否充足,是否一人一单这样的操作,只要满足这两条操作,那我们是一定可以下单成功的,不用等数据真的写进数据库,我们直接告诉用户下单成功就好了。然后后台再开一个线程,后台线程再去慢慢执行队列里的消息,这样我们就能很快的完成下单业务。
    img
  • 但是这里还存在两个难点
    1. 我们怎么在Redis中快速校验是否一人一单,还有库存判断
    2. 我们校验一人一单和将下单数据写入数据库,这是两个线程,我们怎么知道下单是否完成。
      • 我们需要将一些信息返回给前端,同时也将这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询下单逻辑是否完成
  • 我们现在来看整体思路:当用户下单之后,判断库存是否充足,只需要取Redis中根据key找对应的value是否大于0即可,如果不充足,则直接结束。如果充足,则在Redis中判断用户是否可以下单,如果set集合中没有该用户的下单数据,则可以下单,并将userId和优惠券存入到Redis中,并且返回0,整个过程需要保证是原子性的,所以我们要用Lua来操作,同时由于我们需要在Redis中查询优惠券信息,所以在我们新增秒杀优惠券的同时,需要将优惠券信息保存到Redis中
  • 完成以上逻辑判断时,我们只需要判断当前Redis中的返回值是否为0,如果是0,则表示可以下单,将信息保存到queue中去,然后返回,开一个线程来异步下单,其阿奴单可以通过返回订单的id来判断是否下单成功

img

Redis完成秒杀资格判断

  • 需求:

    1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
    2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否秒杀成功
  • 步骤一:修改保存优惠券相关代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀优惠券信息到Reids,这里并不需要设置有效期,等秒杀活动过了,我们再手动将其删除
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }
  • 使用PostMan发送请求,添加优惠券

    请求路径:

    http://localhost:8080/api/voucher/seckill

    请求方式:POST

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    {
    "shopId":1,
    "title":"9999元代金券",
    "subTitle":"365*24小时可用",
    "rules":"全场通用\\nApex猎杀无需预约",
    "payValue":1000,
    "actualValue":999900,
    "type":1,
    "stock":100,
    "beginTime":"2022-01-01T00:00:00",
    "endTime":"2022-12-31T23:59:59"
    }
  • 添加成功后,数据库中和Redis中都能看到优惠券信息

  • 步骤二:编写Lua脚本

    lua的字符串拼接使用..,字符串转数字是tonumber()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    -- 订单id
    local voucherId = ARGV[1]
    -- 用户id
    local userId = ARGV[2]
    -- 优惠券key
    local stockKey = 'seckill:stock:' .. voucherId
    -- 订单key
    local orderKey = 'seckill:order:' .. voucherId
    -- 判断库存是否充足
    if (tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
    end
    -- 判断用户是否下单
    if (redis.call('sismember', orderKey, userId) == 1) then
    return 2
    end
    -- 扣减库存
    redis.call('incrby', stockKey, -1)
    -- 将userId存入当前优惠券的set集合
    redis.call('sadd', orderKey, userId)
    return 0
  • 修改业务逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Override
    public Result seckillVoucher(Long voucherId) {
    //1. 执行lua脚本
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(), voucherId.toString(),
    UserHolder.getUser().getId().toString());
    //2. 判断返回值,并返回错误信息
    if (result.intValue() != 0) {
    return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }
    long orderId = redisIdWorker.nextId("order");
    //TODO 保存阻塞队列

    //3. 返回订单id
    return Result.ok(orderId);
    }
  • 现在我们是用PostMan发送请求,redis中的数据会变动,而且不能重复下单,但是数据库中的数据并没有变化

基于阻塞队列实现秒杀优化

  • 修改下单的操作,我们在下单时,是通过Lua表达式去原子执行判断逻辑,如果判断结果不为0,返回错误信息,如果判断结果为0,则将下单的逻辑保存到队列中去,然后异步执行

  • 需求

    1. 如果秒杀成功,则将优惠券id和用户id封装后存入阻塞队列
    2. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
  • 步骤一:创建阻塞队列

    阻塞队列有一个特点:当一个线程尝试从阻塞队列里获取元素的时候,如果没有元素,那么该线程就会被阻塞,直到队列中有元素,才会被唤醒,并去获取元素

    阻塞队列的创建需要指定一个大小

    1
    private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  • 那么把优惠券id和用户id封装后存入阻塞队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Override
    public Result seckillVoucher(Long voucherId) {
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(), voucherId.toString(),
    UserHolder.getUser().getId().toString());
    if (result.intValue() != 0) {
    return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }
    long orderId = redisIdWorker.nextId("order");
    //封装到voucherOrder中
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setId(orderId);
    //加入到阻塞队列
    orderTasks.add(voucherOrder);
    return Result.ok(orderId);
    }
  • 步骤二:实现异步下单功能

    1. 先创建一个线程池

      1
      private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    2. 创建线程任务,秒杀业务需要在类初始化之后,就立即执行,所以这里需要用到@PostConstruct注解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      @PostConstruct
      private void init() {
      SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
      }

      private class VoucherOrderHandler implements Runnable {
      @Override
      public void run() {
      while (true) {
      try {
      //1. 获取队列中的订单信息
      VoucherOrder voucherOrder = orderTasks.take();
      //2. 创建订单
      handleVoucherOrder(voucherOrder);
      } catch (Exception e) {
      log.error("订单处理异常", e);
      }
      }
      }
      }
    3. 编写创建订单的业务逻辑

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      private IVoucherOrderService proxy;
      private void handleVoucherOrder(VoucherOrder voucherOrder) {
      //1. 获取用户
      Long userId = voucherOrder.getUserId();
      //2. 创建锁对象,作为兜底方案
      RLock redisLock = redissonClient.getLock("order:" + userId);
      //3. 获取锁
      boolean isLock = redisLock.tryLock();
      //4. 判断是否获取锁成功(理论上必成功,redis已经帮我们判断了)
      if (!isLock) {
      log.error("不允许重复下单!");
      return;
      }
      try {
      //5. 使用代理对象,由于这里是另外一个线程,
      proxy.createVoucherOrder(voucherOrder);
      } finally {
      redisLock.unlock();
      }
      }
    • 查看AopContext源码,它的获取代理对象也是通过ThreadLocal进行获取的,由于我们这里是异步下单,和主线程不是一个线程,所以不能获取成功

      1
      private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal("Current AOP proxy");
    • 但是我们可以将proxy放在成员变量的位置,然后在主线程中获取代理对象

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      @Override
      public Result seckillVoucher(Long voucherId) {
      Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
      Collections.emptyList(), voucherId.toString(),
      UserHolder.getUser().getId().toString());
      if (result.intValue() != 0) {
      return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
      }
      long orderId = redisIdWorker.nextId("order");
      //封装到voucherOrder中
      VoucherOrder voucherOrder = new VoucherOrder();
      voucherOrder.setVoucherId(voucherId);
      voucherOrder.setUserId(UserHolder.getUser().getId());
      voucherOrder.setId(orderId);
      //加入到阻塞队列
      orderTasks.add(voucherOrder);
      //主线程获取代理对象
      proxy = (IVoucherOrderService) AopContext.currentProxy();
      return Result.ok(orderId);
      }
  • 完整代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    @Service
    @Slf4j
    public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    private IVoucherOrderService proxy;


    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
    SECKILL_SCRIPT = new DefaultRedisScript();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //1. 获取用户
    Long userId = voucherOrder.getUserId();
    //2. 创建锁对象,作为兜底方案
    RLock redisLock = redissonClient.getLock("order:" + userId);
    //3. 获取锁
    boolean isLock = redisLock.tryLock();
    //4. 判断是否获取锁成功(理论上必成功,redis已经帮我们判断了)
    if (!isLock) {
    log.error("不允许重复下单!");
    return;
    }
    try {
    //5. 使用代理对象,由于这里是另外一个线程,
    proxy.createVoucherOrder(voucherOrder);
    } finally {
    redisLock.unlock();
    }
    }

    private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
    while (true) {
    try {
    //1. 获取队列中的订单信息
    VoucherOrder voucherOrder = orderTasks.take();
    //2. 创建订单
    handleVoucherOrder(voucherOrder);
    } catch (Exception e) {
    log.error("订单处理异常", e);
    }
    }
    }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(), voucherId.toString(),
    UserHolder.getUser().getId().toString());
    if (result.intValue() != 0) {
    return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }
    long orderId = redisIdWorker.nextId("order");
    //封装到voucherOrder中
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setId(orderId);
    //加入到阻塞队列
    orderTasks.add(voucherOrder);
    //主线程获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    return Result.ok(orderId);
    }


    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
    // 一人一单逻辑
    Long userId = voucherOrder.getUserId();
    Long voucherId = voucherOrder.getVoucherId();
    synchronized (userId.toString().intern()) {
    int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
    if (count > 0) {
    log.error("你已经抢过优惠券了哦");
    return;
    }
    //5. 扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1")
    .eq("voucher_id", voucherId)
    .gt("stock", 0)
    .update();
    if (!success) {
    log.error("库存不足");
    }
    //7. 将订单数据保存到表中
    save(voucherOrder);
    }
    }
    }

小结

  • 秒杀业务的优化思路是什么?
    1. 先利用Redis完成库存容量、一人一单的判断,完成抢单业务
    2. 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    1. 内存限制问题:
      • 我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
    2. 数据安全问题:
      • 经典服务器宕机了,用户明明下单了,但是数据库里没看到

Redis学习笔记--分布式锁和秒杀优化
https://yztldxdz.top/2023/03/05/Redis学习笔记--分布式锁和秒杀优化/
发布于
2023年3月5日
许可协议