当前位置: 首页 > news >正文

8、深入理解AQS之共享锁Semaphore

深入理解AQS之共享锁Semaphore

  • Semaphore源码分析
    • 构造函数
    • 公平锁与非公平锁
      • NonfairSync
      • FairSync
      • 总结
    • acquire()方法
    • release()方法

Semaphore源码分析

  Semaphore基于AQS+CAS实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore默认使用非公平锁。

Semaphore详情如下:

Semaphore详情

构造函数


// AQS的实现
private final Sync sync;// 默认使用非公平锁
public Semaphore(int permits) {sync = new NonfairSync(permits);
}// 根据fair布尔值选择使用公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平锁与非公平锁

  Semaphore中公平锁与非公平锁的实现,可以在tryAcquireShared()方法中找到两种锁的区别。

Semaphore公平锁与非公平锁

NonfairSync

  Semaphore#NonfairSync#tryAcquireShared()详情如下:


// 非公平锁  获取信号量
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}

  Semaphore#Sync#nonfairTryAcquireShared()详情如下:


// 非公平锁 获取信号量
final int nonfairTryAcquireShared(int acquires) {// 自旋for (;;) {// 获取Semaphore中可用的信号量数int available = getState();// 当前可用信号量数 - acquiresint remaining = available - acquires;// 可用信号量数不足 或 CAS操作获取信号量失败,返回  当前可用信号量数 - acquiresif (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

FairSync

  Semaphore#FairSync#tryAcquireShared()详情如下:


protected int tryAcquireShared(int acquires) {// 自旋for (;;) {// 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中)if (hasQueuedPredecessors())return -1;// 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

总结

不难看出,公平锁与非公平锁的区别在于当线程尝试获取Semaphore中的信号量时:

  • 公平锁,优先判断等待队列中是否有挂起的线程,如果有,则将当前线程添加到等待队列中,等待唤醒后抢夺信号量;
  • 非公平锁,不管等待队列中是否有挂起线程,优先尝试获取信号量,获取失败,将当前线程添加到等待队列。

acquire()方法

Semaphore默认实现的是非公平锁,acquire()按非公平锁的实现进行源码分析。

Semaphore中获取一个信号量,Semaphore#acquire()详情如下:


//  Semaphore 中无信号量,阻塞public void acquire() throws InterruptedException {// 获取 Semaphore 信号量sync.acquireSharedInterruptibly(1);}

AbstractQueuedSynchronizer#acquireSharedInterruptibly() 详情如下:


public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 线程中断,抛出异常if (Thread.interrupted())throw new InterruptedException();// 尝试获取Semaphore的信号量if (tryAcquireShared(arg) < 0)// 尝试获取信号量失败,再次获取Semaphore信号量doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 自旋for (;;) {final Node p = node.predecessor();// 当前节点的前驱节点为等待队列头节点if (p == head) {// 尝试获取信号量int r = tryAcquireShared(arg);// 获取信号量成功if (r >= 0) {// 唤醒等待队列中的待唤醒线程setHeadAndPropagate(node, r);p.next = null;failed = false;return;}}// 获取信号量失败,挂起线程  ==> 线程阻塞,待唤醒进行下一轮自旋if (shouldParkAfterFailedAcquire(p, node) &&// 若当前线程被中断,抛出InterruptedException异常parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

AbstractQueuedSynchronizer#setHeadAndPropagate()


// node: 当前节点;propagate 剩余资源
private void setHeadAndPropagate(Node node, int propagate) {// 获取等待队列中的头节点Node h = head;// 将当前Node节点设置为等待队列的头节点setHead(node);// 剩余资源大于0 || 原等待队列中的头节点为null || 原等待队列中 Node 的 ws 为 -1 或者 -3(共享锁)if (propagate > 0 || h == null || h.waitStatus < 0 ||  (h = head) == null || h.waitStatus < 0) {// 获取当前等待队列头节点的后继节点Node s = node.next;// 当前节点的后继节点为null  或 当前节点的后继节点为共享锁if (s == null || s.isShared())doReleaseShared();}
}

release()方法

Semaphore默认实现的是非公平锁,release()按非公平锁的实现进行源码分析。

归还Semaphore的信号量,Semaphore#release()详情如下:


// 归还Semaphore的信号量public void release() {sync.releaseShared(1);}

归还信号量,Semaphore#Sync#releaseShared() 详情如下:


public final boolean releaseShared(int arg) {// 尝试归还信号量if (tryReleaseShared(arg)) {// 归还信号量doReleaseShared();// 归还成功return true;}// 归还失败return false;
}

归还信号量,Semaphore#Sync#releaseShared()详情如下:


// 尝试归还信号量
protected final boolean tryReleaseShared(int releases) {// 自旋for (;;) {// 获取Semaphore中可用的信号量数int current = getState();// 当前可用信号量数 + 归还的信号量 releasesint next = current + releases;// 超出了int的最大值,变成了负数if (next < current)throw new Error("Maximum permit count exceeded");// cas操作,将信号量归还给Semaphoreif (compareAndSetState(current, next))return true;}
}

归还信号量成功,唤醒等待队列中的挂起线程,AbstractQueuedSynchronizer#doReleaseShared() :


private void doReleaseShared() {// 自旋for (;;) {// 获取等待队列头节点Node h = head;// 等待队列中有排队的线程if (h != null && h != tail) {int ws = h.waitStatus;// 等待队列头节点ws = -1,说明其后继节点中有待唤醒的线程if (ws == Node.SIGNAL) {// cas 操作,等待队列头节点的 ws 由 -1 更新为 0 ,cas失败,继续下一次自旋if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 唤醒头节点的后继节点中待唤醒线程unparkSuccessor(h);}// 解决共享锁JDK1.5的bug,头节点的 ws 为0,将头节点的 ws 设置为 -3 ,代表后继节点中可能有待唤醒的线程else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)break;}
}// 解决共享锁JDK1.5的bug,头节点的 ws 为0,将头节点的 ws 设置为 -3 ,代表后继节点中可能有待唤醒的线程else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)break;}
}

http://www.mrgr.cn/news/5113.html

相关文章:

  • C++基础知识五
  • uniapp分包echarts
  • MySQL 触发器(Trigger)
  • ES聚合,SQL查询
  • gitlab迁移至新的服务器
  • python 异常处理详解带(3分钟速通)
  • 大数据与大模型技术赋能:革新智能客服系统知识库管理的策略与实践
  • 【具体数学 Concrete Mathematics】1.1 递归问题 讲义
  • GitHub 官方 CLI 客户端发布 2.55.0
  • 软件测试-自动化测试
  • vue实现卡片遮罩层交互式功能
  • 倍内菲新品发布揭示宠物营养新纪元,引领行业保驾护航
  • 什么是天线OTA,怎么通过OTA数据评估产品射频环境情况
  • 004快速排序-python实现
  • FFmpeg的入门实践系列三(基础知识)
  • Docker微服务实战Demo
  • 自动化测试框架pytest+allure+requests
  • (待更)将windows11配置成生产力高的电脑:当计算机新生的你拿到新电脑时该配置哪些东西(python、mysql、git)
  • Linux基础I/O之文件缓冲区
  • 无人驾驶,并非无人之地