# Semaphore
# 1. 概念
Semaphore 信号量可以用来控制同一时刻访问临界资源(共享资源)的线程数,以确保访问临界资源的线程能够正确、合理的使用公共资源。
在初始化Semaphore时,需要为这个许可传入一个数值,该数值表示同一时刻可以访问临界资源的最大线程数,也被称为许可集。
# 2. 功能实现
Semaphore是通过acquire方法获取许可证,如果获取到了,那么直接返回,否则进入阻塞状态;通过release方法释放许可证,释放的时候如果有现成因为调用acquire处于阻塞状态,将会唤醒一个线程。
# 3. 实现源码分析
# 3.1 Semaphore创建
/**
* 创建一个非公平的许可集
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 根据传入的fair值来创建公平的许可集或非公平的许可集
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore跟ReentrantLock一样,提供了公平与非公平许可证的实现,Semaphone多了一个参数permits,表示创建多少个许可证,同一个时间允许多少个线程访问。
# 3.2 Semaphore 非公平许可证的实现
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
非公平实现类NonfairSync,初始化许可证(state)的值,初始化完成后,state代表着当前信号量对象的可用许可数。
/**
* 请求一个许可证,
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
分析:请求一个许可证,如果当前Semaphore中存在足够的许可证,那么该方法会立即返回,否则进入阻塞,等待其他线程释放。
/**
* 使用共享模式获取锁,该方法可以被中断
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断线程是否中断,是则抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享许可证,
if (tryAcquireShared(arg) < 0)
//将当前线程加入同步队列阻塞
doAcquireSharedInterruptibly(arg);
}
该方法是AQS的内部方法,在获取同步标识时是可以响应中断操作,如果操作被中断,则抛出中断异常,否则调用tryAcquireShared方法,尝试获取一个许可数,获取成功则返回执行业务,方法结束,如果获取失败,则doAcquireSharedInterruptibly该方法,将当前线程加入同步队列阻塞。
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
/**
* 非公平的获取许可资源
*/
final int nonfairTryAcquireShared(int acquires) {
// 自旋死循环
for (;;) {
// 同步状态值(许可证数)
int available = getState();
// 许可证数减1
int remaining = available - acquires;
// 判断信号量中的可用许可数是否已小于0或者cas执行是否成功,如果设置成功,则返回当前的许可数值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
nonfairTryAcquireShared方法中首先获取到state的值,在减去1,获得remaining,如果remaining小于0,则直接返回remaining,如果大于或等于0,则执行compareAndSetState,设置状态值,cas成功则表示获取锁成功,返回remaining的值,如果获取失败,则再次循环。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建一个共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
if (p == head) {
// 如果当前节点的前驱节点为头节点,则尝试获取锁资源的许可证数
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果许可证数大于等于0,则设置当前节点为头节点,并唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 将AQS的尾节点赋值给pred值,
Node pred = tail;
// 如果pred不为null,则表示该同步队列中存在节点
if (pred != null) {
// 将当前节点的前驱节点赋值为目前的尾节点上的节点
node.prev = pred;
// 使用cas,将当前节点替换尾节点上的节点
if (compareAndSetTail(pred, node)) {
// 原先尾节点上的节点的后驱节点指向当前节点。
pred.next = node;
return node;
}
}
// 将当前节点插入队列
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果当前队列为空,
if (t == null) { // Must initialize
// 则初始化,创建一个新的节点使用cas赋值在头节点上
if (compareAndSetHead(new Node()))
// 如果成功,则将尾节点也设置为初始化节点
tail = head;
} else {
// 将当前节点的前驱节点设置为尾节点上的节点
node.prev = t;
// 使用cas将当前节点和尾节点上的节点进行替换
if (compareAndSetTail(t, node)) {
// 成功则将尾节点上的节点的后驱节点设置为当前节点
t.next = node;
return t;
}
}
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置节点为头节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 如果propagate许可证数大于0或者头节点等于null或者头节点的状态小于0(signal可唤醒,PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 判断后继节点为null或者后继节点是共享模式
if (s == null || s.isShared())
// 释放共享锁,唤醒后继节点
doReleaseShared();
}
}
tryAcquireShared方法获取锁资源成功或remaining小于0,则返回,如果大于或等于0,则获取锁资源成功,直接返回,如果小于0,则执行doAcquireSharedInterruptibly方法,将当前线程加入到队列中。
addWaiter:将当前节点封装为一个Node节点,并加入队列,如果队列中为null,则先创建一个节点,并赋值给头节点及尾节点,再将当前节点连接在新建的节点之后,并将当前节点指向尾节点;如果队列不为null,则将当前节点插入到队列的末尾,并将当前节点指向尾节点。
# 3.3 释放共享锁
/**
* 释放一次许可证
*/
public void release() {
sync.releaseShared(1);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 当前的许可证数+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 使用cas将修改后的许可证数赋值给state
if (compareAndSetState(current, next))
return true;
}
}
/**
* 共享锁的释放动作
*/
private void doReleaseShared() {
/*
* 唤醒后继
*/
for (;;) {
Node h = head;
// 判断头节点
if (h != null && h != tail) {
// 如果头节点不等于null,且头节点不等于尾节点(当只有一个节点时,头节点等于尾节点)
int ws = h.waitStatus;
// 如果该节点的状态为待唤醒
if (ws == Node.SIGNAL) {
// 将该节点的等待状态设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 不成功则继续
continue; // loop to recheck cases
// 成功则唤醒头节点的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 如果等待状态等于0,且设置为共享状态不成功,则继续循环
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}