JAVA concurrency - CyclicBarrier and CountDownLatch source code detailed explanation

JAVA concurrency - CyclicBarrier and CountDownLatch source code detailed explanation

Overview

CountDownLatchAnd CyclicBarrierhave similarities, and often someone will come up to compare them, this time, I try to resolve these two classes separately from the source point of view, and from the perspective of the source code, look at the two different classes at .

CountDownLatch

CountDownLatchLiterally it is a counting tool class, in fact this class is a Java method for multi-thread counting.

CountDownLatchThe internal realization is mainly dependent on AQSthe sharing mode. When a thread CountDownLatchinitializes one count, other thread calls awaitwill be blocked until the other threads call the countDownmethod one by one release, and countthe value is reduced to 0, that is, the synchronization lock is released await.

Sync

The main internal implementation is a AQSsynchronizer inherited from it Sync. SyncThe source code is as follows:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        // count 
        Sync(int count) {
            // state count
            setState(count);
        }

        // count 
        int getCount() {
            return getState();
        }

        // 
        protected int tryAcquireShared(int acquires) {
            // state 0 
            // AQS AQS 
            // 0 0 
            // 0 
            return (getState() == 0) ? 1 : -1;
        }

        // 
        protected boolean tryReleaseShared(int releases) {
            // 
            for (;;) {
                // state
                int c = getState();
                // state 0 false
                if (c == 0)
                    return false;
                // state-1 
                int nextc = c-1;
                //CAS state 
                if (compareAndSetState(c, nextc))
                    // state 0
                    return nextc == 0;
            }
        }
    }
 

SyncIt is the inherited AQSsynchronizer. The following points are worth discussing in this code:

  1. Why use state to store the value of count?

    Because state and count are actually a concept. When state is 0, it means that the resource is free. When count is 0, it means that all CountDownLatchthreads have been completed. Although the two are not the same meaning, they are implemented in the code. The performance of the level is completely consistent, so count can be recorded in the state.

  2. Why tryAcquireShareddoes it not return 0?

    1. we need to explain the tryAcquireSharedpossible return values in AQS: a negative number means that shared locks cannot be acquired, and 0 means that shared locks can be acquired, but after the current thread has acquired all shared lock resources, the next thread will not There will be extra resources that can be acquired. A positive number means that you can acquire a shared lock, and there will be margin to provide shared locks to other threads. Then we come back and look at the CountDownLatchinternals tryAcquireShared. We don't pay attention to the subsequent threads at all in the implementation, and the subsequent resource occupancy conditions. I only want the current state, then the return value of 0 is actually unnecessary.

  3. Why tryReleaseSharedare the parameters in not used?

    According to the implementation of this class, tryReleaseSharedthe parameter we can know must be 1, because the completion of the thread must be completed one by one. In fact, when we look at the countDownmethod and call the method inside sync.releaseShared, we can find that he has written the parameter to 1, so the reason tryReleaseSharedthat the parameter is not used in fact is because the parameter value is fixed to 1.

Constructor and method

    // 
    public CountDownLatch(int count) {
        //count 0
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // Sync
        this.sync = new Sync(count);
    }


    // 
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 
    // 1
    public void countDown() {
        sync.releaseShared(1);
    }

    // count
    public long getCount() {
        return sync.getCount();
    }

    //toString
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
 

CyclicBarrier

CyclicBarrierLiterally, it is a loop fence. The role in JAVA is to let all threads complete and wait until all threads are completed before proceeding to the next operation.

CyclicBarrierIt does not directly inherit AQS to achieve synchronization, but uses reentrant locks ReentrantLockand Conditioncompletes its own internal logic.

Member variables

    // 
    private final ReentrantLock lock = new ReentrantLock();

    // 
    private final Condition trip = lock.newCondition();

    // 
    private final int parties;

    // Runnable 
    private final Runnable barrierCommand;

    // 
    private Generation generation = new Generation();

    // 
    private int count;

    private static class Generation {
        boolean broken = false;
    }
 

We can see that there is a very unfamiliar class Generationin the member variables . This is CyclicBarriera staticclass declared internally . Its role is to help distinguish the generation of threads, so that they CyclicBarriercan be reused. If this simple explanation does not make you well If you understand, you can look at the next source code analysis, and understand its purpose through implementation.

Constructor

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
 

Very conventional constructor, just initialize member variables simply, there is no special place.

Core method

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
 

awaitYes CyclicBarrier, the core method relies on this method to realize the unified planning of the thread, which calls the internal implementation doWait, let's take a look at the code:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 
        // ArrayBlockingQueue 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Generation 
            final Generation g = generation;

            // generation broken broken 
            // 
            // tripped 
            if (g.broken)
                throw new BrokenBarrierException();

            // 
            //generation broken true 
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 1
            int index = --count;
            // 0 tripped 
            if (index == 0) {
                boolean ranAction = false;
                try {
                    // 
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 
                    // nextGeneration 
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 
            for (;;) {
                try {
                    if (!timed)
                        // timed false trip 
                        trip.await();
                    else if (nanos > 0L)
                        // timed true awaitNanos 
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                // generation broken broken 
                if (g.broken)
                    throw new BrokenBarrierException();

                // g != generation generation
                // 
                // return
                if (g != generation)
                    return index;

                // 
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
 

After reading this core code, we come back to reflect on Generationthe meaning, we can already give a rough Generationreason for the use :

Different from CountDownLatchthe implementation, CyclicBarriera more complicated method is adopted. The reason is that the internal intervention and communication between multiple threads are involved. It CountDownLatchdoes not care about the implementation and process of the thread. It is just a counter, and CyclicBarrieryou need to know whether the thread ends normally. If it is interrupted, the cost will be higher if other methods are used. Therefore, CyclicBarrierthe author uses static internal classes to share the entire generational state among multiple threads to ensure that each thread can obtain the state of the fence and Be able to better feedback your own state. At the same time, this method is easy to reset and can CyclicBarrierbe reused efficiently. As for why brokenno volatilemodification is used , because all the methods of the class are locked internally, there will be no data out of synchronization.

summary

CountDownLatchAnd CyclicBarrierthere may be some similarities from the use of it, but after we read the source code we find that the two can be said to be bad days to do, to achieve the principles, implementation, application scenarios are not the same, conclude are the following point:

  1. CountDownLatchRealization depends directly on AQS; CyclicBarrierit is with the help of ReentrantLockandCondition
  2. CountDownLatchIt exists as a counter, so it adopts a smart design, the source code structure is clear and simple, and the same function is relatively simple; CyclicBarrierin order to achieve multi-threaded control, a more complex design is adopted, and the code implementation is also more winding Around.
  3. Due to CyclicBarrierthe implementation method adopted CountDownLatch, it CyclicBarriercan be reused many times compared to disposable
  4. Different counting methods: CountDownLatchuse cumulative counting, CyclicBarrierthen use reciprocal counting