当前位置: 七九推 > IT编程>数据库>Redis > Redisson分布式锁之加解锁详解

Redisson分布式锁之加解锁详解

2023年03月17日 Redis 我要评论
引言2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问道项目中关于分布式锁的应用,后涉及redisson实现分布式锁的原理,答不上来。锁

引言

2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问道项目中关于分布式锁的应用,后涉及redisson实现分布式锁的原理,答不上来。

锁的可重入性

我们都知道,java中synchronized和lock都支持可重入,synchronized的锁关联一个线程持有者和一个计数器。当一个线程请求成功后,jvm会记下持有锁的线程,并将计数器计为1。此时其他线程请求该锁,则必须等待。而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增。当线程退出一个synchronized方法/块时,计数器会递减,如果计数器为0则释放该锁;在reentrantlock中,底层的 aqs 对应的state 同步状态值表示线程获取该锁的可重入次数,通过cas方式进行设置,在默认情况下,state的值为0 表示当前锁没有被任何线程持有,原理类似。所以如果想要实现可重入性,可能须有一个计数器来控制重入次数,实际redisson确实是这么做的。

好的我们通过redisson客户端进行设置,并循环3次,模拟锁重入:000

for(int i = 0; i < 3; i++) {      
    redissonlockutil.trylock("distributed:lock:distribute_key", timeunit.seconds, 20, 100); 
 }

连接redis客户端进行查看:

可以看到,我们设置的分布式锁是存在一个hash结构中,value看起来是循环的次数3,key就不怎么认识了,那这个key是怎么设置进去的呢,另外为什么要设置成为hash类型呢?

加锁

我们先来看看普通的分布式锁的上锁流程:

说明:

  • 客户端在进行加锁时,会校验如果业务上没有设置持有锁时长leasetime,会启动看门狗来每隔10s进行续命,否则就直接以leasetime作为持有的时长;
  • 并发场景下,如果客户端1锁还未释放,客户端2尝试获取,加锁必然失败,然后会通过发布订阅模式来订阅key的释放通知,并继续进入后续的抢锁流程。
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {
      long time = unit.tomillis(waittime);
      long current = system.currenttimemillis();
      long threadid = thread.currentthread().getid();
      long ttl = this.tryacquire(waittime, leasetime, unit, threadid);
      if (ttl == null) {
         return true;
      } else {
         // 订阅分布式key对应的消息,监听其它锁持有者释放,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环
         completablefuture subscribefuture = this.subscribe(threadid);
         subscribefuture.get(time, timeunit.milliseconds);
         try {
            do {
               // 尝试获取锁
               ttl = this.tryacquire(waittime, leasetime, unit, threadid);
               // 竞争获取锁成功,退出循环,不再竞争。
               if (ttl == null) {
                  return true;
               }
               // 利用信号量机制阻塞当前线程相应时间,之后再重新获取锁
               if (ttl >= 0l && ttl < time) {
                  ((redissonlockentry)this.commandexecutor.getnow(subscribefuture)).getlatch().tryacquire(ttl, timeunit.milliseconds);
               } else {
                  ((redissonlockentry)this.commandexecutor.getnow(subscribefuture)).getlatch().tryacquire(time, timeunit.milliseconds);
               }
               time -= system.currenttimemillis() - currenttime;
            } while(time > 0l);
         } finally {
            // 竞争锁成功后,取消订阅该线程id事件
            this.unsubscribe((redissonlockentry)this.commandexecutor.getnow(subscribefuture), threadid);
         }
      }
   }
}
rfuture<long> tryacquireasync(long leasetime, timeunit unit, final long threadid) {
        // 如果设置了持有锁的时长,直接进行尝试加锁操作
         if (leasetime != -1l) {
            return this.trylockinnerasync(leasetime, unit, threadid, rediscommands.eval_long);
        } else {
            // 未设置加锁时长,在加锁成功后,启动续期任务,初始默认持有锁时间是30s
            rfuture<long> ttlremainingfuture = this.trylockinnerasync(this.commandexecutor.getconnectionmanager().getcfg().getlockwatchdogtimeout(), timeunit.milliseconds, threadid, rediscommands.eval_long);
            ttlremainingfuture.addlistener(new futurelistener<long>() {
                public void operationcomplete(future<long> future) throws exception {
                    if (future.issuccess()) {
                        long ttlremaining = (long)future.getnow();
                        if (ttlremaining == null) {
                            redissonlock.this.scheduleexpirationrenewal(threadid);
                        }
                    }
                }
            });
            return ttlremainingfuture;
        }
    }

我们都知道redis执行lua脚本具有原子性,所以在尝试加锁的下层,redis主要执行了一段复杂的lua脚本:

-- 不存在该key时
if (redis.call('exists', keys[1]) == 0) then
      -- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', keys[1], argv[2], 1);
-- 设置过期时间
redis.call('pexpire', keys[1], argv[1]);
return nil;
end;
-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', keys[1], argv[2]) == 1) then
      -- 线程重入次数++
redis.call('hincrby', keys[1], argv[2], 1);
redis.call('pexpire', keys[1], argv[1]);
return nil;
end;
return redis.call('pttl', keys[1]);

参数说明:

keys[1]:对应我们设置的分布式key,即:distributed:lock:distribute_key

