Redis系列-Redis-分布式锁(JAVA-lettuce客户端)
分布式锁
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在单机环境中,Java 中提供了很多并发处理相关的API。但是在分布式集群系统中,由于分布式系统分布在不同机器上,运行在不同的 jvm 中,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
redis 实现分布式锁
Redis 为单进程单线程模式,采用队列模式将并发访问变成串行访问,且 redis 无论是单机还是集群都很好的实现了数据同步。作为主流的缓存数据库,redis 实现分布式锁再好不过了。下面将讲解如何实现 redis 分布式锁,本文采用的 redis 客户端为 lettuce。
获取锁
redis 有一个命令能达到互斥要求,那就是 setnx (set if not exists) , setnx 命令设置一个 key ,如果这个 key 存在则返回失败,不存在则设置并返回成功。由于 redis 单线程串行执行命令特性,永远只有一个请求能够设置成功 key 。
if (commands.setnx("lock", "got it!")) {
//do sth
}
释放锁
获取到锁了,别忘了释放锁,不然你永远拿着这个锁别的服务就无法再次拿到,就会导致服务请求一直阻塞。另外,我们在释放锁时,要确认这个锁是我们加上的,虽然看起来现在是只有一个地方获取锁、释放锁,但为了安全起见,我们在获取锁时设置一个唯一值,释放锁时确认一下这个值保证安全性,防止我们不小心获得了锁,把别人的锁给释放了。
主动删除锁
我们可以主动删除锁,这里用到了 lua 脚本保持原子性,先判断锁是否是我们持有的,然后在删除:
String key = "lock";
//唯一标识,释放锁时确保是自己持有的锁
String value = UUID.randomUUID().toString();
if (stringCommands.setnx(key, value)) {
//do sth
//释放锁
commands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
new String[]{key}, value);
}
但是这样就会有一个问题,如果我们在 do sth 时程序异常了,那么锁就永远删除不了了。。。那么我们可以怎么办,你可能会说:我们可以在 try catch finally 里释放锁,这样程序异常就可以释放了。好吧,我们试一试:
String key = "lock";
//唯一标识,释放锁时确保是自己持有的锁
String value = UUID.randomUUID().toString();
try {
if (stringCommands.setnx(key, value)) {
//do sth
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
commands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
new String[]{key}, value);
}
但是!我说如果 jvm 被杀死了呢,根据墨菲定律,我们总会往最坏的方面去想,恰好我们拿到了锁,恰好我们刚拿到锁 jvm 被别人杀死了,看起来 finally 也无法很好的释放锁。
那么,我们再想想。。。对了!我们可以设置锁的过期时间,设置过期时间如果持锁的服务不在了,到时候 redis 自然会释放锁。注意获取锁和设置锁的过期时间要保证原子性,不能分开执行,分开执行也会出现 jvm 死掉,锁无法设置过期时间而无法释放的情况。
public static void main(String[] args) {
String key = "lock";
//唯一标识,释放锁时确保是自己持有的锁
String value = UUID.randomUUID().toString();
try {
//一条命令设置锁和过期时间
if ("OK".equals(commands.set(key, value, SetArgs.Builder.nx().ex(1000)))) {
//do sth
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
commands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
new String[]{key}, value);
}
}
但是仔细想想我们并不能很好的计算这个过期时间,如果持锁的服务运行时间比锁的过期时间长,那么锁此时过期被释放别的服务也就会拿到锁,保证不了真正的串行执行。这时候你可能会说:我们可以自己评估服务的执行时间设而置过期时间。诚然,这也是一种办法,如果你可以接受服务平均耗时时间增加的话。
换个思路,我们是不是可以在服务运行期间,雇个人给我们看着锁,如果锁快过期了让他给我们续费一下,我们就可以安心的执行下去了,好像没什么问题,来试试。
续费线程
public class LockMan implements Runnable {
private final String key;
private final String value;
private int lockTime;
private RedisCommands commands;
boolean isOk = true;
public LockMan(String key, String value, int lockTime, RedisCommands commands) {
this.key = key;
this.value = value;
this.lockTime = lockTime;
this.commands = commands;
}
@Override
public void run() {
while (isOk) {
try {
//让线程休息会,别老占用资源
Thread.sleep(lockTime * 1000 / 3);
//KEYS 参数
String[] keys = new String[]{key};
//ARGV 参数
Object[] values = new Object[]{value, String.valueOf(lockTime)};
//这里采用 lua 脚本 “续费锁” ,注意要续费是自己持有的锁, value 值唯一确认现在这把锁是自己持有的
Object eval = commands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return 0 end",
ScriptOutputType.INTEGER,
keys, values);
if (Integer.parseInt(eval.toString()) == 1) {
System.out.println("续费成功,将锁超时时间重置为 " + lockTime + "s");
} else {
stop();
System.out.println("续费失败");
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
System.out.println("续费线程异常");
e.printStackTrace();
}
}
System.out.println("续费结束");
}
public void stop() {
this.isOk = false;
}
}
然后我们在拿到锁后,让这个 LockMan 给我们“续费”。
public static void main(String[] args) {
String key = "lock";
//唯一标识,释放锁时确保是自己持有的锁
String value = UUID.randomUUID().toString();
int lockTime = 10;
try {
if ("OK".equals(stringCommands.set(key, value, SetArgs.Builder.nx().ex(lockTime)))) {
System.out.println("---locked---");
LockMan lockMan = new LockMan(key, value, lockTime, (RedisCommands) stringCommands);
Thread lockThread = new Thread(lockMan);
lockThread.setDaemon(true);
lockThread.start();
//模拟服务过程
sleep(25000);
//服务结束,通知“续费”线程结束
lockMan.stop();
lockThread.interrupt();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//删除自己持有的锁
commands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
new String[]{key}, value);
System.out.println("释放锁成功");
connection.close();
}
}
阻塞性
阻塞性我们可以将争抢锁的线程适当的睡眠等待即可。
可重入性
既然我们想“可重入”,相当于要把“我已经拿到锁了”这个信息保存下来,以便“我”在已经获得锁的情况下再次拿锁时直接拿到。你也许会说:谁会傻到外面已经拿到锁了,在里面再获取一次!那你就太天真了,没有不犯错的程序员。如果你的分布式锁没有重入性,那我们如果在持锁线程内部再次获取锁,会一直等待,然后因为我们雇了一个人续费锁,锁得不到释放,产生了死锁!!!
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层函数仍然有获取该锁的代码,但不受影响。
注意这句话,“我已经拿到了锁”,这里“我”就代表线程,只需要我们把线程信息的持锁情况保存下来就可以了。
public class MyLock {
private final ConcurrentMap<Thread, LockData> THREAD_DATA;
private final RedisCommands<String, String> redisCommands;
private final String lockKey;
private final String lockValue;
public MyLock(RedisCommands<String, String> redisCommands, String lockKey, String lockValue) {
this.lockKey = lockKey;
this.lockValue = lockValue;
this.redisCommands = redisCommands;
this.THREAD_DATA = new ConcurrentHashMap<>();
}
public boolean getLock(int lockTime) {
//判断当前线程是否已经持有锁
Thread currentThread = Thread.currentThread();
LockData lockData = THREAD_DATA.get(currentThread);
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
} else {
if ("OK".equals(redisCommands.set(lockKey, lockValue, SetArgs.Builder.nx().ex(lockTime)))) {
LockMan lockMan = new LockMan(lockKey, lockValue, lockTime, redisCommands);
Thread daemonThread = new Thread(lockMan);
daemonThread.setDaemon(true);
daemonThread.start();
LockData newLockData = new LockData(currentThread, lockKey, lockValue, daemonThread, lockMan);
THREAD_DATA.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
public void release() {
Thread currentThread = Thread.currentThread();
LockData lockData = THREAD_DATA.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock");
} else {
int lockCount = lockData.lockCount.decrementAndGet();
if (lockCount <= 0) {
if (lockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock");
} else {
//当 lockCount 等于0时,代表所有重入情况释放完毕,此时可以结束续费线程,并释放锁
try {
//停止续费线程
lockData.lockMan.stop();
lockData.daemonThread.interrupt();
//释放锁
redisCommands.eval("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
new String[]{lockKey}, lockValue);
System.out.println("---locked release---");
} finally {
THREAD_DATA.remove(currentThread);
}
}
} else {
System.out.println("---relocked release---");
}
}
}
private static class LockData {
final Thread owningThread;
final String lockKey;
final String lockValue;
final Thread daemonThread;
final LockMan lockMan;
final AtomicInteger lockCount;
private LockData(Thread owningThread, String lockKey, String lockValue, Thread daemonThread, LockMan lockMan) {
this.owningThread = owningThread;
this.lockKey = lockKey;
this.lockValue = lockValue;
this.daemonThread = daemonThread;
this.lockMan = lockMan;
this.lockCount = new AtomicInteger(1);
}
}
}
测试一下
public static void main(String[] args) {
String key = "lock";
//唯一标识,释放锁时确保是自己持有的锁
String value = UUID.randomUUID().toString();
int lockTime = 10;
MyLock myLock = new MyLock(commands, key, value);
try {
if (myLock.getLock(lockTime)) {
System.out.println("---locked---");
//模拟服务过程
if (myLock.getLock(lockTime)) {
System.out.println("---relocked---");
myLock.release();
}
sleep(25000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myLock.release();
}
}
输出结果
---locked---
---relocked---
---relocked release---
续费成功,将锁超时时间重置为 10s
续费成功,将锁超时时间重置为 10s
续费成功,将锁超时时间重置为 10s
续费成功,将锁超时时间重置为 10s
续费结束
---locked release---
源码链接
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 rockeycui@163.com
文章标题:Redis系列-Redis-分布式锁(JAVA-lettuce客户端)
文章字数:2.5k
本文作者:崔石磊(RockeyCui)
发布时间:2019-04-10, 20:00:00
原始链接:https://cuishilei.com/redis-lock.html版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。