Java High Concurrency Series: Some useful concurrency tools provided by Google

Environment: jdk1.8.

For concurrency, JUC (java.util.concurrent) already provides many useful tools, and Google has built extensions on top of it to make concurrent programming easier. These tools ship in the Guava library (guava.jar).

This article walks through a few simple cases to show what Guava can do.

Prerequisites: thread pools in JUC, Executors, ExecutorService, Callable, and Future.
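As a quick refresher on these prerequisites, here is a minimal sketch that uses only JUC classes. The class name PlainFutureSketch and the method runTask are made up for illustration. It shows the limitation that the Guava tools in this article address: with a plain Future, the caller has to block in get() to find out that the task has finished.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical refresher class; only JDK types are used here.
class PlainFutureSketch {

    // Submits a Callable to a fixed thread pool and blocks until its result is ready.
    static int runTask() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> 10;
            Future<Integer> future = pool.submit(task);
            // A plain Future has no callback hook: get() blocks the caller.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTask());
    }
}
```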

guava maven configuration

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>27.0-jre</version>
    </dependency>

Several commonly used classes in guava

MoreExecutors: Provides static methods that extend the Executors class in JUC.

Futures: Also provides many static methods; it extends what Future in JUC can do.

Case 1: Callback after asynchronous execution of a task

    package com.itsoku.chat34;

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * Follow Ali p7 to learn concurrency, WeChat public account: javacode2018
     */
    @Slf4j
    public class Demo1 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Create a thread pool
            ExecutorService delegate = Executors.newFixedThreadPool(5);
            try {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
                // Execute a task asynchronously
                ListenableFuture<Integer> submit = executorService.submit(() -> {
                    log.info("{}", System.currentTimeMillis());
                    // Sleep 2 seconds to simulate a time-consuming task
                    TimeUnit.SECONDS.sleep(2);
                    log.info("{}", System.currentTimeMillis());
                    return 10;
                });
                // Call back the listener after the task has finished
                submit.addListener(() -> {
                    log.info("The task is executed, I was called back");
                }, MoreExecutors.directExecutor());
                log.info("{}", submit.get());
            } finally {
                delegate.shutdown();
            }
        }
    }

Output:

    14:25:50.055 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - 1567491950047
    14:25:52.063 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - 1567491952063
    14:25:52.064 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - The task is executed, I was called back
    14:25:52.064 [main] INFO com.itsoku.chat34.Demo1 - 10

Description:

The ListeningExecutorService interface extends the ExecutorService interface in JUC. As the "Listening" in its name suggests, it adds a listening capability: you can be notified of the results of asynchronously executed tasks. A ListeningExecutorService is created with MoreExecutors.listeningDecorator, which takes an ExecutorService argument; that ExecutorService is what actually executes the tasks asynchronously.

The submit method of ListeningExecutorService executes a task asynchronously and returns a ListenableFuture. The ListenableFuture interface extends the Future interface in JUC and adds the ability to register listeners. Calling submit.addListener adds a listener to the task, and the listener is called back once the task has finished. Because the listener here is registered with MoreExecutors.directExecutor(), it runs on the thread that completes the task, which is why the callback line in the output is logged by pool-1-thread-1.

The get method of ListenableFuture blocks the current thread until the task completes.
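To make the listener mechanism more concrete, the following is a simplified sketch built only on the JDK's FutureTask and its done() hook. It is an illustration under assumptions, not Guava's actual implementation: the class ListenableTaskSketch, its addListener method, and demo are hypothetical names. It shows two behaviours described above: listeners registered before completion fire when the task finishes, and a listener registered after completion fires immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical, simplified stand-in for a listenable future; not Guava's code.
class ListenableTaskSketch<V> extends FutureTask<V> {
    private final List<Runnable> pending = new ArrayList<>(); // guarded by "this"
    private boolean finished = false;                          // guarded by "this"

    ListenableTaskSketch(Callable<V> callable) {
        super(callable);
    }

    // Registers a listener; if the task is already done, the listener runs right away.
    void addListener(Runnable listener, Executor executor) {
        synchronized (this) {
            if (!finished) {
                pending.add(() -> executor.execute(listener));
                return;
            }
        }
        executor.execute(listener);
    }

    // FutureTask invokes this hook once the task has completed.
    @Override
    protected void done() {
        List<Runnable> toRun;
        synchronized (this) {
            finished = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        toRun.forEach(Runnable::run);
    }

    // Runs one task with an early and a late listener and records what happened.
    static List<String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch called = new CountDownLatch(1);
        Executor direct = Runnable::run; // runs the listener on the calling thread
        try {
            ListenableTaskSketch<Integer> task = new ListenableTaskSketch<Integer>(() -> 10);
            task.addListener(() -> {
                events.add("early listener");
                called.countDown();
            }, direct);
            pool.execute(task);
            called.await(); // wait until the early listener has fired
            task.addListener(() -> events.add("late listener"), direct);
            events.add("result " + task.get());
            return events;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The direct executor in this sketch plays the same role as MoreExecutors.directExecutor() in Demo1: the listener runs on whichever thread triggers it, so a slow listener would hold up that thread.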

There is another way to write the above, as follows:

    package com.itsoku.chat34;

    import com.google.common.util.concurrent.*;
    import lombok.extern.slf4j.Slf4j;
    import org.checkerframework.checker.nullness.qual.Nullable;

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * Follow Ali p7 to learn concurrency, WeChat public account: javacode2018
     */
    @Slf4j
    public class Demo2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService delegate = Executors.newFixedThreadPool(5);
            try {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
                ListenableFuture<Integer> submit = executorService.submit(() -> {
                    log.info("{}", System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(4);
                    //int i = 10/0;
                    log.info("{}", System.currentTimeMillis());
                    return 10;
                });
                Futures.addCallback(submit, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(@Nullable Integer result) {
                        log.info("Successful execution:{}", result);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        log.error("Task failed:" + t.getMessage(), t);
                    }
                }, MoreExecutors.directExecutor());
                log.info("{}", submit.get());
            } finally {
                delegate.shutdown();
            }
        }
    }

Output:

    14:26:07.938 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - 1567491967936
    14:26:11.944 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - 1567491971944
    14:26:11.945 [main] INFO com.itsoku.chat34.Demo2 - 10
    14:26:11.945 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - Successful execution:10

The code above calls the static method Futures.addCallback to add a callback to an asynchronously executed task. The callback is a FutureCallback object with two methods: onSuccess is called when the task succeeds, and onFailure is called when it fails.

To see the failure case, uncomment the int i = 10/0; line in the code and run it again.
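One thing to keep in mind when trying the failure case: ListenableFuture.get follows the contract of Future.get, so besides onFailure being called, the submit.get() call in main throws an ExecutionException that wraps the original exception. The sketch below reproduces that part with a plain JUC Future only (no Guava); the class FailureSketch and the method describeFailure are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of how a failed task surfaces through get().
class FailureSketch {

    // Returns a short description of the exception that get() reports for a failing task.
    static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                int zero = 0;
                return 10 / zero; // throws ArithmeticException on the worker thread
            });
            future.get(); // rethrows the failure wrapped in ExecutionException
            return "no failure";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "ArithmeticException: / by zero"
    }
}
```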

Case 2: Get the execution results of a batch of asynchronous tasks

    package com.itsoku.chat34;

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import lombok.extern.slf4j.Slf4j;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;

    /**
     * Follow Ali p7 to learn concurrency, WeChat public account: javacode2018
     */
    @Slf4j
    public class Demo3 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            log.info("star");
            ExecutorService delegate = Executors.newFixedThreadPool(5);
            try {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
                List<ListenableFuture<Integer>> futureList = new ArrayList<>();
                for (int i = 5; i >= 0; i--) {
                    int j = i;
                    futureList.add(executorService.submit(() -> {
                        TimeUnit.SECONDS.sleep(j);
                        return j;
                    }));
                }
                // Get the execution results of a batch of tasks
                List<Integer> resultList = Futures.allAsList(futureList).get();
                // Output
                resultList.forEach(item -> {
                    log.info("{}", item);
                });
            } finally {
                delegate.shutdown();
            }
        }
    }

Output:

    14:26:35.970 [main] INFO com.itsoku.chat34.Demo3 - star
    14:26:41.137 [main] INFO com.itsoku.chat34.Demo3 - 5
    14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 4
    14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 3
    14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 2
    14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 1
    14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 0

The results of the 6 asynchronous tasks are output in order. This uses the Futures.allAsList method; here is its declaration:

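    public static <V> ListenableFuture<List<V>> allAsList(
            Iterable<? extends ListenableFuture<? extends V>> futures)

It takes a batch of ListenableFuture objects and returns a single ListenableFuture<List<V>> that combines their results. The list keeps the order of the input futures rather than the order of completion, which is why the output above runs from 5 down to 0 even though the shorter tasks finish first.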
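For comparison, here is a rough plain-JUC analogue of collecting a batch of results in submission order. It is only a sketch with hypothetical names (OrderedResultsSketch, collectInSubmissionOrder), and it differs from Futures.allAsList in an important way: this loop blocks the caller on each get(), whereas allAsList returns a future that callbacks can be attached to. The sleeps are scaled down to milliseconds so the sketch runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue; not how Guava implements allAsList.
class OrderedResultsSketch {

    // Submits six tasks (5 down to 0) and gathers their results in submission order.
    static List<Integer> collectInSubmissionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 5; i >= 0; i--) {
                int j = i;
                futures.add(pool.submit(() -> {
                    TimeUnit.MILLISECONDS.sleep(j * 20L); // longer sleep for larger j
                    return j;
                }));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                // Iterating the list in order fixes the result order,
                // no matter which task finishes first.
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectInSubmissionOrder()); // prints [5, 4, 3, 2, 1, 0]
    }
}
```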

Case 3: After a batch of asynchronous tasks has finished, run a callback that sums the results

    package com.itsoku.chat34;

    import com.google.common.util.concurrent.*;
    import lombok.extern.slf4j.Slf4j;
    import org.checkerframework.checker.nullness.qual.Nullable;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * Follow Ali p7 to learn concurrency, WeChat public account: javacode2018
     */
    @Slf4j
    public class Demo4 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            log.info("star");
            ExecutorService delegate = Executors.newFixedThreadPool(5);
            try {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
                List<ListenableFuture<Integer>> futureList = new ArrayList<>();
                for (int i = 5; i >= 0; i--) {
                    int j = i;
                    futureList.add(executorService.submit(() -> {
                        TimeUnit.SECONDS.sleep(j);
                        return j;
                    }));
                }
                ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
                Futures.addCallback(listListenableFuture, new FutureCallback<List<Integer>>() {
                    @Override
                    public void onSuccess(@Nullable List<Integer> result) {
                        log.info("The sum of all results in the result:" + result.stream().reduce(Integer::sum).get());
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        log.error("Task failed:" + t.getMessage(), t);
                    }
                }, MoreExecutors.directExecutor());
            } finally {
                delegate.shutdown();
            }
        }
    }

Output:

    14:47:04.819 [main] INFO com.itsoku.chat34.Demo4 - star
    14:47:09.933 [pool-1-thread-1] INFO com.itsoku.chat34.Demo4 - The sum of all results in the result:15

In the code, a batch of tasks is executed asynchronously. After all of them have completed, the onSuccess method above is called back, and it sums all the results.

To summarize: this article covered some of the utility classes Guava provides to make asynchronous task execution and callbacks easier. Guava contains many other useful utility classes as well; if you are interested, they are worth studying.