一、背景
日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
大家可以借助每个 key 对应一个 reentrantlock ,让同一个 key 的线程使用该 lock 加锁;每个 key 对应一个 semaphore ,让同一个 key 的线程使用 semaphore 控制同时执行的线程数。
二、参考代码
接口定义
public interface lockbykey<t> { /** * 加锁 */ void lock(t key); /** * 解锁 */ void unlock(t key); }
2.1 同一个 key 只能一个线程执行
2.1.1 代码实现
每个 key 对应一个 reentrantlock ,让同一个 key 的线程使用该 lock 加锁。
import java.util.map; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.locks.reentrantlock; public class defaultlockbykeyimpl<t> implements lockbykey<t> { private final map<t, reentrantlock> lockmap = new concurrenthashmap<>(); /** * 加锁 */ @override public void lock(t key) { // 如果key为空,直接返回 if (key == null) { throw new illegalargumentexception("key 不能为空"); } // 获取或创建一个reentrantlock对象 reentrantlock lock = lockmap.computeifabsent(key, k -> new reentrantlock()); // 获取锁 lock.lock(); } /** * 解锁 */ @override public void unlock(t key) { // 如果key为空,直接返回 if (key == null) { throw new illegalargumentexception("key 不能为空"); } // 从map中获取锁对象 reentrantlock lock = lockmap.get(key); // 获取不到报错 if (lock == null) { throw new illegalargumentexception("key " + key + "尚未加锁"); } // 其他线程非法持有不允许释放 if (!lock.isheldbycurrentthread()) { throw new illegalstateexception("当前线程尚未持有,key:" + key + "的锁,不允许释放"); } lock.unlock(); } }
注意事项:
(1)参数合法性校验
(2)解锁时需要判断该锁是否为当前线程持有
2.1.2 编写单测
import com.google.common.collect.lists; import org.junit.test; import java.util.hashset; import java.util.list; import java.util.set; import java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.timeunit; public class defaultlockbykeyimpltest { private final lockbykey<string> lockbykey = new defaultlockbykeyimpl<>(); private final countdownlatch countdownlatch = new countdownlatch(7); private final executorservice executorservice = executors.newfixedthreadpool(10); @test public void test() throws interruptedexception { list<string> keys = lists.newarraylist("a", "a", "a", "b", "c", "b", "d"); set<string> executingkeyset = new hashset<>(); for (int i = 0; i < keys.size(); i++) { string key = keys.get(i); int finali = i; executorservice.submit(() -> { lockbykey.lock(key); if (executingkeyset.contains(key)) { throw new runtimeexception("存在正在执行的 key:" + key); } executingkeyset.add(key); try { system.out.println("index:" + finali + "对 [" + key + "] 加锁 ->" + thread.currentthread().getname()); timeunit.seconds.sleep(1); } catch (interruptedexception e) { throw new runtimeexception(e); } finally { system.out.println("index:" + finali + "释放 [" + key + "] ->" + thread.currentthread().getname()); lockbykey.unlock(key); executingkeyset.remove(key); countdownlatch.countdown(); } }); } countdownlatch.await(); } }
如果同一个 key 没释放能够再次进入,会抛出异常。
也可以通过日志来观察执行情况:
index:0对 [a] 加锁 ->pool-1-thread-1 index:6对 [d] 加锁 ->pool-1-thread-7 index:4对 [c] 加锁 ->pool-1-thread-5 index:3对 [b] 加锁 ->pool-1-thread-4 index:6释放 [d] ->pool-1-thread-7 index:4释放 [c] ->pool-1-thread-5 index:0释放 [a] ->pool-1-thread-1 index:3释放 [b] ->pool-1-thread-4 index:1对 [a] 加锁 ->pool-1-thread-2 index:5对 [b] 加锁 ->pool-1-thread-6 index:1释放 [a] ->pool-1-thread-2 index:5释放 [b] ->pool-1-thread-6 index:2对 [a] 加锁 ->pool-1-thread-3 index:2释放 [a] ->pool-1-thread-3
2.2、同一个 key 可以有 n个线程执行
2.2.1 代码实现
每个 key 对应一个 semaphore ,让同一个 key 的线程使用 semaphore 控制同时执行的线程数。
import lombok.sneakythrows; import java.util.map; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.semaphore; public class simultaneousentrieslockbykey<t> implements lockbykey<t> { private final map<t, semaphore> semaphores = new concurrenthashmap<>(); /** * 最大线程 */ private int allowed_threads; public simultaneousentrieslockbykey(int allowed_threads) { this.allowed_threads = allowed_threads; } /** * 加锁 */ @override public void lock(t key) { semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new semaphore(allowed_threads) : v); semaphore.acquireuninterruptibly(); } /** * 解锁 */ @override public void unlock(t key) { // 如果key为空,直接返回 if (key == null) { throw new illegalargumentexception("key 不能为空"); } // 从map中获取锁对象 semaphore semaphore = semaphores.get(key); if (semaphore == null) { throw new illegalargumentexception("key " + key + "尚未加锁"); } semaphore.release(); if (semaphore.availablepermits() >= allowed_threads) { semaphores.remove(key, semaphore); } }
2.2.2 测试代码
import com.google.common.collect.lists; import org.junit.test; import java.time.localdatetime; import java.util.collections; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.timeunit; public class simultaneousentrieslockbykeytest { private final int maxthreadeachkey = 2; private final lockbykey<string> lockbykey = new simultaneousentrieslockbykey<>(maxthreadeachkey); private final countdownlatch countdownlatch = new countdownlatch(7); private final executorservice executorservice = executors.newfixedthreadpool(10); @test public void test() throws interruptedexception { list<string> keys = lists.newarraylist("a", "a", "a", "b", "c", "b", "d"); map<string, integer> executingkeycount = collections.synchronizedmap(new hashmap<>()); for (int i = 0; i < keys.size(); i++) { string key = keys.get(i); int finali = i; executorservice.submit(() -> { lockbykey.lock(key); executingkeycount.compute(key, (k, v) -> { if (v != null && v + 1 > maxthreadeachkey) { throw new runtimeexception("超过限制了"); } return v == null ? 1 : v + 1; }); try { system.out.println("time:" + localdatetime.now().tostring() + " ,index:" + finali + "对 [" + key + "] 加锁 ->" + thread.currentthread().getname() + "count:" + executingkeycount.get(key)); timeunit.seconds.sleep(1); } catch (interruptedexception e) { throw new runtimeexception(e); } finally { system.out.println("time:" + localdatetime.now().tostring() + " ,index:" + finali + "释放 [" + key + "] ->" + thread.currentthread().getname() + "count:" + (executingkeycount.get(key) - 1)); lockbykey.unlock(key); executingkeycount.compute(key, (k, v) -> v - 1); countdownlatch.countdown(); } }); } countdownlatch.await(); } }
输出:
time:2023-03-15t20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
time:2023-03-15t20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
time:2023-03-15t20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
time:2023-03-15t20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
time:2023-03-15t20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
time:2023-03-15t20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
time:2023-03-15t20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
time:2023-03-15t20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
time:2023-03-15t20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
time:2023-03-15t20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
time:2023-03-15t20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
time:2023-03-15t20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
time:2023-03-15t20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
time:2023-03-15t20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0
三、总结
本文结合自己的理解和一些参考代码,给出自己的示例,希望对大家有帮助。
到此这篇关于java 根据某个 key 加锁的实现方式的文章就介绍到这了,更多相关java根据某个 key 加锁内容请搜索七九推以前的文章或继续浏览下面的相关文章希望大家以后多多支持七九推!
发表评论