argv[1]:业务自定义的加锁时长或者默认的30s;

argv[2]: 具体的客户端初始化连接uuid+线程id: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1

我们从上面的脚本中可以看出核心逻辑其实不难:

  • 如果分布式锁key未被任何端持有,直接根据“客户端连接id+线程id” 进行初始化设置,并设置重入次数为1,并设置key的过期时间;
  • 否则重入次数+1,并重置过期时间;

锁续命

接下来看看scheduleexpirationrenewal续命是怎么做的呢?

private void scheduleexpirationrenewal(final long threadid) {
   if (!expirationrenewalmap.containskey(this.getentryname())) {
      timeout task = this.commandexecutor.getconnectionmanager().newtimeout(new timertask() {
         public void run(timeout timeout) throws exception {
            // 执行续命操作
            rfuture<boolean> future = redissonlock.this.renewexpirationasync(threadid);
            future.addlistener(new futurelistener<boolean>() {
               public void operationcomplete(future<boolean> future) throws exception {
                  redissonlock.expirationrenewalmap.remove(redissonlock.this.getentryname());
                          ...
                  // 续命成功,继续
                  if ((boolean)future.getnow()) {
                     redissonlock.this.scheduleexpirationrenewal(threadid);
                  }
               }
            });
         }
      }, this.internallockleasetime / 3l, timeunit.milliseconds);
   }
}

tip小知识点:

  • 续期是用的什么定时任务执行的?
    redisson用netty的hashedwheeltimer做命令重试机制,原因在于一条redis命令的执行不论成功或者失败耗时都很短,而hashedwheeltimer是单线程的,系统性能开销小。

而在上面的renewexpirationasync中续命操作的执行核心lua脚本要做的事情也非常的简单,就是给这个key的过期时间重新设置为指定的30s.

if (redis.call('hexists', keys[1], argv[2]) == 1) then
    redis.call('pexpire', keys[1], argv[1]);
    return 1;
end;
return 0;

释放锁

释放锁主要是除了解锁本省,另外还要考虑到如果存在续期的情况,要将续期任务删除:

public rfuture<void> unlockasync(long threadid) {
   // 解锁
   rfuture<boolean> future = this.unlockinnerasync(threadid);
   completionstage<void> f = future.handle((opstatus, e) -> {
      // 解除续期
      this.cancelexpirationrenewal(threadid);
      ...
   });
   return new completablefuturewrapper(f);
}

在unlockinnerasync内部,redisson释放锁其实核心也是执行了如下一段核心lua脚本:

    // 校验是否存在
    if (redis.call('hexists', keys[1], argv[3]) == 0) then
      return nil;
      end;
    // 获取加锁次数,校验是否为重入锁
    local counter = redis.call('hincrby', keys[1], argv[3], -1);
    // 如果为重入锁,重置过期时间,锁本身不释放
    if (counter > 0) then
      redis.call('pexpire', keys[1], argv[2]);
      return 0;
   // 删除key
    else redis.call('del', keys[1]);
      // 通知阻塞的客户端可以抢锁啦
      redis.call('publish', keys[2], argv[1]);
      return 1;
      end;
      return nil;

其中:

keys[1]: 分布式锁
keys[2]: redisson_lock_channel:{分布式锁} 发布订阅消息的管道名称
argv[1]: 发布的消息内容
argv[2]: 锁的过期时间
argv[3]: 线程id标识名称

其它问题

  • 红锁这么火,但真的靠谱么?
  • redisson公平锁是什么情况?

以上就是redisson分布式锁第一弹-加解锁的详细内容,更多关于redisson分布式锁加解锁的资料请关注七九推其它相关文章!

(0)
打赏 微信扫一扫 微信扫一扫

相关文章:

  • 在产品中,我们常说的A端/B端/C端是什么?

    在产品中,我们常说的A端/B端/C端是什么?

    一、引言 二、我们常说的A端/B端/C端/R端是什么? 2.1 产品分类 IT产品大致可以分为这四个类型: A 端 是开发界... [阅读全文]
  • 游戏关键字070805

    游戏关键字070805

    .we are ready  统一加速器  刘文辉  理财教室  抗战英雄传  士兵突击在线观看  网游之混迹在美女工作室  爱幼阁  李阳疯狂英语mp3下载 ... [阅读全文]
  • MSM8976平台概述

    MSM8976平台概述

    MSM8976是从8952平台继承下来的,包含两组(cluster)core:   四个A72cpu核... [阅读全文]
  • cve-2017-3506&cve-2017-10271简析

    cve-2017-3506&cve-2017-10271简析

    漏洞利用前提 影响版本 10.3.6.0, 12.1.3.0, 12.2.1.0, 12.2.1.1 , 12.2.1.2 原理简析 漏洞触发位置:... [阅读全文]
  • Redis可以做什么

    Redis可以做什么

    redis用途很广泛,可以用在很多地方 1.记录帖子的点赞数,评论数和点击数hash 2.记录用户的帖子id列表并排序,... [阅读全文]
  • Unable to connect to Redis无法连接到Redis解决的全过程

    项目场景:提示:这里简述项目相关背景:在某个项目中的提交按钮不好用org.springframework.data.redis.redisconnectionfailureexce…

    2023年03月25日 数据库

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2023  七九推 保留所有权利. 粤ICP备17035492号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com