Detailed explanation of the thread pool of JUC

Thread Pool

The main job of a thread pool is to control the number of running threads. Submitted tasks are added to a queue and executed as threads become available. If the number of tasks exceeds the number of threads, the excess tasks wait in the queue; when a thread finishes its current task, it takes the next task out of the queue and executes it.

Its main characteristics are: thread reuse, control of the maximum number of concurrent threads, and thread management.

  • First: reduce resource consumption. By reusing already-created threads, the overhead of thread creation and destruction is reduced.

  • Second: improve response speed. When a task arrives, it can be executed immediately without waiting for a thread to be created.

  • Third: improve thread manageability. Threads are a scarce resource. Creating them without limit not only consumes resources but also reduces system stability. A thread pool allows unified allocation, tuning, and monitoring.

Use of thread pool

The thread pool in Java is implemented through the Executor framework, which involves the Executor, Executors, ExecutorService, and ThreadPoolExecutor classes.

Executor and ExecutorService are related much like Collection and List. We use the Executors utility class to obtain the three commonly used thread pools as ExecutorService instances.

Executors.newFixedThreadPool(int)

Creates a fixed-size thread pool, which controls the maximum number of concurrent threads; excess tasks wait in the queue.

As can be seen from the output, all 10 requests are handled by at most 5 threads.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // a pool of 5 fixed threads

        try {
            // simulate 10 requests
            for (int i = 1; i <= 10; i++) {
                // submit each request to the pool
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t handling request");
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // shut the pool down when done
            threadPool.shutdown();
        }
    }
}

View the underlying source code

Under the hood it returns a ThreadPoolExecutor object.

The thread pool created by newFixedThreadPool has corePoolSize and maximumPoolSize both equal to the given value, and the LinkedBlockingQueue it uses can hold Integer.MAX_VALUE tasks. The maximum load is therefore the initial value + Integer.MAX_VALUE, which is effectively unlimited.
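This can be verified at runtime: on current JDKs, newFixedThreadPool returns a ThreadPoolExecutor directly, so we can cast the result and inspect its parameters (a small verification sketch; the class name is ours):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolInternals {
    public static void main(String[] args) {
        // newFixedThreadPool(n) is implemented as roughly:
        // new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

        // corePoolSize and maximumPoolSize both equal the given value
        System.out.println(pool.getCorePoolSize());     // 5
        System.out.println(pool.getMaximumPoolSize());  // 5

        // the work queue is a LinkedBlockingQueue with capacity Integer.MAX_VALUE
        System.out.println(pool.getQueue().getClass().getSimpleName()); // LinkedBlockingQueue
        System.out.println(pool.getQueue().remainingCapacity());        // 2147483647

        pool.shutdown();
    }
}
```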

Suitable for executing long-running tasks; performance is much better.

Executors.newSingleThreadExecutor()

Creates a single-threaded pool that uses one unique worker thread to execute tasks, guaranteeing that all tasks execute in the order they were submitted.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //ExecutorService threadPool = Executors.newFixedThreadPool(5); // a pool of 5 fixed threads
        ExecutorService threadPool = Executors.newSingleThreadExecutor(); // a pool with a single thread

        try {
            // simulate 10 requests
            for (int i = 1; i <= 10; i++) {
                // submit each request to the pool
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t handling request");
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // shut the pool down when done
            threadPool.shutdown();
        }
    }
}

newSingleThreadExecutor sets both corePoolSize and maximumPoolSize to 1, and uses a LinkedBlockingQueue.

Suitable for task scenarios where tasks must be executed one by one on a single thread.
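The ordering guarantee can be demonstrated with a small sketch (class name ours): numbered tasks are submitted, and after the pool terminates they turn out to have run strictly in submission order, because a single worker thread drains the queue one task at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // no synchronization needed: every task runs on the same single worker thread
        List<Integer> order = new ArrayList<>();

        for (int i = 1; i <= 5; i++) {
            final int n = i;
            pool.execute(() -> order.add(n));
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        // tasks were executed strictly in submission order
        System.out.println(order); // [1, 2, 3, 4, 5]
    }
}
```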

Executors.newCachedThreadPool()

Creates a cacheable thread pool. If the pool has grown beyond what processing requires, idle threads are flexibly reclaimed; if no reclaimed thread is available, a new thread is created.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //ExecutorService threadPool = Executors.newFixedThreadPool(5); // a pool of 5 fixed threads
        //ExecutorService threadPool = Executors.newSingleThreadExecutor(); // a pool with a single thread
        ExecutorService threadPool = Executors.newCachedThreadPool(); // as many threads as needed

        try {
            // simulate 10 requests
            for (int i = 1; i <= 10; i++) {
                // submit each request to the pool
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t handling request");
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // shut the pool down when done
            threadPool.shutdown();
        }
    }
}

newCachedThreadPool sets corePoolSize to 0 and maximumPoolSize to Integer.MAX_VALUE, and uses a SynchronousQueue. That is, a thread is created to run each arriving task, and a thread idle for more than 60 seconds is destroyed.

Suitable for: executing many short-lived asynchronous tasks, or lightly loaded servers.
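Both claims can be checked by casting, since on current JDKs newCachedThreadPool also returns a ThreadPoolExecutor directly (a verification sketch; class name ours):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolInternals {
    public static void main(String[] args) {
        // newCachedThreadPool is a ThreadPoolExecutor with core 0,
        // max Integer.MAX_VALUE, 60s keep-alive, and a SynchronousQueue
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newCachedThreadPool();

        System.out.println(pool.getCorePoolSize());                     // 0
        System.out.println(pool.getMaximumPoolSize());                  // 2147483647
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS));    // 60
        System.out.println(pool.getQueue().getClass().getSimpleName()); // SynchronousQueue

        pool.shutdown();
    }
}
```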

Although Java provides these ready-made thread pools, in actual development we use none of them; instead we build a custom thread pool with ThreadPoolExecutor. This comes from the Alibaba Java Development Manual:

  1. [Mandatory] Thread resources must be provided through a thread pool; explicitly creating threads in the application is not allowed. Explanation: the benefit of a thread pool is reducing the time spent creating and destroying threads and the overhead on system resources, solving the problem of insufficient resources. Without a thread pool, the system may create large numbers of similar threads, leading to memory exhaustion or excessive context switching.

  2. [Mandatory] Thread pools must not be created with Executors; create them through ThreadPoolExecutor instead. This makes the pool's operating rules explicit to whoever writes the code and avoids the risk of resource exhaustion. Explanation: the drawbacks of the thread pool objects returned by Executors are as follows:

    1. FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, which may accumulate a large number of requests and cause OOM.

    2. CachedThreadPool: the allowed number of created threads is Integer.MAX_VALUE, which may create a large number of threads and cause OOM.

    Before customizing a thread pool with ThreadPoolExecutor, we must first understand the seven parameters of the thread pool.

The 7 parameters of the thread pool

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
 

corePoolSize

The number of resident core threads in the thread pool

1. After the thread pool is created, when a task request arrives, the threads in the pool execute the requested task; loosely, they can be thought of as the threads "on duty" today.

2. When the number of threads in the pool reaches corePoolSize, newly arriving tasks are put into the cache queue.

maximumPoolSize

The maximum number of threads that can execute simultaneously in the pool; this value must be greater than or equal to 1.

keepAliveTime

Survival time of surplus idle threads. When a thread's idle time reaches keepAliveTime, the surplus threads are destroyed until only corePoolSize threads remain.

By default, keepAliveTime only takes effect while the number of threads in the pool is greater than corePoolSize; once the count drops back to corePoolSize, it no longer applies.

unit

Unit of measure for keepAliveTime

workQueue

Work queue, tasks that have been submitted but not yet executed.

ArrayBlockingQueue

Bounded blocking queue based on array, sorted by FIFO. After new tasks come in, they will be placed at the end of the queue. A bounded array can prevent resource exhaustion. When the number of threads in the thread pool reaches corePoolSize, and a new task comes in, the task will be placed at the end of the queue, waiting to be scheduled. If the queue is already full, a new thread is created, and if the number of threads has reached maxPoolSize, the rejection strategy will be executed.

LinkedBlockingQueue

An unbounded blocking queue based on a linked list (in fact its maximum capacity is Integer.MAX_VALUE), sorted FIFO. Because the queue is effectively unbounded, once the number of threads reaches corePoolSize, new tasks always go into the queue instead of triggering new threads up to maxPoolSize. Therefore, with this work queue, the maxPoolSize parameter effectively has no effect.

SynchronousQueue

A blocking queue that does not cache tasks, the producer puts a task and must wait until the consumer takes out the task. That is to say, when a new task comes in, it will not be cached, but will be directly scheduled to execute the task. If there is no available thread, a new thread will be created. If the number of threads reaches maxPoolSize, the rejection strategy will be executed.

PriorityBlockingQueue

Unbounded blocking queue with priority, priority is achieved through the parameter Comparator.
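The differences between these queues can be seen directly in their offer behavior (a quick sketch; class name ours):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueBehaviorDemo {
    public static void main(String[] args) {
        // ArrayBlockingQueue: bounded -- offer fails once capacity is reached
        ArrayBlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(1);
        System.out.println(bounded.offer(1)); // true
        System.out.println(bounded.offer(2)); // false, queue full

        // LinkedBlockingQueue: effectively unbounded (capacity Integer.MAX_VALUE)
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647

        // SynchronousQueue: holds nothing -- offer fails unless a consumer is already waiting
        SynchronousQueue<Integer> direct = new SynchronousQueue<>();
        System.out.println(direct.offer(1)); // false, no waiting consumer
        System.out.println(direct.size());   // 0
    }
}
```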

threadFactory

The thread factory used to create new worker threads for the pool; generally the default factory is used.

handler

The rejection strategy: how to refuse new tasks when the work queue is full and the number of worker threads has reached maximumPoolSize.

A detailed analogy

For example, think of a bank. maximumPoolSize is how many service windows the bank has, say five. Usually not all windows are open; the open ones correspond to corePoolSize, say two windows handling business. The other three are closed, with no staff present. Customers go to the open windows, so at most two are served at a time; the rest can only wait in the waiting area, which is the workQueue. The waiting area has 3 seats, so at most 3 people can wait. When more customers arrive and both the waiting area and the open windows are full, the manager opens the remaining 3 windows and calls the off-duty employees back to work overtime. Now all 5 windows are open, so the maximum number of customers the bank can accommodate is maximumPoolSize + workQueue = 5 + 3 = 8. If yet another customer arrives, there is nowhere to seat them (assuming nobody is willing to stand), and the manager applies a corresponding handling strategy, the handler. Later the peak passes and the customers slowly leave; the overtime employees have no business left to handle. Once they have been idle for keepAliveTime (measured in units of unit), the manager sees the peak is over and lets them go home. Finally, threadFactory represents the bank's uniform: the style and logo its staff wear.

1. After creating the thread pool, start waiting for requests.

2. When the execute() method is called to add a request task, the thread pool will make the following judgments:

2.1 If the number of running threads is less than corePoolSize , then immediately create a thread to run this task;

2.2 If the number of running threads is greater than or equal to corePoolSize , then put the task into the queue;

2.3 If the queue is full at this time but the number of running threads is still less than maximumPoolSize, then a non-core thread is created to run this task immediately;

2.4 If the queue is full and the number of running threads is greater than or equal to maximumPoolSize , then the thread pool will start the saturation rejection strategy to execute.

3. When a thread completes its task, it will remove the next task from the queue for execution.

4. When a thread has nothing to do for more than a certain time ( keepAliveTime ), the thread will judge:

If the number of currently running threads is greater than corePoolSize , then this thread is stopped.

So after all tasks of the thread pool are completed, it will eventually shrink to the size of corePoolSize .
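Steps 2.1–2.3 can be observed with a deliberately tiny pool (1 core thread, 2 max, queue capacity 1; class name ours). Three blocking tasks are submitted: the first takes the core thread, the second goes into the queue, and the third, finding the queue full, forces a non-core thread to be created:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));

        // each task simply blocks until we release it
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };

        pool.execute(blocker); // task 1: pool < corePoolSize -> new core thread (step 2.1)
        pool.execute(blocker); // task 2: core thread busy -> goes into the queue (step 2.2)
        pool.execute(blocker); // task 3: queue full, pool < maximumPoolSize -> non-core thread (step 2.3)

        System.out.println(pool.getPoolSize());     // 2 (core + one non-core thread)
        System.out.println(pool.getQueue().size()); // 1 (task 2 still waiting)

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```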

The 4 rejection strategies

The waiting queue is full and cannot accept new tasks, and the pool has reached its maximum thread count and cannot serve new tasks either. At this point we need a rejection strategy mechanism to handle the situation reasonably.

  • AbortPolicy (default): throws RejectedExecutionException directly, preventing the system from running normally.

  • CallerRunsPolicy: a "caller runs" feedback mechanism. This strategy neither discards the task nor throws an exception; instead it pushes some tasks back to the caller to run, thereby slowing the inflow of new tasks.

  • DiscardOldestPolicy: Abandon the longest waiting task in the queue, and then try to put the rejected task into the queue.

  • DiscardPolicy: silently discards tasks that cannot be processed, without doing anything or throwing an exception. If losing tasks is acceptable, this is the best strategy.

The above built-in rejection strategies all implement the RejectedExecutionHandler interface.
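CallerRunsPolicy is easy to observe: saturate a 1-thread pool with a queue of 1, then submit one more task; the extra task runs on the submitting thread instead of being rejected (a sketch; class name ours):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };

        pool.execute(blocker); // occupies the single worker thread
        pool.execute(blocker); // fills the queue

        // pool saturated: CallerRunsPolicy makes the submitting thread run this task itself
        pool.execute(() ->
                System.out.println("ran on: " + Thread.currentThread().getName())); // ran on: main

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```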

Manually create thread pool

By passing configuration parameters to new ThreadPoolExecutor, we create a thread pool manually.

corePoolSize is 2, maximumPoolSize is 5, and the blocking queue size is 3. Run it and view the results.
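The construction code for this configuration is not shown above; a sketch matching the described parameters (2 core threads, 5 maximum, a bounded queue of 3, the default thread factory and AbortPolicy; class name ours) might look like:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,                                     // corePoolSize
                5,                                     // maximumPoolSize
                1L, TimeUnit.SECONDS,                  // keepAliveTime + unit
                new ArrayBlockingQueue<>(3),           // bounded work queue of 3
                Executors.defaultThreadFactory(),      // default thread factory
                new ThreadPoolExecutor.AbortPolicy()); // default rejection policy

        try {
            // 8 tasks = maximumPoolSize + queue capacity; a 9th could be rejected
            for (int i = 1; i <= 8; i++) {
                threadPool.execute(() ->
                        System.out.println(Thread.currentThread().getName() + "\t handling request"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
```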

With this configuration, the maximum number of tasks the pool can handle is maximumPoolSize + workQueue = 5 + 3 = 8. With the AbortPolicy rejection strategy, once we submit more than 8 tasks (say 9), an exception is thrown.

It throws java.util.concurrent.RejectedExecutionException.

We modify the rejection policy to CallerRunsPolicy

When the pool is saturated and cannot handle a task, the task is handed back to the caller to execute. Here we submit from the main thread, so the main thread runs it.

Change the rejection policy to DiscardPolicy and run to view the results.

Change the strategy to DiscardOldestPolicy: it discards the task that has waited longest in the queue, then tries to enqueue the rejected task.

Reasonable configuration of thread pool

Tasks can generally be divided into: CPU-intensive, IO-intensive, and hybrid. For different types of tasks, different sizes of thread pools need to be allocated.

CPU intensive means that the task requires a lot of computation without blocking, and the CPU runs at full speed all the time.

IO-intensive means that the task requires a lot of IO, that is, a lot of blocking.

The number of CPU cores can be obtained via Runtime.getRuntime().availableProcessors().

  • For CPU-intensive tasks, use as small a thread pool as possible, generally CPU cores + 1. CPU-intensive tasks already push CPU usage very high; more threads would only increase context switching and add overhead.
  • IO-intensive tasks can use a somewhat larger pool, generally 2 * CPU cores. Their CPU usage is low, so while threads wait on IO the CPU can process other tasks, making full use of CPU time.
  • Hybrid tasks can be split into an IO-intensive part and a CPU-intensive part, each handled by its own pool. As long as the two parts take roughly the same time after splitting, this is more efficient than serial execution. If their execution times differ greatly, the part that finishes first must wait for the slower one, the total time is still dominated by the slower part, and the cost of splitting and merging the tasks outweighs the gain.
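As a concrete sketch (class and variable names ours), the sizing rules above translate to:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int cpu = Runtime.getRuntime().availableProcessors();

        // CPU-bound: keep the pool small, roughly one thread per core (+1 spare)
        int cpuBoundSize = cpu + 1;

        // IO-bound: threads spend most of their time blocked, so allow more of them
        int ioBoundSize = 2 * cpu;

        System.out.println("cores = " + cpu
                + ", cpu-bound pool = " + cpuBoundSize
                + ", io-bound pool = " + ioBoundSize);
    }
}
```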