Thursday, May 7, 2020

Springboot @Async (taskExecutor) , CompletableFuture etc



@Async without return 
@Override
@Async
public void myPublicAsyncMethod(){
 //SimpleAsyncTaskExecutor
 System.out.println("Currently Executing thread name - " + Thread.currentThread().getName());
 System.out.println("User created with default executor");
}
By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations

@Async with Return value

@Override
@Async
public Future<Map<String, String>> createAndReturnUser() {                                                                                 
.....
    return new AsyncResult(hashMapObj);
}

Defining ThreadPoolTaskExecutor and ConcurrentTaskExecutor at Method Level

ThreadPoolTaskExecutor This implementation is the most commonly used one. It exposes bean properties for configuring a java.util.concurrent.ThreadPoolExecutor and wraps it in a TaskExecutor. If you need to adapt to a different kind of java.util.concurrent.Executor, it is recommended that you use a ConcurrentTaskExecutor instead.

By default spring uses SimpleAsyncTaskExecutor to run methods annotated with @Async. We can also define our custom executor bean as follow and use it at method level.


@Bean(name = "threadPoolExecutor")
public Executor getAsyncExecutor() {
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 executor.setCorePoolSize(7);
 executor.setMaxPoolSize(42);
 executor.setQueueCapacity(11);
 executor.setThreadNamePrefix("threadPoolExecutor-");
 executor.initialize();
 return executor;
}
Or
@Bean(name = "ConcurrentTaskExecutor")
public TaskExecutor taskExecutor2 () {
 return new ConcurrentTaskExecutor(
   Executors.newFixedThreadPool(3));
}
These beans can be used at method level in following ways


@Override
@Async("threadPoolExecutor")
public void createUserWithThreadPoolExecutor(){
 System.out.println("Currently Executing thread name - " + Thread.currentThread().getName());
 System.out.println("User created with thread pool executor");
}

@Override
@Async("ConcurrentTaskExecutor")
public void createUserWithConcurrentExecutor(){
 System.out.println("Currently Executing thread name - " + Thread.currentThread().getName());
 System.out.println("User created with concurrent task executor");
}
SimpleAsyncTaskExecutor does make sense in cases, if you want to execute some long-time-executing tasks, e.g. if you want to compress log files at the end of a day. In other cases, if you want to execute a short-time-executing task every n seconds or minutes, you should use the ThreadPoolTaskExecutor, because of reusing of system resources.

References & courtesy:
https://www.devglan.com/spring-boot/spring-boot-async-task-executor
https://docs.spring.io/spring/docs/4.2.x/spring-framework-reference/html/scheduling.html

CompletableFuture vs Future

The Future interface was introduced in Java 5 to handle asynchronous computations. But, this interface did not have any methods to combine multiple asynchronous computations and handle all the possible errors. The  CompletableFuture implements Future interface, it can combine multiple asynchronous computations, handle possible errors and offers much more capabilities.


First of all, the CompletableFuture class implements the Future interface, so you can use it as a Future implementation, but with additional completion logic.

For example, you can create an instance of this class with a no-arg constructor to represent some future result, hand it out to the consumers and complete it at some time in the future using the complete method. The consumers may use the get method to block the current thread until this result will be provided.
CompletableFuture with encapsulated Computation Logic
Static methods runAsync and supplyAsync allow us to create a CompletableFuture instance out of Runnable and Supplier functional types correspondingly.
Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature.
The Runnable interface is the same old interface that is used in threads and it does not allow to return a value.
The Supplier interface is a generic functional interface with a single method that has no arguments and returns a value of a parameterized type.
This allows to provide an instance of the Supplier as a lambda expression that does the calculation and returns the result. This is as simple as:
1
2
3
4
5
6
CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
 // ...
 assertEquals("Hello", future.get());
thenApply to process the result of an async function


CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
assertEquals("Hello World", future.get());

Running multiple Futures in Parallel
CompletableFuture<String> future1 
  = CompletableFuture.supplyAsync(() -> "Hello");
2 --> "Beautiful"
3 --> "World"

CompletableFuture<Void> combinedFuture
  = CompletableFuture.allOf(future1, future2, future3);
 // ...
 combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);

Handling Errors
Instead of catching an exception in a syntactic block, the CompletableFuture class allows you to handle it in a special handle method
This method receives two parameters: a result of a computation (if it finished successfully) and the exception thrown (if some computation step did not complete normally).


String name = null;
// ...
CompletableFuture<String> completableFuture 
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;

  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also to have the ability to complete it with an exception. The completeExceptionally method is intended for that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:
completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));
 // ...
completableFuture.get(); // ExecutionException

References:
https://www.baeldung.com/java-completablefuture
https://www.baeldung.com/thread-pool-java-and-guava




No comments:

Post a Comment