
How to run tasks in parallel in Java and Spring Boot applications
Introduction
In this blog post, we will see how to run tasks in parallel in Java and Spring Boot applications. Modern applications often need to perform several independent tasks to serve one request, such as calling multiple APIs, fetching data from a database, or reading files. Running these tasks in parallel can improve the response time of the application. We will look at the following approaches:
- CompletableFuture
- @Async annotation
- Virtual Threads
- Structured Concurrency
Use case
Let us consider an API which takes a stock symbol as input and returns the current position of the stock along with the current market price and some company information. To build this response, we need to call external APIs for the market price and the company information, and fetch the orders for the symbol from the database. In the code below, getStockPositionSequential runs all these tasks sequentially.
@RestController
@AllArgsConstructor
public class StockPositionController {

    private final StockPositionService stockPositionService;

    @GetMapping("/stock-position/{stockSymbol}")
    public StockHolding getStockPosition(@PathVariable String stockSymbol) {
        return stockPositionService.getStockPositionSequential(stockSymbol);
    }
}

@Service
@AllArgsConstructor
@Slf4j
public class StockPositionService {

    private final StockOrderService stockOrderService;
    private final StockInformationService stockInformationService;

    public StockHolding getStockPositionSequential(String symbol) {
        long startTime = System.currentTimeMillis();
        List<StockOrder> orders = stockOrderService.getOrdersBySymbol(symbol); // Gets data from database
        var price = stockInformationService.getPrice(symbol); // Gets market price from external service
        var company = stockInformationService.getCompanyDetails(symbol); // Gets company details from external service
        log.info("getStockPositionSequential.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
        return getStockHolding(orders, price, company);
    }

    private StockHolding getStockHolding(List<StockOrder> orders, StockPrice price, CompanyDetails company) {
        double totalQuantity = orders.stream()
                .mapToDouble(order -> order.orderType() == OrderType.BUY ? order.quantity() : -order.quantity())
                .sum();
        return buildStockHolding(orders.getFirst().symbol(), totalQuantity, price.price(), company);
    }

    private StockHolding buildStockHolding(String symbol, Double quantity, Double marketPrice, CompanyDetails company) {
        return new StockHolding(
                symbol,
                quantity,
                quantity * marketPrice,
                marketPrice,
                company.companyName(),
                company.industry(),
                company.website(),
                company.sector()
        );
    }
}
Let us call this API with the stock symbol AAPL and see the output. We will use httpie to call the API.
http GET localhost:8080/stock-position/AAPL

HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sat, 16 Nov 2024 08:07:28 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "companyName": "Apple Inc.",
    "currentMarketPrice": 225.0,
    "currentMarketValue": 16875.0,
    "industry": "Consumer Electronics",
    "quantity": 75.0,
    "sector": "Technology",
    "stockSymbol": "AAPL",
    "website": "https://www.apple.com"
}
I have added log statements that record the time taken and the thread processing each task. Let us run a test and see the output.
@Test
void getStockPositionSequential() {
    String symbol = "AAPL";
    StockHolding stockHolding = stockPositionService.getStockPositionSequential(symbol);
    assertNotNull(stockHolding);
    assertEquals(symbol, stockHolding.stockSymbol());
}

2024-11-17 10:28:22 [main] INFO c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#1,main,5,main]
2024-11-17 10:28:22 [main] INFO c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#1,main,5,main]
2024-11-17 10:28:23 [main] INFO c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#1,main,5,main]
2024-11-17 10:28:23 [main] INFO c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#1,main,5,main]
2024-11-17 10:28:23 [main] INFO c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#1,main,5,main]
2024-11-17 10:28:23 [main] INFO c.c.s.service.StockPositionService - getStockPositionSequential.Time taken to get stock position: 1356ms
All the tasks are run sequentially and the total time taken is 1356ms. We can see that each task is run in the main thread.
CompletableFuture
CompletableFuture is a class introduced in Java 8 which represents the future result of an asynchronous computation. It provides a way to perform asynchronous operations and combine them.
Let us see how we can use CompletableFuture to run the tasks in parallel. We will create a method getStockPositionCompletableFuture which runs the tasks getOrdersBySymbol, getPrice and getCompanyDetails in parallel using CompletableFuture.
public StockHolding getStockPositionCompletableFuture(String symbol) {
    long startTime = System.currentTimeMillis();
    CompletableFuture<List<StockOrder>> ordersFuture =
            CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol));
    CompletableFuture<StockPrice> priceFuture =
            CompletableFuture.supplyAsync(() -> stockInformationService.getPrice(symbol));
    CompletableFuture<CompanyDetails> companyFuture =
            CompletableFuture.supplyAsync(() -> stockInformationService.getCompanyDetails(symbol));
    try {
        List<StockOrder> orders = ordersFuture.get();
        StockPrice price = priceFuture.get();
        CompanyDetails company = companyFuture.get();
        log.info("getStockPositionCompletableFuture.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
        return getStockHolding(orders, price, company);
    } catch (Exception e) {
        log.error("Error while getting stock position", e);
        return null;
    }
}
Here we first create a CompletableFuture for each task using the supplyAsync method. This method takes a Supplier which returns the result of the task, and starts running it on another thread right away, so all 3 tasks run in parallel. We then call the get method on each CompletableFuture to obtain its result. The get method is a blocking call which waits until the result is available. Since the three tasks are already running concurrently, the total wait is roughly the duration of the slowest task rather than the sum of all three. Finally, we combine the results into the StockHolding object.
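Because get blocks until the result is available, it can be useful to bound the wait. The following is a minimal sketch, not taken from the original service: slowLookup and fetchWithTimeout are hypothetical names, and the sleep stands in for a remote call. It uses the standard get(timeout, unit) overload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {

    // Hypothetical stand-in for a slow remote call: sleeps, then returns a value.
    static String slowLookup(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "price";
    }

    // Starts the task asynchronously and waits at most timeoutMillis for its result.
    static String fetchWithTimeout(long taskMillis, long timeoutMillis) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> slowLookup(taskMillis));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Marks the future as cancelled; the task already running is not interrupted.
            future.cancel(true);
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // The task itself threw; the original exception is the cause.
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithTimeout(10, 1_000));
        System.out.println(fetchWithTimeout(2_000, 100));
    }
}
```

Catching the three checked exceptions separately, instead of a broad catch (Exception e), also makes it possible to restore the interrupt flag when the waiting thread is interrupted.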
If the task does not return a value, we can use the runAsync method instead of supplyAsync.
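As a small illustration of the difference, here is a sketch with hypothetical helper names (recordAudit, symbolLength) that are not part of the stock service. runAsync takes a Runnable and yields a CompletableFuture<Void>, while supplyAsync takes a Supplier and yields a typed result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAsyncSketch {

    // Counts how many audit tasks have run; stands in for a real side effect such as writing a log entry.
    static final AtomicInteger auditCount = new AtomicInteger();

    // Side-effect-only task: nothing to return, so runAsync and CompletableFuture<Void>.
    static CompletableFuture<Void> recordAudit(String symbol) {
        return CompletableFuture.runAsync(() -> auditCount.incrementAndGet());
    }

    // Value-producing task: supplyAsync and a typed CompletableFuture.
    static CompletableFuture<Integer> symbolLength(String symbol) {
        return CompletableFuture.supplyAsync(symbol::length);
    }

    public static void main(String[] args) {
        recordAudit("AAPL").join(); // join() returns null for a CompletableFuture<Void>
        System.out.println("audits recorded: " + auditCount.get());
        System.out.println("symbol length: " + symbolLength("AAPL").join());
    }
}
```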
Let us run a test and see the output.
@Test
void getStockPositionCompletableFuture() {
    String symbol = "AAPL";
    StockHolding stockHolding = stockPositionService.getStockPositionCompletableFuture(symbol);
    assertNotNull(stockHolding);
    assertEquals(symbol, stockHolding.stockSymbol());
}

2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-1] INFO c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#42,ForkJoinPool.commonPool-worker-1,5,main]
2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-2] INFO c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#43,ForkJoinPool.commonPool-worker-2,5,main]
2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-2] INFO c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#43,ForkJoinPool.commonPool-worker-2,5,main]
2024-11-19 23:19:16 [main] INFO c.c.s.service.StockPositionService - getStockPositionUsingCF.Time taken to get stock position: 459ms
We can see that all the tasks are run in parallel and the total time taken has come down to 459ms. Each task runs in a different thread from the common pool (ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-3).
If we want to use a custom Executor instead of the common pool, we can use the overloaded supplyAsync method which takes an Executor as its second argument.
var executor = Executors.newFixedThreadPool(3);
CompletableFuture<List<StockOrder>> ordersFuture =
        CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol), executor);
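One detail the snippet above leaves open is the lifecycle of the executor: a pool created with Executors.newFixedThreadPool keeps its threads alive until it is shut down. The sketch below is a self-contained illustration with hypothetical names (runOnDedicatedPool, currentThreadName). It creates the pool per call only to keep the example short; in a real service the executor would more likely be created once and shared.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorSketch {

    static String currentThreadName() {
        return Thread.currentThread().getName();
    }

    // Runs three placeholder tasks on a dedicated pool and returns the names of the threads that ran them.
    static List<String> runOnDedicatedPool() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            var first = CompletableFuture.supplyAsync(CustomExecutorSketch::currentThreadName, executor);
            var second = CompletableFuture.supplyAsync(CustomExecutorSketch::currentThreadName, executor);
            var third = CompletableFuture.supplyAsync(CustomExecutorSketch::currentThreadName, executor);
            return List.of(first.join(), second.join(), third.join());
        } finally {
            // Without this, the pool's non-daemon threads would keep the JVM alive.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Thread names come from the dedicated pool, not from ForkJoinPool.commonPool.
        runOnDedicatedPool().forEach(System.out::println);
    }
}
```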
We can change the above code to a more functional style using the thenCombine method.
public StockHolding getStockPositionCompletableFutureFunctional(String symbol) {
    long startTime = System.currentTimeMillis();
    var ordersFuture = CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol));
    var priceFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getPrice(symbol));
    var companyFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getCompanyDetails(symbol));
    return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
            .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
            })
            .join();
}
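The thenCombine method is called on one CompletableFuture and takes a second CompletableFuture plus a BiFunction which combines the results of the two tasks. We can chain multiple thenCombine calls to combine more than two tasks. Here OrdersAndPrice is a small holder type for the two intermediate results (its definition is not shown). The join method is a blocking call like get, which waits for the result to be available, but it throws an unchecked CompletionException instead of checked exceptions.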
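Since the holder type is not shown in the post, here is a self-contained sketch of the same pattern. The record below is an assumption about what such a holder might look like, simplified to a list of quantities and a price, and marketValue is a hypothetical method name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {

    // Hypothetical holder for two intermediate results, playing the role of OrdersAndPrice.
    record OrdersAndPrice(List<Integer> quantities, double price) {}

    // Fetches "orders" and "price" in parallel, pairs them, then derives the market value.
    static double marketValue(List<Integer> quantities, double price) {
        var ordersFuture = CompletableFuture.supplyAsync(() -> quantities);
        var priceFuture = CompletableFuture.supplyAsync(() -> price);
        return ordersFuture
                .thenCombine(priceFuture, OrdersAndPrice::new) // runs once both futures are complete
                .thenApply(pair -> pair.quantities().stream().mapToInt(Integer::intValue).sum() * pair.price())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(marketValue(List.of(50, 25), 225.0)); // 75 shares at 225.0
    }
}
```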
Similar to thenCombine, we have thenApply, thenAccept, thenCompose etc. which can be used based on the requirement.
The thenApply method can be used when we want to apply a transformation to the result of the task, like below.
return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
        .thenCombine(companyFuture, (ordersAndPrice, company) -> {
            log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
            return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
        })
        .thenApply(stockHolding -> {
            return new EnrichedStockHolding(stockHolding, additionalInfo);
        })
        .join();
The thenAccept method can be used when we want to perform some action on the result of the task without returning any value, like below.
// thenAccept produces a CompletableFuture<Void>, so there is no StockHolding to return here
ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
        .thenCombine(companyFuture, (ordersAndPrice, company) -> {
            log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
            return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
        })
        .thenAccept(stockHolding -> log.info("StockHolding: {}", stockHolding))
        .join();
The thenCompose method can be used when we want to chain multiple CompletableFuture objects and one task depends on the result of the previous task, like below.
// Each stage starts only after the previous one has produced its result
var products = CompletableFuture.supplyAsync(() -> getUser(userId))
        .thenCompose(user -> CompletableFuture.supplyAsync(() -> getOrders(user)))
        .thenCompose(orders -> CompletableFuture.supplyAsync(() -> getProducts(orders)))
        .join(); // block once, at the end of the chain
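A runnable version of such a dependent chain might look like the sketch below. The lookups (findUser, findOrders, countProducts) are hypothetical placeholders that return hard-coded data; the point is only that each function passed to thenCompose itself returns a CompletableFuture, and that thenCompose flattens the result instead of nesting futures.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {

    // Hypothetical asynchronous lookups; each returns a future rather than a plain value.
    static CompletableFuture<String> findUser(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    static CompletableFuture<List<String>> findOrders(String user) {
        return CompletableFuture.supplyAsync(() -> List.of(user + "/order-1", user + "/order-2"));
    }

    static CompletableFuture<Integer> countProducts(List<String> orders) {
        // Pretend every order contains two products.
        return CompletableFuture.supplyAsync(() -> orders.size() * 2);
    }

    // user -> orders -> product count, each step depending on the previous result.
    static int productsFor(int userId) {
        return findUser(userId)
                .thenCompose(ThenComposeSketch::findOrders)
                .thenCompose(ThenComposeSketch::countProducts)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(productsFor(7));
    }
}
```

Using thenApply with these functions would instead produce a CompletableFuture<CompletableFuture<...>>, which is why thenCompose is the better fit when the next step is itself asynchronous.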
For handling exceptions in the tasks, we can use the exceptionally method. If any earlier stage in the pipeline fails, the exceptionally stage is invoked with the exception, and we can handle it there, either by returning a fallback value or by rethrowing.
return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
        .thenCombine(companyFuture, (ordersAndPrice, company) -> {
            log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
            return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
        })
        .exceptionally(e -> {
            log.error("Error while getting stock position", e);
            throw new RuntimeException("Error while getting stock position", e);
        })
        .join();
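To make the two options concrete, here is a small self-contained sketch with hypothetical names (priceOrFallback, causeSeenByJoin). It shows exceptionally substituting a fallback value, and what a caller observes when no handler is present: join wraps the original exception in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallySketch {

    // Returns the price, or the fallback if the (simulated) price lookup fails.
    static double priceOrFallback(boolean fail, double fallback) {
        return CompletableFuture.supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("price service down");
                    }
                    return 225.0;
                })
                .exceptionally(e -> fallback) // only invoked when an earlier stage failed
                .join();
    }

    // Without a handler, join() throws CompletionException with the original exception as its cause.
    static String causeSeenByJoin() {
        CompletableFuture<Double> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("price service down");
        });
        try {
            failing.join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(priceOrFallback(false, -1.0));
        System.out.println(priceOrFallback(true, -1.0));
        System.out.println(causeSeenByJoin());
    }
}
```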
@Async annotation
Spring provides the @Async annotation to run methods asynchronously. We can annotate a method with @Async and Spring will run the method in a separate thread. We need to enable async support in the application by adding the @EnableAsync annotation to a configuration class. We will add it to the main class StockAdvisorApplication.
@SpringBootApplication
@EnableAsync
public class StockAdvisorApplication {

    public static void main(String[] args) {
        SpringApplication.run(StockAdvisorApplication.class, args);
    }
}
Now we add asynchronous variants of the 3 methods getOrdersBySymbol, getPrice and getCompanyDetails (named getOrdersBySymbolAsync, getPriceAsync and getCompanyDetailsAsync) and annotate them with @Async. These methods need to return a CompletableFuture.
@Async
public CompletableFuture<List<StockOrder>> getOrdersBySymbolAsync(String symbol) {
    log.info("getOrdersBySymbol.currentThread: {}", Thread.currentThread());
    var orders = stockOrderRepository.findBySymbol(symbol);
    if (orders.isEmpty()) {
        throw new NoDataFoundException();
    }
    return CompletableFuture.completedFuture(orders);
}
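Now let us add a method getStockPositionUsingAsync in StockPositionService which calls the 3 @Async annotated methods. We can get rid of supplyAsync since the proxy which Spring creates for the @Async annotated methods takes care of running them in separate threads. Note that this relies on the calls going through the proxy, that is, on the methods being invoked on another bean; calling an @Async method from within the same class bypasses the proxy and runs synchronously.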
public StockHolding getStockPositionUsingAsync(String symbol) {
    long startTime = System.currentTimeMillis();
    var ordersFuture = stockOrderService.getOrdersBySymbolAsync(symbol);
    var priceFuture = stockInformationService.getPriceAsync(symbol);
    var companyFuture = stockInformationService.getCompanyDetailsAsync(symbol);
    return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
            .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                log.info("getStockPositionUsingAsync.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
            })
            .exceptionally(e -> {
                log.error("Error while getting stock position", e);
                return null;
            })
            .join();
}
Now let us run a test for this and see the output.
@Test
void getStockPositionAsync() {
    String symbol = "AAPL";
    StockHolding stockHolding = stockPositionService.getStockPositionUsingAsync(symbol);
    assertNotNull(stockHolding);
    assertEquals(symbol, stockHolding.stockSymbol());
}

2024-11-23 00:24:33 [task-1] INFO c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#46,task-1,5,main]
2024-11-23 00:24:33 [task-2] INFO c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#47,task-2,5,main]
2024-11-23 00:24:33 [task-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#48,task-3,5,main]
2024-11-23 00:24:34 [task-2] INFO c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#47,task-2,5,main]
2024-11-23 00:24:34 [task-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#48,task-3,5,main]
2024-11-23 00:24:34 [task-3] INFO c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 504ms
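We can see that all the tasks are run in separate threads (task-1, task-2, task-3) and the total time taken is 504ms. In a Spring Boot application, @Async methods run by default on the auto-configured ThreadPoolTaskExecutor, whose threads use the name prefix task-, which is what we see in the log. (Plain Spring Framework without Boot falls back to a SimpleAsyncTaskExecutor.) We can configure a custom TaskExecutor by creating a bean of type TaskExecutor like below.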
@Configuration
public class AsyncConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}
Now the log will look like below, with the thread name prefix AsyncThread-.

2024-11-23 00:27:48 [AsyncThread-1] INFO c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#46,AsyncThread-1,5,main]
2024-11-23 00:27:48 [AsyncThread-2] INFO c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#47,AsyncThread-2,5,main]
2024-11-23 00:27:48 [AsyncThread-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#48,AsyncThread-3,5,main]
2024-11-23 00:27:48 [AsyncThread-2] INFO c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#47,AsyncThread-2,5,main]
2024-11-23 00:27:48 [AsyncThread-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#48,AsyncThread-3,5,main]
2024-11-23 00:27:48 [AsyncThread-3] INFO c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 503ms
Virtual Threads
Now we will see how we can improve parallelism and throughput using Virtual Threads. Before that let us first understand why we need Virtual Threads.
Why Virtual Threads
The traditional threading model in Java, which has been there since the first version, has some limitations:
- Each Java thread is a thin wrapper around an OS thread, which consumes memory and other resources.
- Creating and destroying threads is expensive. This is usually solved by using thread pools.
- Threads are not lightweight and can't be created in large numbers. A typical machine can run only a few thousand threads.
In a typical business application, we often make blocking calls such as fetching data from a database or calling an external service. While a thread is blocked waiting for the response, it does no work and the CPU sits idle. To fully utilize the CPU we would need to run more threads, but the number of threads we can create is limited, mostly by memory usage.
This is where virtual threads come into the picture.
How Virtual Threads work
Virtual threads are lightweight threads which take only a few KB of memory each. Many virtual threads can run on a single OS thread. When a virtual thread blocks, its current state is saved to the heap using a Continuation and the OS thread is released and becomes available for other virtual threads. When the blocking operation completes, the virtual thread is resumed from the saved state. This allows us to run millions of virtual threads on a typical machine where we could run only a few thousand platform threads.
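Outside of Spring, the same behaviour can be tried with the JDK API directly. The sketch below assumes Java 21 or later and uses a hypothetical method name (runBlockingTasks). It starts one virtual thread per task with Executors.newVirtualThreadPerTaskExecutor and lets every task block in Thread.sleep, which stands in for a blocking I/O call.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadSketch {

    // Runs taskCount blocking tasks, each on its own virtual thread, and returns how many completed.
    static int runBlockingTasks(int taskCount, Duration blockTime) {
        AtomicInteger completed = new AtomicInteger();
        // close() at the end of try-with-resources waits for all submitted tasks to finish.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(blockTime); // the carrier OS thread is released while this virtual thread sleeps
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int done = runBlockingTasks(10_000, Duration.ofMillis(100));
        System.out.println(done + " tasks finished in " + (System.currentTimeMillis() - start) + "ms");
    }
}
```

Ten thousand platform threads sleeping at once would be a heavy load for most machines, whereas here the tasks share a small number of carrier threads.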
Enabling Virtual Threads
To enable virtual threads in a Spring Boot application (this requires Java 21 or later and Spring Boot 3.2 or later), we need to add the below property in application.properties.
spring.threads.virtual.enabled=true
After that, our logs will look like below:
2024-11-23 00:45:41 [task-2] INFO c.c.s.s.StockInformationService - getPrice.start.Current thread: VirtualThread[#48,task-2]/runnable@ForkJoinPool-1-worker-2
2024-11-23 00:45:41 [task-1] INFO c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: VirtualThread[#46,task-1]/runnable@ForkJoinPool-1-worker-1
2024-11-23 00:45:41 [task-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: VirtualThread[#50,task-3]/runnable@ForkJoinPool-1-worker-3
2024-11-23 00:45:41 [task-2] INFO c.c.s.s.StockInformationService - getPrice.end.Current thread: VirtualThread[#48,task-2]/runnable@ForkJoinPool-1-worker-4
2024-11-23 00:45:41 [task-3] INFO c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: VirtualThread[#50,task-3]/runnable@ForkJoinPool-1-worker-3
2024-11-23 00:45:41 [task-3] INFO c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 508ms
Using virtual threads will improve the throughput and parallelism of the application if you have blocking operations like database access, network calls etc. But if your application is mostly CPU bound, then virtual threads may not provide too much benefit.
Virtual threads currently have a limitation: if a blocking call is executed inside a synchronized block, the virtual thread is pinned and the underlying platform thread is not released. The workaround is to use another locking mechanism such as ReentrantLock instead of the synchronized block. This limitation is addressed by JEP 491, which is targeted at JDK 24.
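As a rough sketch of that workaround, assuming Java 21 or later: the class and method names below are hypothetical, and Thread.sleep stands in for a blocking call made while holding the lock. The critical section is guarded with a ReentrantLock rather than synchronized.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

class LockSketch {

    private final ReentrantLock lock = new ReentrantLock();
    private int updates = 0;

    // Blocking work guarded by ReentrantLock instead of a synchronized block.
    void updateWithBlockingCall() throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(1); // stands in for blocking I/O performed while holding the lock
            updates++;
        } finally {
            lock.unlock(); // always release, even if the blocking call throws
        }
    }

    int updates() {
        lock.lock();
        try {
            return updates;
        } finally {
            lock.unlock();
        }
    }

    // Runs taskCount updates on virtual threads and returns the final counter value.
    static int runUpdates(int taskCount) {
        var sketch = new LockSketch();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                // Returning null makes the lambda a Callable, which may throw InterruptedException.
                executor.submit(() -> {
                    sketch.updateWithBlockingCall();
                    return null;
                });
            }
        }
        return sketch.updates();
    }

    public static void main(String[] args) {
        System.out.println(runUpdates(100));
    }
}
```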
Structured Concurrency
There is one issue with our concurrency model so far. We are running 3 tasks in parallel, but there is no relationship between these tasks/threads. If one task fails, the other tasks keep running unnecessarily. This is where Structured Concurrency comes into the picture. It is still a preview feature in Java 23, so it has to be enabled with the --enable-preview flag.
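The problem can be reproduced with plain CompletableFuture. In the sketch below (hypothetical names, with a sleep standing in for a slow remote call), one task fails immediately while its sibling keeps running to completion, even though the overall request can no longer succeed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class UnstructuredFailureSketch {

    // Returns true if the slow sibling task still ran to completion after the other task failed.
    static boolean siblingFinishesAfterFailure() {
        AtomicBoolean siblingFinished = new AtomicBoolean(false);

        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("price service down");
        });
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300); // stands in for a slow remote call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
            siblingFinished.set(true);
            return "company details";
        });

        try {
            failing.join();
        } catch (CompletionException e) {
            // At this point the caller already knows the request has failed,
            // but nothing tells the slow task to stop.
        }

        slow.join(); // wait only so that the outcome can be observed
        return siblingFinished.get();
    }

    public static void main(String[] args) {
        System.out.println("sibling finished anyway: " + siblingFinishesAfterFailure());
    }
}
```

Cancelling by hand does not help much either: CompletableFuture.cancel only marks the future as cancelled and does not interrupt the thread running the task.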
With Structured Concurrency, we create a StructuredTaskScope, usually in a try-with-resources block. We then fork tasks using the fork method and wait for them using the join method. Since we are using a task scope of type ShutdownOnFailure, the scope cancels the other tasks as soon as any task fails, and throwIfFailed then rethrows the failure.
public StockHolding getStockPositionStructuredConcurreny(String symbol) {
    long startTime = System.currentTimeMillis();
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var ordersTask = scope.fork(() -> stockOrderService.getOrdersBySymbol(symbol));
        var priceTask = scope.fork(() -> stockInformationService.getPrice(symbol));
        var companyDetailsTask = scope.fork(() -> stockInformationService.getCompanyDetails(symbol));
        scope.join();
        scope.throwIfFailed();
        List<StockOrder> orders = ordersTask.get();
        StockPrice price = priceTask.get();
        CompanyDetails company = companyDetailsTask.get();
        log.info("getStockPositionStructuredConcurreny.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
        return getStockHolding(orders, price, company);
    } catch (Exception e) {
        log.error("Error while getting stock position", e);
        return null;
    }
}
Similar to ShutdownOnFailure, there is ShutdownOnSuccess, which cancels the other tasks as soon as any one task completes successfully. This is useful when you have multiple endpoints and want the result from whichever endpoint completes successfully first.
public StockPrice getPrice(String symbol) {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<StockPrice>()) {
        var priceTask1 = scope.fork(() -> getPriceFromService1(symbol));
        var priceTask2 = scope.fork(() -> getPriceFromService2(symbol));
        scope.join();
        return scope.result();
    } catch (Exception e) {
        log.error("Error while getting price", e);
        return null;
    }
}
Also, you can create a custom scope which can be used for specific requirements.
Conclusion
In this blog post, we saw how to run tasks in parallel in Java and Spring Boot applications using CompletableFuture, the @Async annotation, Virtual Threads and Structured Concurrency. We saw how running tasks in parallel improves the response time of the application, how to handle exceptions, and how Structured Concurrency cancels sibling tasks when one of them fails.