Java并发 - ReentrantLock
文章目录
- ReentrantLock
- AQS 队列
- AbstractOwnableSynchronizer
- AbstractQueuedSynchronizer
- Node
- waitStatus
- SHARED/EXCLUSIVE 模式
- 加锁流程
- 尝试加锁 tryAcquire
- 加锁失败入队
- addWaiter
- enq
- 阻塞等待 acquireQueued
- parkAndCheckInterrupt
- 放弃加锁 cancelAcquire
- 唤醒阻塞线程 unparkSuccessor
- 释放锁流程
- 中断处理
- 中断加锁模式 lockInterruptibly
- Condition
- ConditionObject
- await
- transferAfterCancelledWait
- reportInterruptAfterWait
- signal
- signalAll
- 对比Object#wait, notify/notifyAll
ReentrantLock
ReentrantLock 是 Java 中的一种可重入锁,属于 java.util.concurrent.locks 包。它提供了比传统的 synchronized 关键字更灵活的锁机制。以下是 ReentrantLock 的主要特点:
- 可重入性
同一线程可以多次获得同一把锁,而不会导致死锁。当线程第一次获取锁时,锁的持有计数增加;每次释放锁时,计数减少,直到计数为零时,锁才真正被释放。 - 公平性
ReentrantLock 可以选择公平性策略,默认非公平。公平锁按请求的顺序允许线程获取锁,而非公平锁则允许某些线程“插队”。可以通过构造方法指定锁的公平性:ReentrantLock lock = new ReentrantLock(true);
- 可中断的锁请求
ReentrantLock 提供了lockInterruptibly()
方法,允许线程在等待锁时响应中断。这使得线程在获取锁时可以被中断,增强了程序的灵活性。 - 条件变量支持
ReentrantLock 提供了 Condition 类的支持,允许线程在特定条件下等待和通知。通过 newCondition() 方法可以创建条件变量,支持更加复杂的线程协调。 - 锁的状态查询
可以通过isHeldByCurrentThread()
方法检查当前线程是否持有锁,通过getHoldCount()
方法获取当前线程持有锁的次数。 - 灵活的锁管理
ReentrantLock 允许显式地尝试获取锁,使用 tryLock() 方法可以尝试立即获取锁而不阻塞。如果获取失败,可以选择执行其他操作。 - 性能优势
在高竞争环境下,ReentrantLock 通常比 synchronized 更具性能优势,尤其是在高并发的场景中。 - 非阻塞锁请求
tryLock(long timeout, TimeUnit unit)
方法允许线程在指定的时间内尝试获取锁,如果无法在该时间内获取锁,则返回失败,避免了永久阻塞。 - 实现复杂的同步算法
由于其灵活的特性,ReentrantLock 可以用于实现一些复杂的同步算法和数据结构。
AQS 队列
ReentrantLock 基于AQS同步机制实现加锁的机制
AbstractOwnableSynchronizer
java.util.concurrent.locks.AbstractOwnableSynchronizer
是JUC里提供的一个抽象类,维护了当前获取锁的线程
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;
}
一般使用的是其子类 AbstractQueuedSynchronizer,这个才是 AQS 本身
AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedSynchronizer 基于模板方法设计模式,
public abstract class AbstractQueuedSynchronizer/*** Head of the wait queue, lazily initialized. Except for* initialization, it is modified only via method setHead. Note:* If head exists, its waitStatus is guaranteed not to be* CANCELLED.*/private transient volatile Node head;/*** Tail of the wait queue, lazily initialized. Modified only via* method enq to add new wait node.*/private transient volatile Node tail;/*** The synchronization state.*/private volatile int state;
}
Node
Node是AbstractQueuedSynchronizer的一个内部类,表示AQS队列的节点对象
static final class Node {volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;
}
waitStatus
class Node {/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/* waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
}
状态字段,仅采用以下值:
- SIGNAL:此节点的后继节点已(或即将)被阻止(通过停放),因此当前节点必须在释放或取消后继节点时取消停放。为避免竞争,获取方法必须首先指示它们需要信号,然后重试原子获取,然后在失败时阻止。
- CANCELLED:此节点由于超时或中断而被取消。节点永远不会离开此状态。特别是,具有已取消节点的线程永远不会再被阻止。
- CONDITION:此节点当前位于条件队列中。在传输之前,它将不会用作同步队列节点,此时状态将设置为 0。(此处使用此值与字段的其他用途无关,但简化了机制。)
- PROPAGATE:releaseShared 应传播到其他节点。这在 doReleaseShared 中设置(仅适用于头节点),以确保传播继续,即使其他操作已经介入。
- 0:以上都不是
为简化使用,这些值按数字排列。非负值表示节点不需要发信号。因此,大多数代码不需要检查特定值,只需检查与0的大小关系。对于普通同步节点,该字段初始化为 0,对于条件节点,该字段初始化为 CONDITION。使用 CAS(或在可能的情况下,使用无条件volatile写入)对其进行修改。
SHARED/EXCLUSIVE 模式
static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;
}
加锁流程
非公平方式加锁流程如下
final void lock() {// 先尝试CAS加锁if (compareAndSetState(0, 1))// 成功则setExclusiveOwnerThread(Thread.currentThread());else acquire(1);
}
公平方式加锁流程如下
final void lock() {acquire(1);
}
都是调用 acquire(1)
,这是AbstractQueuedSynchronizer的方法
//
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
尝试加锁 tryAcquire
公平和非公平的区别在于 tryAcquire 的各自的实现。tryAcquire 方法在AbstractQueuedSynchronizer
中默认实现是直接抛异常
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
在NonfairSync
和FairSync中有各自的实现实现
// NonfairSync
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}// Sync
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 锁未被任何线程持有,直接尝试加锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) { // 锁重入int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}// 加锁失败return false;
}// FairSync
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 没有等待的线程才尝试加锁if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) { // 锁重入int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
重入次数限制
重入次数最大支持2147483647,在文档中有提到
加锁失败入队
加锁失败后,封装为一个Node进行等待队列
addWaiter
private Node addWaiter(Node mode) {// mode: Node.EXCLUSIVENode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {// 说明有线程在排队node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
如果有多个线程同时执行 addWaiter,那么多个线程可能node.prev = pred;
都执行成功,如下图所示,此时pred.next = node;
就需要进行并发控制了,这里采用的是 CAS。
enq
如果当前队列为空,没有线程排队或者前面并发情况下CAS失败的线程
private Node enq(final Node node) {for (;;) {// 获取 tail 节点Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {// 有线程排队node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}// 如果有多个线程,失败的线程继续下一次循环,直到设置成功}}
}
阻塞等待 acquireQueued
处在等待队列中的线程只有在队头时有机会尝试加锁,返回值表示在阻塞过程中是否中断过
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// p == head 表明该节点在队首,再次尝试获取锁if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 获取锁失败的线程: 阻塞该线程,等待被唤醒获取锁资源if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
获取失败后是否应该阻塞,只要是Node.SIGNAL的节点都需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前驱节点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;if (ws > 0) { // CANCELLED// node.prev 指向队列中向前第一个不为 CANCELLED 的节点do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else { // 0 | CONDITION -2 | PROPAGATE -3// 满足条件: <= 0 & != -1compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
由前驱节点负责唤醒后一个节点
parkAndCheckInterrupt
中断会唤醒阻塞的线程
private final boolean parkAndCheckInterrupt() {// 阻塞当前线程LockSupport.park(this);// 返回线程是否被中断过return Thread.interrupted();
}
放弃加锁 cancelAcquire
cancelAcquire意为取消获取锁,什么时候取消获取,如下图所示,由变量 fasle 控制,但是如果加锁成功,failed 肯定为 false,所以不会执行cancelAcquire,而如果不成功那么会阻塞,或者是再次循环,所以也不会执行cancelAcquire。所以想要执行cancelAcquire要满足的条件只有1个,就是for循环里面操作抛了异常
下面来看取消获取锁的逻辑
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// 跳过取消的前驱节点Node pred = node.prev;while (pred.waitStatus > 0) // CANCELLEDnode.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}
}
唤醒阻塞线程 unparkSuccessor
唤醒指定节点的后继节点
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0) // 非 CANCELLED 状态compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) { // CANCELLEDs = null;// 从后往前找到node的第一个waitStatus <= 0的后继节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 解除阻塞状态
}
释放锁流程
如果当前线程持有锁,则会减少该线程的持有计数。每当线程成功获取锁时,持有计数会增加;释放锁时,计数会相应减少。
释放锁的条件
- 持有计数为零:当持有计数减小到零时,表示当前线程完全释放了锁,此时会进行锁的状态更新。
- 唤醒等待线程:如果有其他线程在等待获取该锁,ReentrantLock 会唤醒等待队列中的一个或多个线程,以便它们可以尝试获取锁。
public void unlock() {sync.release(1);
}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;// 存在等待队列if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}protected final boolean tryRelease(int releases) {int c = getState() - releases;// 检查当前线程(释放锁的线程)是否持有锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
因此非公平锁和公平锁释放锁的流程是一样的
线程可以反复加锁,但也需要释放同样加锁次数的锁,即重入了多少次,就要释放多少次,不然也会导致锁不被释放。什么情况下会重入呢?比如下面这个:
public void m() {for (int i = 0; i < 10; i++) {lock.lock();}try {// ... method body} finally {lock.unlock(); // 只释放一次,不会导致锁释放}
}
中断处理
因为 acquireQueued 方法如果返回,那必然是获取锁成功,但是从入队到获取锁成功这段时间内线程是可能被中断的,中断需要由调用方进行处理。
acquireQueued 在获取锁失败进行阻塞时,会清除中断标志位
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); // 调用 currentThread().isInterrupted(true)
}
所以框架在此时中断当前线程,重置中断标志位,以让线程自己处理中断
static void selfInterrupt() {Thread.currentThread().interrupt();
}
调用interrupt()
方法会将当前线程的中断状态设置为 true。这表示该线程希望被中断。该状态可以在其他地方通过Thread.interrupted()
或isInterrupted()
方法检查。
如果当前线程正在执行某些阻塞操作,如Object.wait()
、Thread.sleep()
或 BlockingQueue.take()
等,调用interrupt()
会导致这些操作抛出 InterruptedException。
这使得线程能够在等待或睡眠状态中被唤醒,以便做出适当的响应。
为什么要调用
currentThread().isInterrupted(true)
清除中断标志后然后又通过Thread.currentThread().interrupt()
重置中断状态?
中断加锁模式 lockInterruptibly
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);
}private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果有中断过,直接抛出中断异常throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
Condition
下面这个示例使用Condition:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;class BoundedBuffer {private final int[] buffer;private int count, putIndex, takeIndex;private final ReentrantLock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();public BoundedBuffer(int size) {buffer = new int[size];}public void put(int value) throws InterruptedException {lock.lock();try {while (count == buffer.length) {notFull.await(); // 等待缓冲区不满}buffer[putIndex] = value;if (++putIndex == buffer.length) putIndex = 0; // 循环使用count++;notEmpty.signal(); // 唤醒等待取数据的线程} finally {lock.unlock();}}public int take() throws InterruptedException {lock.lock();try {while (count == 0) {notEmpty.await(); // 等待缓冲区不空}int value = buffer[takeIndex];if (++takeIndex == buffer.length) takeIndex = 0; // 循环使用count--;notFull.signal(); // 唤醒等待放数据的线程return value;} finally {lock.unlock();}}
}
ConditionObject
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
内部维护了一个单向链表
public class ConditionObject implements Condition {/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;
}
await
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 如果在同步队列中,则进行阻塞LockSupport.park(this);// checkInterruptWhileWaiting 调用的是// Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
addConditionWaiter 方法用于将等待的节点加入链表尾部
private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();// 尾节点t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)// 没有等待的线程firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
删除所有取消的节点,其实就是链表操作,删除t.waitStatus != Node.CONDITION
的节点
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;// 头节点if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;} elsetrail = t;// 下一个节点t = next;}
}
调用 AQS 释放锁
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)// 必然释放锁失败,也就是抛出 IllegalMonitorStateException 异常node.waitStatus = Node.CANCELLED;}
}
判断是否在AQS同步队列中
final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;if (node.next != null) // If has successor, it must be on queuereturn true;/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it. It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/return findNodeFromTail(node);
}// 从尾节点开始找
private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}/** If we lost out to a signal(), then we can't proceed* until it finishes its enq(). Cancelling during an* incomplete transfer is both rare and transient, so just* spin.*/while (!isOnSyncQueue(node))Thread.yield();return false;
}
reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();
}
signal
该方法用于唤醒一个在该条件上等待的线程。如果有多个线程在等待,signal 方法只会唤醒其中一个线程。
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
将等待状态由Node.CONDITION
改为0,然后进入AQS同步队列
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;// CANCELLEDif (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
signalAll
该方法用于唤醒所有在该条件上等待的线程。对每个Node都调用transferForSignal
方法
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);
}private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}
对比Object#wait, notify/notifyAll
Condition 和 Object#wait
都用于实现线程间的协调与通信,但它们在设计、使用方式和功能上有一些重要的区别。以下是它们之间的比较:
-
基本概念
Object#wait
是 Java 的内置方法,任何对象都可以调用。线程在调用 wait() 方法时会释放该对象的监视器锁,并进入等待状态,直到被其他线程调用 notify() 或 notifyAll() 唤醒。
Condition是 java.util.concurrent.locks 包中的一个接口,通常与 ReentrantLock 一起使用。提供了更灵活的线程间通信机制,可以在条件不满足时阻塞线程,并在条件满足时唤醒。 -
使用方式
Object#wait
需要配合synchronized
使用,线程必须持有对象的监视器锁才能调用 wait(),否则会抛出IllegalMonitorStateException。唤醒等待的线程时,必须在同一个对象的锁上调用 notify() 或 notifyAll()。而Condition需要与 ReentrantLock 结合使用,调用 lock() 方法获取锁后,才能调用 await()。唤醒等待的线程时,可以使用 signal() 或 signalAll(),不需要在同一个对象上。
synchronized (obj) {obj.wait(); // 进入等待状态// 处理逻辑
}lock.lock();
try {condition.await(); // 进入等待状态// 处理逻辑
} finally {lock.unlock();
}
- 灵活性:Object#wait只能在对象的监视器锁上使用,缺乏灵活性。Condition支持多个条件变量,允许在同一个锁下定义多个不同的条件。更适合复杂线程协调需求
- 性能:
Object#wai
t在高并发场景下,性能可能较低,因为synchronized
会导致线程竞争和上下文切换。Condition通常在高并发环境中性能更优,因为它支持更细粒度的控制,可以避免不必要的上下文切换。 - 可读性和维护性
Object#wait代码可读性较低,容易导致死锁和复杂的错误。Condition提供更好的可读性和维护性,可以更清晰地表达线程之间的关系和条件。