深入java并发包源码(二)AQS的介绍与使用
in Java with 0 comment

深入java并发包源码(二)AQS的介绍与使用

in Java with 0 comment

深入java并发包源码(一)简介
深入java并发包源码(二)AQS的介绍与使用
深入java并发包源码(三)AQS独占方法源码分析

AQS

本文章会讲解 AQS 的使用方法,然后通过 DEBUG 跟踪 AQS 执行的一系列操作来分析源码,读者跟着文章 DEBUG 跟踪源码能更容易理解。

AQS 是什么?

AbstractQueuedSynchronizer 队列同步器(AQS)是一个抽象类,作为并发工具的基础组件,为真正的实现类提供基础设施。并发工具是面向使用者的,AQS 面向的是并发工具的实现者。

AQS 的使用

AQS 有什么用?

AQS 提供了如构建同步队列,控制同步状态等方法,从设计模式角度来看,它采用了模板模式。它的主要使用方式是继承这个抽象类然后重写部分方法来实现自定义同步工具。

AQS_uses

我们可以看到上面这些锁都是通过 AQS 实现的。

拿 ReentrantLock 来举例,它有一个内部类 Sync 继承了 AQS,并且重写了一些方法。然后将内部类的方法导出来给使用者使用。

讲的再多也不如自己动手实现一个并发工具理解的深刻,我们先介绍一下 AQS 的 API 然后用这些 API 来实现一个自定义的锁来理解它的使用方法。

AQS 分为可以重写的方法和不可以重写的方法,需要根据自己的需求去实现方法。

可以重写的方法:

方法名功能
tryAcquire(int arg)排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用 compareAndSetState 来做)状态。
tryRelease(int arg)释放状态
tryAcquireShared(int arg)共享的模式下获取状态。
tryReleaseShared(int arg)共享的模式下释放状态。
isHeldExclusively()在排它模式下,状态是否被占用。

上面的方法只需要按照需求实现即可,比如 Reentrantlock 是独占的,就只需要实现 tryAcquiretryRelease 即可。当你实现 Semaphore 的时候就只需要实现 tryAcquireSharetryReleaseShared 方法即可。

不可重写的方法

这些方法被声明成 final 的,也就是不可重写的。后面对于源码的分析主要就是分析这些方法的源码。

方法名功能
acquire(int arg)独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回
acquireInterruptibly(int arg)可中断的获取同步状态,如果当前线程被中断,会抛出异常并返回
tryAcquireNanos(int arg)可中断的获取同步状态,如果超时会返回 false
acquireShared(int arg)共享的模式下获取同步状态
acquireSharedInterruptibly()在排它模式下,状态是否被占用。
tryAcquireShareNanos()共享式的 tryAcquireNanos()
release()释放同步状态
releaseShared()共享式释放同步
getQueueThreads()获取等待在同步队列上的线程集合

现在我们需要实现一个独占锁,使用 AQS 提供的方法来重写 AQS 的模板方法:tryAcquiretryRelease

以下 demo 来自《Java 并发编程的艺术》

Sync .class

public class Sync extends AbstractQueuedSynchronizer {
    // 检测是否有线程持有锁只需看 state 是不是等于 1,state == 1 时
    // 锁被其他线程使用,当等于 0 时未被其他线程得到
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    // 尝试着获取锁,当 status 为 0 时获取成功返回 true,否则返回 false 
     @Override
    protected boolean tryAcquire(int arg) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试着释放锁
    @Override
    protected boolean tryRelease(int arg) {
        // 没有线程获取锁却释放
        if (getState() == 0) throw new IllegalArgumentException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public Condition newCondition() {
        return new ConditionObject();
    }

}

Mutex.class 用于导出 Sync 的方法

public class Mutex implements Lock {
    private Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(0);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    
}