
    CodeWiz

    How to run tasks in parallel in Java and Spring Boot applications


    24/11/2024

    Introduction

    In this blog post, we will see how to run tasks in parallel in Java and Spring Boot applications. Modern applications often need to perform several independent tasks, such as calling multiple APIs, fetching data from a database or performing file operations. Running these tasks in parallel instead of one after another can improve the application's performance. We will look at the following approaches:

    • CompletableFuture
    • @Async annotation
    • Virtual Threads
    • Structured Concurrency

    Use case

    Let us consider an API that takes a stock symbol as input and returns the current position in that stock, along with the current market price and some company information. To build the response, we need to call external APIs for the market price and the company information, and we also need to fetch the orders for the stock from the database. In getStockPositionSequential below, all these tasks run sequentially.

    @RestController
    @AllArgsConstructor
    public class StockPositionController {
    
        private final StockPositionService stockPositionService;
    
        @GetMapping("/stock-position/{stockSymbol}")
        public StockHolding getStockPosition(@PathVariable String stockSymbol) {
            return stockPositionService.getStockPositionSequential(stockSymbol);
        }
    }
    
    @Service
    @AllArgsConstructor
    @Slf4j
    public class StockPositionService {
    
        private final StockOrderService stockOrderService;
        private final StockInformationService stockInformationService;
    
        public StockHolding getStockPositionSequential(String symbol) {
            long startTime = System.currentTimeMillis();
    
            List<StockOrder> orders = stockOrderService.getOrdersBySymbol(symbol); // Gets data from database
            var price = stockInformationService.getPrice(symbol); // Gets market price from external service
            var company = stockInformationService.getCompanyDetails(symbol); // Gets company details from external service
    
            log.info("getStockPositionSequential.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
    
            return getStockHolding(orders, price, company);
        }
    
    
        private StockHolding getStockHolding(List<StockOrder> orders, StockPrice price, CompanyDetails company) {
            double totalQuantity = orders.stream()
                    .mapToDouble(order -> order.orderType() == OrderType.BUY ? order.quantity() : -order.quantity())
                    .sum();
            return buildStockHolding(orders.getFirst().symbol(), totalQuantity, price.price(), company);
        }
    
        private StockHolding buildStockHolding(String symbol, Double quantity, Double marketPrice, CompanyDetails company) {
            return new StockHolding(
                    symbol,
                    quantity,
                    quantity * marketPrice,
                    marketPrice,
                    company.companyName(),
                    company.industry(),
                    company.website(),
                    company.sector()
            );
        }
    }
    

    Let us call this API with the stock symbol AAPL and look at the output. We will use HTTPie to call the API.

    http GET localhost:8080/stock-position/AAPL
    HTTP/1.1 200 
    Connection: keep-alive
    Content-Type: application/json
    Date: Sat, 16 Nov 2024 08:07:28 GMT
    Keep-Alive: timeout=60
    Transfer-Encoding: chunked
    
    {
        "companyName": "Apple Inc.",
        "currentMarketPrice": 225.0,
        "currentMarketValue": 16875.0,
        "industry": "Consumer Electronics",
        "quantity": 75.0,
        "sector": "Technology",
        "stockSymbol": "AAPL",
        "website": "https://www.apple.com"
    }

    I have added logs that record the time taken and the thread that processes each task. Let us run a test and look at the output.

    @Test
    void getStockPositionSequential() {
        String symbol = "AAPL";
        StockHolding stockHolding = stockPositionService.getStockPositionSequential(symbol);
        assertNotNull(stockHolding);
        assertEquals(symbol, stockHolding.stockSymbol());
    }
    2024-11-17 10:28:22 [main] INFO  c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#1,main,5,main]
    2024-11-17 10:28:22 [main] INFO  c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#1,main,5,main]
    2024-11-17 10:28:23 [main] INFO  c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#1,main,5,main]
    2024-11-17 10:28:23 [main] INFO  c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#1,main,5,main]
    2024-11-17 10:28:23 [main] INFO  c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#1,main,5,main]
    2024-11-17 10:28:23 [main] INFO  c.c.s.service.StockPositionService - getStockPositionSequential.Time taken to get stock position: 1356ms

    All the tasks run sequentially, each on the main thread, and the total time taken is 1356ms.

    CompletableFuture

    CompletableFuture is a class introduced in Java 8 that represents the future result of an asynchronous computation. It provides methods to run asynchronous operations and to chain and combine their results.

    Let us see how we can use CompletableFuture to run the tasks in parallel. We will create a method getStockPositionCompletableFuture which will run the tasks getOrdersBySymbol, getPrice and getCompanyDetails in parallel using CompletableFuture.

    public StockHolding getStockPositionCompletableFuture(String symbol) {
        long startTime = System.currentTimeMillis();
    
        CompletableFuture<List<StockOrder>> ordersFuture = CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol));
        CompletableFuture<StockPrice> priceFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getPrice(symbol));
        CompletableFuture<CompanyDetails> companyFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getCompanyDetails(symbol));
    
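        try {
            List<StockOrder> orders = ordersFuture.get();
            StockPrice price = priceFuture.get();
            CompanyDetails company = companyFuture.get();
    
            log.info("getStockPositionCompletableFuture.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
    
            return getStockHolding(orders, price, company);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            log.error("Interrupted while getting stock position", e);
            return null;
        } catch (ExecutionException e) {
            // get() wraps the task's failure; getCause() is the original exception
            log.error("Error while getting stock position", e.getCause());
            return null;
        }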
    }

    Here we first create a CompletableFuture for each task using the supplyAsync method, which takes a Supplier that produces the task's result. Without an explicit Executor, supplyAsync runs the supplier on the common ForkJoinPool, so all 3 tasks start running in parallel. We then call get on each CompletableFuture. get is a blocking call that waits until the result is available and throws a checked ExecutionException if the task failed. Finally, we combine the results into the StockHolding object. If a task does not need to return a value, we can use the runAsync method instead of supplyAsync.
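    To try supplyAsync, runAsync and join outside the Spring application, here is a minimal, self-contained sketch. fetchPrice and fetchCompanyName are hypothetical stand-ins that sleep instead of calling real services.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the blog's services: each "remote call" just sleeps to simulate latency.
class SupplyAsyncDemo {

    static double fetchPrice(String symbol) {
        sleepQuietly(200); // pretend this is the market-price API
        return 225.0;
    }

    static String fetchCompanyName(String symbol) {
        sleepQuietly(200); // pretend this is the company-details API
        return "Apple Inc.";
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
    }

    static String describe(String symbol) {
        // supplyAsync: runs the Supplier on the default async pool and completes with its value
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> fetchPrice(symbol));
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> fetchCompanyName(symbol));
        // runAsync: work that produces no result, typed CompletableFuture<Void>
        CompletableFuture<Void> audit = CompletableFuture.runAsync(() -> System.out.println("audit lookup for " + symbol));

        audit.join(); // waits for the side effect; join() returns null for a Void future
        // join() blocks like get() but throws the unchecked CompletionException on failure
        return name.join() + " @ " + price.join();
    }

    public static void main(String[] args) {
        System.out.println(describe("AAPL")); // Apple Inc. @ 225.0
    }
}
```

    The two lookups sleep concurrently, so describe waits roughly as long as the slower of them rather than for their sum.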

    Let us run a test and see the output.

    @Test
    void getStockPositionCompletableFuture() {
        String symbol = "AAPL";
        StockHolding stockHolding = stockPositionService.getStockPositionCompletableFuture(symbol);
        assertNotNull(stockHolding);
        assertEquals(symbol, stockHolding.stockSymbol());
    }
    2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-1] INFO  c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#42,ForkJoinPool.commonPool-worker-1,5,main]
    2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
    2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-2] INFO  c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#43,ForkJoinPool.commonPool-worker-2,5,main]
    2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
    2024-11-19 23:19:16 [ForkJoinPool.commonPool-worker-2] INFO  c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#43,ForkJoinPool.commonPool-worker-2,5,main]
    2024-11-19 23:19:16 [main] INFO  c.c.s.service.StockPositionService - getStockPositionUsingCF.Time taken to get stock position: 459ms

    All the tasks now run in parallel, each on a different thread from the common pool (ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2 and ForkJoinPool.commonPool-worker-3), and the total time taken has come down to 459ms.

    If we want to use a custom Executor instead of the common pool, we can use the overloaded supplyAsync method which takes an Executor as argument.

    var executor = Executors.newFixedThreadPool(3);
    CompletableFuture<List<StockOrder>> ordersFuture = CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol), executor);
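    If the executor in the snippet above is created on every call and never shut down, its non-daemon threads leak. The sketch below bounds the pool's lifetime with try-with-resources. It assumes Java 19+, where ExecutorService is AutoCloseable and close() waits for submitted tasks before shutting the pool down. Each task only reports the name of the thread it ran on, which shows that the work ran on the custom pool rather than the common pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {

    // Returns the names of the threads that ran each task, to show which pool was used.
    static List<String> runOnFixedPool() {
        // close() at the end of the try block waits for the tasks and shuts the pool down
        try (ExecutorService executor = Executors.newFixedThreadPool(3)) {
            var f1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
            var f2 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
            var f3 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
            return List.of(f1.join(), f2.join(), f3.join());
        }
    }

    public static void main(String[] args) {
        // Default thread factory names look like pool-1-thread-1
        System.out.println(runOnFixedPool());
    }
}
```

    In a long-running service, a shared, long-lived executor (for example a Spring bean) is usually preferable to creating a pool per request. The per-call pool here only keeps the example self-contained.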

    We can rewrite the above code in a more functional style using the thenCombine method.

    // Assumed helper (not shown in the original): holds the two intermediate results
    private record OrdersAndPrice(List<StockOrder> orders, StockPrice price) {}
    
    public StockHolding getStockPositionCompletableFutureFunctional(String symbol) {
        long startTime = System.currentTimeMillis();
    
        var ordersFuture = CompletableFuture.supplyAsync(() -> stockOrderService.getOrdersBySymbol(symbol));
        var priceFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getPrice(symbol));
        var companyFuture = CompletableFuture.supplyAsync(() -> stockInformationService.getCompanyDetails(symbol));
    
        return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new) // (orders, price) -> OrdersAndPrice
                .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                    log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                    return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
                }).join();
    }

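    The thenCombine method is called on one CompletableFuture and takes a second CompletionStage plus a BiFunction that combines the two results once both are available. By chaining thenCombine calls, we can combine more than two tasks. Here, the first call pairs the orders and the price into an OrdersAndPrice object, and the second call adds the company details. The join method is a blocking call that waits for the result to be available. Unlike get, it throws the unchecked CompletionException, so no try/catch is required.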
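    Chaining thenCombine works well for two or three futures. For more inputs, CompletableFuture.allOf is a common alternative. The sketch below, with hard-coded values and a hypothetical Position record, waits for all three futures and then assembles the result. If any input fails, the allOf future completes exceptionally.

```java
import java.util.concurrent.CompletableFuture;

class AllOfDemo {

    // Hypothetical result type combining three independent lookups.
    record Position(int quantity, double price, String company) {}

    static Position loadPosition() {
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 75);
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> 225.0);
        CompletableFuture<String> company = CompletableFuture.supplyAsync(() -> "Apple Inc.");

        // allOf completes when every input has completed; its own value is Void,
        // so the results are read back with join(), which no longer blocks at that point.
        return CompletableFuture.allOf(quantity, price, company)
                .thenApply(ignored -> new Position(quantity.join(), price.join(), company.join()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(loadPosition());
    }
}
```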

    Besides thenCombine, CompletableFuture offers thenApply, thenAccept, thenCompose and other methods. Which one to use depends on what the next step needs.

    The thenApply method is used to transform the result of the previous stage, like below.

    return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
            .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
            })
            .thenApply(stockHolding -> {
                return new EnrichedStockHolding(stockHolding, additionalInfo);
            })
            .join();

    The thenAccept method is used to perform an action on the result without returning a value, like below. The resulting future is a CompletableFuture<Void>, so join() here only waits for completion and returns null.

    ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
            .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
            })
            .thenAccept(stockHolding -> log.info("StockHolding: {}", stockHolding))
            .join();
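    The following minimal sketch uses hard-coded values to make the difference concrete. thenApply yields a new value (here a String), while thenAccept only consumes the value and leaves a CompletableFuture<Void>.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ApplyVsAcceptDemo {

    static String transformed() {
        // thenApply maps the result: CompletableFuture<Integer> -> CompletableFuture<String>
        CompletableFuture<String> label = CompletableFuture.supplyAsync(() -> 75)
                .thenApply(quantity -> quantity + " shares");
        return label.join();
    }

    static String consumed() {
        AtomicReference<String> seen = new AtomicReference<>();
        // thenAccept consumes the result and produces CompletableFuture<Void>
        CompletableFuture<Void> done = CompletableFuture.supplyAsync(() -> "AAPL")
                .thenAccept(seen::set);
        done.join(); // returns null; only useful for waiting until the action has run
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(transformed()); // 75 shares
        System.out.println(consumed());    // AAPL
    }
}
```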
    

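    The thenCompose method is used when the next step is itself asynchronous and depends on the result of the previous step. The function passed to thenCompose returns a new CompletableFuture, and thenCompose flattens it so that the chain does not end up as a nested CompletableFuture<CompletableFuture<...>>, like below.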

    var products = CompletableFuture.supplyAsync(() -> getUser(userId))
            .thenCompose(user -> CompletableFuture.supplyAsync(() -> getOrders(user)))
            .thenCompose(orders -> CompletableFuture.supplyAsync(() -> getProducts(orders)))
            .join(); // block once, at the end of the chain
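    Here is a runnable version of the same chain. The User and Order records and the lookup methods are hypothetical, and each lookup returns a CompletableFuture, a common shape for async clients. Passing such methods to thenCompose yields a flat CompletableFuture<List<String>>. Passing them to thenApply would instead produce a nested future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenComposeDemo {

    // Hypothetical domain types standing in for the user/orders/products example.
    record User(String id) {}
    record Order(String userId, String productId) {}

    static CompletableFuture<User> getUser(String userId) {
        return CompletableFuture.supplyAsync(() -> new User(userId));
    }

    static CompletableFuture<List<Order>> getOrders(User user) {
        return CompletableFuture.supplyAsync(() -> List.of(new Order(user.id(), "P1"), new Order(user.id(), "P2")));
    }

    static CompletableFuture<List<String>> getProducts(List<Order> orders) {
        return CompletableFuture.supplyAsync(() -> orders.stream().map(Order::productId).toList());
    }

    static List<String> productsFor(String userId) {
        // Each step waits for the previous one; thenCompose keeps the type flat
        return getUser(userId)
                .thenCompose(ThenComposeDemo::getOrders)
                .thenCompose(ThenComposeDemo::getProducts)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(productsFor("u1")); // [P1, P2]
    }
}
```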

    To handle exceptions, we can use the exceptionally method. If any earlier stage in the pipeline fails, the function passed to exceptionally is called with the exception, and it can either return a fallback value or rethrow, as below.

    return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
            .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                log.info("getStockPositionUsingCF.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
            })
            .exceptionally(e -> {
                log.error("Error while getting stock position", e);
                throw new RuntimeException("Error while getting stock position", e);
            })
            .join();
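    The following small runnable sketch returns a fallback value from exceptionally instead of rethrowing. The priceOrFallback method and its simulated failure are hypothetical. The sketch also shows two details. Stages after the failure (thenApply here) are skipped. The exception that reaches exceptionally from an earlier async stage is a CompletionException that wraps the original cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyDemo {

    static double priceOrFallback(boolean serviceUp) {
        return CompletableFuture.supplyAsync(() -> {
                    if (!serviceUp) {
                        throw new IllegalStateException("price service down"); // simulated failure
                    }
                    return 225.0;
                })
                .thenApply(price -> price * 2) // skipped when the stage above failed
                .exceptionally(e -> {
                    // Unwrap the CompletionException to log the original failure
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
                    System.out.println("falling back after: " + cause);
                    return -1.0; // fallback value instead of rethrowing
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(priceOrFallback(true));  // 450.0
        System.out.println(priceOrFallback(false)); // -1.0
    }
}
```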

    @Async annotation

    Spring provides the @Async annotation to run methods asynchronously. When a method annotated with @Async is called, Spring runs it on a separate thread. Async support must be enabled by adding the @EnableAsync annotation to a configuration class. We will add it to the main class StockAdvisorApplication.

    @SpringBootApplication
    @EnableAsync
    public class StockAdvisorApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(StockAdvisorApplication.class, args);
        }
    
    }

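    Next, we add @Async variants of the 3 methods getOrdersBySymbol, getPrice and getCompanyDetails (getOrdersBySymbolAsync, getPriceAsync and getCompanyDetailsAsync). To hand a result back to the caller, an @Async method must return a Future type, so these methods return a CompletableFuture. Here is getOrdersBySymbolAsync; the other two follow the same pattern.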

    @Async
    public CompletableFuture<List<StockOrder>> getOrdersBySymbolAsync(String symbol) {
        log.info("getOrdersBySymbol.currentThread: {}", Thread.currentThread());
        var orders = stockOrderRepository.findBySymbol(symbol);
        if (orders.isEmpty()) {
            throw new NoDataFoundException(); // completes the returned CompletableFuture exceptionally
        }
        }
        return CompletableFuture.completedFuture(orders);
    }

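    Now let us add a method getStockPositionUsingAsync to StockPositionService that calls the 3 @Async methods. We no longer need supplyAsync, because the proxy that Spring creates around @Async methods submits each call to a separate thread and immediately returns a CompletableFuture. Because this relies on the proxy, the @Async methods must be called from another bean, as StockPositionService does here. A call from within the same class bypasses the proxy and runs synchronously.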

    public StockHolding getStockPositionUsingAsync(String symbol) {
        long startTime = System.currentTimeMillis();
    
        var ordersFuture = stockOrderService.getOrdersBySymbolAsync(symbol);
        var priceFuture = stockInformationService.getPriceAsync(symbol);
        var companyFuture = stockInformationService.getCompanyDetailsAsync(symbol);
    
        return ordersFuture.thenCombine(priceFuture, OrdersAndPrice::new)
                .thenCombine(companyFuture, (ordersAndPrice, company) -> {
                    log.info("getStockPositionUsingAsync.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
                    return getStockHolding(ordersAndPrice.orders, ordersAndPrice.price, company);
                })
                .exceptionally(e -> {
                    log.error("Error while getting stock position", e);
                    return null;
                })
                .join();
    }

    Now let us run a test for this and see the output.

    @Test
    void getStockPositionAsync() {
        String symbol = "AAPL";
        StockHolding stockHolding = stockPositionService.getStockPositionUsingAsync(symbol);
        assertNotNull(stockHolding);
        assertEquals(symbol, stockHolding.stockSymbol());
    }

    2024-11-23 00:24:33 [task-1] INFO  c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#46,task-1,5,main]
    2024-11-23 00:24:33 [task-2] INFO  c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#47,task-2,5,main]
    2024-11-23 00:24:33 [task-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#48,task-3,5,main]
    2024-11-23 00:24:34 [task-2] INFO  c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#47,task-2,5,main]
    2024-11-23 00:24:34 [task-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#48,task-3,5,main]
    2024-11-23 00:24:34 [task-3] INFO  c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 504ms

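    All the tasks now run on separate threads (task-1, task-2, task-3), and the total time taken is 504ms. In Spring Boot, @Async methods run on the auto-configured application task executor, a ThreadPoolTaskExecutor whose threads are named task-N. Without Spring Boot, Spring falls back to a SimpleAsyncTaskExecutor. We can configure our own executor by defining a bean like below. Because the bean is named taskExecutor, Spring uses it for @Async methods.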

    @Configuration
    public class AsyncConfig {
    
        @Bean(name = "taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(25);
            executor.setThreadNamePrefix("AsyncThread-");
            executor.initialize();
            return executor;
        }
    }

    Now the log will look like below with the thread name prefix AsyncThread-.

    2024-11-23 00:27:48 [AsyncThread-1] INFO  c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: Thread[#46,AsyncThread-1,5,main]
    2024-11-23 00:27:48 [AsyncThread-2] INFO  c.c.s.s.StockInformationService - getPrice.start.Current thread: Thread[#47,AsyncThread-2,5,main]
    2024-11-23 00:27:48 [AsyncThread-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: Thread[#48,AsyncThread-3,5,main]
    2024-11-23 00:27:48 [AsyncThread-2] INFO  c.c.s.s.StockInformationService - getPrice.end.Current thread: Thread[#47,AsyncThread-2,5,main]
    2024-11-23 00:27:48 [AsyncThread-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: Thread[#48,AsyncThread-3,5,main]
    2024-11-23 00:27:48 [AsyncThread-3] INFO  c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 503ms

    Virtual Threads

    Now we will see how to improve parallelism and throughput using Virtual Threads. Before that, let us understand why we need Virtual Threads.

    Why Virtual Threads

    The traditional threading model, which Java has had since its first version, has some limitations:

    • Each Java thread is a thin wrapper around an OS thread, which reserves its own stack memory and kernel resources.
    • Creating and destroying threads is expensive. This is usually mitigated by using thread pools.
    • Because threads are heavyweight, they can't be created in large numbers. A typical machine can run only a few thousand threads.



    In a typical business application, we often make blocking calls such as fetching data from a database or calling an external service. While waiting for the response, the thread is blocked and does no work, and the CPU sits idle. To fully utilize the CPU, we would need to run more threads, but the number of threads we can create is limited, mostly by memory usage.

    This is where virtual threads come into picture.

    How Virtual Threads work

    Virtual threads are lightweight threads that take only a few KB of memory each. Many virtual threads are multiplexed onto a small number of OS (carrier) threads. When a virtual thread blocks, its current state is saved to the heap using a continuation, and the carrier thread is released for other virtual threads. When the blocking operation completes, the virtual thread resumes from the saved state, possibly on a different carrier thread. This allows a typical machine to run millions of virtual threads, where it could run only a few thousand normal threads.
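    The following self-contained sketch uses plain Java 21, independent of Spring. It submits 10,000 blocking tasks to an executor that creates one virtual thread per task, and each task sleeps 100 ms to stand in for an I/O call. No pool sizing is needed, because a sleeping virtual thread is unmounted and its carrier thread serves other tasks in the meantime. The task count and sleep duration are arbitrary illustration values.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadsDemo {

    static int runBlockingTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // One new virtual thread per submitted task; close() waits for all of them (Java 21+)
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // Blocking sleep unmounts the virtual thread, freeing its carrier thread
                    Thread.sleep(Duration.ofMillis(100));
                    completed.incrementAndGet();
                    return null; // returning a value makes this a Callable, which may throw InterruptedException
                });
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int done = runBlockingTasks(10_000);
        System.out.println(done + " tasks in " + (System.currentTimeMillis() - start) + "ms");
    }
}
```

    For comparison, the same 10,000 tasks on a fixed pool of 200 platform threads would need at least 5 seconds, because only 200 tasks can sleep at once (50 rounds of 100 ms).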




    Enabling Virtual Threads

    To enable virtual threads in a Spring Boot application (Spring Boot 3.2+ on Java 21+), we need to add the following property to application.properties.

    spring.threads.virtual.enabled=true

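    After that, our logs look like below. Each task now runs on a virtual thread mounted on a carrier thread from a ForkJoinPool. Note that task-2 starts on ForkJoinPool-1-worker-2 and ends on ForkJoinPool-1-worker-4 after resuming from its blocking call.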

    2024-11-23 00:45:41 [task-2] INFO  c.c.s.s.StockInformationService - getPrice.start.Current thread: VirtualThread[#48,task-2]/runnable@ForkJoinPool-1-worker-2
    2024-11-23 00:45:41 [task-1] INFO  c.c.s.service.StockOrderService - getOrdersBySymbol.currentThread: VirtualThread[#46,task-1]/runnable@ForkJoinPool-1-worker-1
    2024-11-23 00:45:41 [task-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.start.Current thread: VirtualThread[#50,task-3]/runnable@ForkJoinPool-1-worker-3
    2024-11-23 00:45:41 [task-2] INFO  c.c.s.s.StockInformationService - getPrice.end.Current thread: VirtualThread[#48,task-2]/runnable@ForkJoinPool-1-worker-4
    2024-11-23 00:45:41 [task-3] INFO  c.c.s.s.StockInformationService - getCompanyDetails.end.Current thread: VirtualThread[#50,task-3]/runnable@ForkJoinPool-1-worker-3
    2024-11-23 00:45:41 [task-3] INFO  c.c.s.service.StockPositionService - getStockPositionUsingAsync.Time taken to get stock position: 508ms

    Virtual threads improve the throughput and parallelism of the application when it performs blocking operations such as database access or network calls. If the application is mostly CPU-bound, virtual threads provide little benefit.

    In Java 21 to 23, virtual threads have a limitation. If a blocking call is executed inside a synchronized block or method, the virtual thread is pinned and its platform (carrier) thread is not released. The workaround is to use java.util.concurrent locks such as ReentrantLock instead of synchronized. JEP 491 removes this limitation for synchronized starting with JDK 24.
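    Here is a minimal sketch of the ReentrantLock workaround. RateLimitedClient is a hypothetical class whose critical section performs a blocking call, simulated with Thread.sleep. Because the lock is a ReentrantLock rather than synchronized, a virtual thread that blocks while holding it can unmount from its carrier thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

class LockInsteadOfSynchronized {

    // Hypothetical shared resource whose critical section performs a blocking call.
    static class RateLimitedClient {
        private final ReentrantLock lock = new ReentrantLock();
        private int calls;

        void call() throws InterruptedException {
            lock.lock(); // blocking while holding a ReentrantLock does not pin the carrier thread
            try {
                Thread.sleep(5); // simulated blocking I/O inside the critical section
                calls++;
            } finally {
                lock.unlock();
            }
        }

        int calls() {
            lock.lock();
            try {
                return calls;
            } finally {
                lock.unlock();
            }
        }
    }

    static int runConcurrently(int tasks) {
        RateLimitedClient client = new RateLimitedClient();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    client.call();
                    return null;
                });
            }
        } // close() waits for all submitted tasks
        return client.calls();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(50)); // 50
    }
}
```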

    Structured Concurrency

    There is still one issue with our concurrency model. We run 3 tasks in parallel, but there is no coordination between these tasks/threads: if one task fails, the other tasks keep running unnecessarily. This is where Structured Concurrency comes into the picture. It is still a preview feature in Java 23, so code using it must be compiled and run with --enable-preview.
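    The following small sketch in plain Java makes the problem concrete with the CompletableFuture approach. The tasks are hypothetical. The price task fails immediately, but nothing stops the slow company task, which still runs to completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class NoCancellationDemo {

    // Returns true if the slow sibling ran to completion even though the other task failed.
    static boolean siblingKeepsRunning() {
        AtomicBoolean slowTaskFinished = new AtomicBoolean(false);
        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            CompletableFuture<Double> price = CompletableFuture.<Double>supplyAsync(() -> {
                throw new IllegalStateException("price service down"); // fails immediately
            }, executor);
            CompletableFuture<String> company = CompletableFuture.supplyAsync(() -> {
                sleepQuietly(300); // keeps working after the price task has already failed
                slowTaskFinished.set(true);
                return "Apple Inc.";
            }, executor);
            try {
                price.thenCombine(company, (p, c) -> c + " @ " + p).join();
            } catch (CompletionException e) {
                // The combined future fails, but the company task was never cancelled
                System.out.println("combined result failed: " + e.getCause());
            }
        }
        return slowTaskFinished.get();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(siblingKeepsRunning()); // true
    }
}
```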




    With Structured Concurrency, we usually create a StructuredTaskScope in a try-with-resources block. We then fork tasks with the fork method and wait for them with the join method. Because we use a scope of type ShutdownOnFailure, the first failing task shuts the scope down and cancels (interrupts) the tasks that are still running. throwIfFailed then throws an ExecutionException that carries the failure.

    public StockHolding getStockPositionStructuredConcurrency(String symbol) {
        long startTime = System.currentTimeMillis();
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var ordersTask = scope.fork(() -> stockOrderService.getOrdersBySymbol(symbol));
            var priceTask = scope.fork(() -> stockInformationService.getPrice(symbol));
            var companyDetailsTask = scope.fork(() -> stockInformationService.getCompanyDetails(symbol));
            scope.join();          // wait for all subtasks, or until the first failure shuts the scope down
            scope.throwIfFailed(); // rethrow the first failure as an ExecutionException
            List<StockOrder> orders = ordersTask.get();
            StockPrice price = priceTask.get();
            CompanyDetails company = companyDetailsTask.get();
            log.info("getStockPositionStructuredConcurrency.Time taken to get stock position: {}ms", System.currentTimeMillis() - startTime);
            return getStockHolding(orders, price, company);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            log.error("Interrupted while getting stock position", e);
            return null;
        } catch (ExecutionException e) {
            log.error("Error while getting stock position", e.getCause());
            return null;
        }
    }
    }

    Similar to ShutdownOnFailure, there is ShutdownOnSuccess, which cancels the remaining tasks as soon as any task completes successfully. This is useful when several endpoints can provide the same data and you want the result from the first endpoint that responds successfully.

    public StockPrice getPrice(String symbol) {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<StockPrice>()) {
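            scope.fork(() -> getPriceFromService1(symbol));
            scope.fork(() -> getPriceFromService2(symbol));
            scope.join();          // returns once the first subtask succeeds (or all have finished)
            return scope.result(); // first successful result; throws ExecutionException if none succeeded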
        } catch (Exception e) {
            log.error("Error while getting price", e);
            return null;
        }
    }
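    For comparison, on a JDK where the preview API is not enabled, ExecutorService.invokeAny is a stable API with similar "first success wins" semantics. It returns the result of a task that completed successfully and cancels the tasks that are still running. This is a different technique from StructuredTaskScope, and the sketch below uses hypothetical price sources, one of which always fails.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FirstSuccessDemo {

    // Hypothetical price sources: the first one is down, the second one answers.
    static double priceFromService1(String symbol) {
        throw new IllegalStateException("service 1 unavailable");
    }

    static double priceFromService2(String symbol) {
        return 225.0;
    }

    static double firstSuccessfulPrice(String symbol) {
        Callable<Double> source1 = () -> priceFromService1(symbol);
        Callable<Double> source2 = () -> priceFromService2(symbol);
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAny returns a successful result and cancels unfinished tasks;
            // it throws ExecutionException only if every task fails
            return executor.invokeAny(List.of(source1, source2));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a price", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("all price sources failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessfulPrice("AAPL")); // 225.0
    }
}
```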

    You can also create a custom scope for specific requirements by extending StructuredTaskScope and overriding its handleComplete method, which is called as each subtask completes.

    Conclusion

    In this blog post, we saw how to run tasks in parallel in Java and Spring Boot applications using CompletableFuture, the @Async annotation, Virtual Threads and Structured Concurrency. We saw how running tasks in parallel improves the performance of the application. We also saw how to handle exceptions, and how Structured Concurrency cancels the remaining tasks when one of them fails.

    To stay up to date with the latest in Java and Spring, follow us on YouTube, LinkedIn and Medium. You can find the code used in this blog here

    You can watch a more detailed video version on virtual threads and structured concurrency here: