How to trigger Mono execution after another Mono terminates


Spring Datafication 2023. 3. 9. 13:14

Problem

I have two Monos that I want to execute one after the other. The first Mono returns a value that I want to use in the second Mono, so the second must run only after the first has completed.
How can I do this?

In this post, we will see how to call a Mono after another Mono is done.

Scenario

While building application logic, we may need to call one Mono after another Mono has completed. The example in this post is a Book Publication API built with GraphQL.
The dependencies in our pom.xml might look like this:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency> 
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-graphql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency> 
    </dependencies>

Given the database schema below, adding a new author requires a book id that already exists in the books table.

Our SQL schema initially looks like this:

create table if not exists books (
  id uuid default random_uuid() not null primary key ,
  bookName varchar(255) not null  ,
  pages int not null
);

CREATE TABLE IF NOT EXISTS  authors(
    id uuid default random_uuid()  not null,
    authorName VARCHAR(255),
    age int not null,
    book_id UUID  NOT NULL,
    CONSTRAINT book_author_fk FOREIGN KEY(book_id) REFERENCES books(id)
);

Also, imagine our mutation schema looks like this:

type Mutation{
    addBook(bookName:String!,pages:Int!,authorName:String,age:Int!):Book
}

This means that whenever we add a book, we can add its author at the same time. A mutation can send us a payload with all the required details, like this:

mutation {
  addBook(bookName: "BCJASVM", pages: 3, authorName:"FasCode", age: 23) {
    id
    bookName
    pages
    author {
      id
      authorName
      age
    }
  }
}

Suppose we send this mutation in a POST request to our service, and the service saves the book and the author with code like this:

package com.ease.leaarn.blog.service;

import com.ease.leaarn.blog.model.Author;
import com.ease.leaarn.blog.model.Book;
import com.ease.leaarn.blog.repository.AuthorRepo;
import com.ease.leaarn.blog.repository.BookRepo;
import graphql.schema.DataFetcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

@Service
public class BookService {

    @Autowired
    private BookRepo bookRepo;
    @Autowired
    private AuthorRepo authorRepo;

    @Autowired
    private AuthorService authorService;

    public DataFetcher<CompletableFuture<Book>> getBook() {
        return dataFetchingEnvironment -> {
            String id = dataFetchingEnvironment.getArgument("id");
            return bookRepo.getBook(id)
                    .toFuture();
        };
    }

    //getBooks
    public DataFetcher<CompletableFuture<List<Book>>> getBooks() {
       return env->bookRepo.getBooks().collectList().toFuture();
    }


    //addBook
    public DataFetcher<CompletableFuture<Book>> addBook() {
        return env -> {
            String authorName = env.getArgument("authorName");
            String bookName=env.getArgument("bookName");
            int pages=env.getArgument("pages");
            int age = env.getArgument("age"); 

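            Mono<Book> bookMono = bookRepo.addBook(new Book(bookName, pages));

            // block() subscribes to bookMono and waits for the saved book.
            // If addAuthor(...) returns a Mono, it is discarded here, so nothing subscribes to it.
            authorService.addAuthor(authorName, age, Objects.requireNonNull(bookMono.block()).getId());
            // toFuture() subscribes to bookMono a second time.
            return bookMono.toFuture();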
        };
    }

}

The mutation seems to work fine, but it does not behave as expected.

We can confirm this by checking the database tables: the authors table is empty.
Alternatively, we can send a query to confirm it.


The Problem

What could possibly be wrong? The book was saved correctly, but the author was not.
This does not mean that our authorService.addAuthor() method is broken.
The problem lies in how we use Mono and CompletableFuture.

  1. So, what is the issue with Mono and Future?
    A Mono is a Reactive Streams publisher. It is non-blocking and asynchronous.
    It is also lazy: nothing is executed until something subscribes to it.
    So a first idea is simply to subscribe to the Mono ourselves, so that it gets executed.
    In this example I would call the repository method and subscribe to the Mono it returns.
    The changed code would look like this:

public DataFetcher<CompletableFuture<Book>> addBook() {
    return env -> {
        String authorName = env.getArgument("authorName");
        String bookName = env.getArgument("bookName");
        int pages = env.getArgument("pages");
        int age = env.getArgument("age");
        Mono<Book> bookMono = bookRepo.addBook(new Book(bookName, pages));
        // block() subscribes to bookMono and waits for the saved book.
        Mono<Author> authorMono = authorRepo.addAuthor(new Author(authorName, age, Objects.requireNonNull(bookMono.block()).getId()));
        // Fire-and-forget: the insert now runs, but nothing here waits for it or handles its errors.
        authorMono.subscribe();
        // toFuture() subscribes to bookMono a second time.
        return bookMono.toFuture();
    };
}
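
The laziness described above can be seen in isolation. The following is a minimal sketch, not the repository code from this post: addAuthor here is a hypothetical stand-in built with Mono.fromCallable, and the counter exists only to show when the work actually runs. It uses block() as the subscriber so the result is deterministic.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class LazyMonoDemo {

    // Counts how many times the simulated insert actually runs.
    static final AtomicInteger inserts = new AtomicInteger();

    // Hypothetical stand-in for a repository call such as authorRepo.addAuthor(...).
    static Mono<String> addAuthor(String name) {
        return Mono.fromCallable(() -> {
            inserts.incrementAndGet();
            return "saved " + name;
        });
    }

    // Returns the insert count before and after subscribing.
    static int[] run() {
        inserts.set(0);
        Mono<String> authorMono = addAuthor("FasCode"); // assembled only, nothing runs yet
        int before = inserts.get();
        authorMono.block();                             // subscribing triggers the work
        int after = inserts.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        // prints "before subscribe: 0, after subscribe: 1"
        System.out.println("before subscribe: " + counts[0] + ", after subscribe: " + counts[1]);
    }
}
```

Creating the Mono does nothing by itself, which is why the author row was missing in the first version.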

This is what the query response looks like now:

Wait a minute, what is the issue with this code?

We subscribed, and it seems to work. But what actually happened?
We subscribed to authorMono only to force it to execute, so nothing waits for its result or handles its errors.
We also lost sight of why the first version failed: nothing ever subscribed to the Mono returned by addAuthor(). The earlier bookMono.block() call did not prevent it from running, and no deadlock was involved.
So it only seems to work. The remaining problem is that bookMono is subscribed twice, once by block() and once by toFuture(), so the book insert is executed twice. We can confirm this with a query.
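
The double execution can be reproduced without a database. This is a rough sketch under assumptions: addBook below is a hypothetical stand-in for bookRepo.addBook(...), modeled as a cold Mono that performs one simulated insert per subscription.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class DoubleSubscriptionDemo {

    // Counts simulated inserts into the books table.
    static final AtomicInteger bookInserts = new AtomicInteger();

    // Hypothetical stand-in for bookRepo.addBook(...): every subscription inserts once.
    static Mono<String> addBook(String bookName) {
        return Mono.fromCallable(() -> bookName + "#" + bookInserts.incrementAndGet());
    }

    // Mirrors the block() + toFuture() pattern and returns the number of inserts.
    static int run() {
        bookInserts.set(0);
        Mono<String> bookMono = addBook("BCJASVM");
        String first = bookMono.block();                        // subscription 1
        CompletableFuture<String> future = bookMono.toFuture(); // subscription 2
        String second = future.join();
        System.out.println(first + " / " + second);             // prints "BCJASVM#1 / BCJASVM#2"
        return bookInserts.get();
    }

    public static void main(String[] args) {
        System.out.println("inserts: " + run()); // prints "inserts: 2"
    }
}
```

Each call that subscribes to the same cold Mono repeats the work, so the value returned to the client comes from a second insert.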

  2. Solution: flatMap() can be used to chain the execution of the two Monos.
    In our case we can write it like this, and it works just fine:
public DataFetcher<CompletableFuture<Book>> addBook() {
    return env -> {
        String authorName = env.getArgument("authorName");
        String bookName = env.getArgument("bookName");
        int pages = env.getArgument("pages");
        int age = env.getArgument("age");
        // Save the book first, then save the author with the generated book id.
        Mono<Book> bookMono = bookRepo.addBook(new Book(bookName, pages))
                .flatMap(book -> authorRepo.addAuthor(new Author(authorName, age, book.getId()))
                        .map(author -> book)); // emit the saved book once the author is saved
        // toFuture() is the only subscription, so each insert runs once.
        return bookMono.toFuture();
    };
}
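
To check the ordering outside of Spring, the same chain can be run against in-memory stand-ins. This is only a sketch: the Book and Author records and the addBook/addAuthor methods below are hypothetical substitutes for the model and repository classes of this post, and the log list exists only to record the order of the simulated inserts.

```java
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class FlatMapChainDemo {

    // Hypothetical simplified models.
    record Book(UUID id, String bookName, int pages) {}
    record Author(UUID id, String authorName, int age, UUID bookId) {}

    // Records the order in which the simulated inserts run.
    static final List<String> log = new ArrayList<>();

    static Mono<Book> addBook(String bookName, int pages) {
        return Mono.fromCallable(() -> {
            log.add("book");
            return new Book(UUID.randomUUID(), bookName, pages);
        });
    }

    static Mono<Author> addAuthor(String authorName, int age, UUID bookId) {
        return Mono.fromCallable(() -> {
            log.add("author");
            return new Author(UUID.randomUUID(), authorName, age, bookId);
        });
    }

    // Same shape as the solution: the author insert starts only after the book is emitted.
    static Mono<Book> addBookWithAuthor(String bookName, int pages, String authorName, int age) {
        return addBook(bookName, pages)
                .flatMap(book -> addAuthor(authorName, age, book.id())
                        .map(author -> book));
    }

    // Runs the chain with a single subscription and returns the recorded order.
    static List<String> run() {
        log.clear();
        addBookWithAuthor("BCJASVM", 3, "FasCode", 23).toFuture().join();
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[book, author]"
    }
}
```

With a single subscription at the end of the chain, each simulated insert runs exactly once and in the expected order.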

Conclusion

There are possibly many other ways to solve this problem.
I consider myself a beginner in the world of reactive programming, so there are surely better and more efficient ways to solve it.
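
One such alternative, offered as a sketch rather than a recommendation, is Reactor's Mono.delayUntil, which holds back the first Mono's element until a second publisher derived from that element terminates. The addBook and addAuthor methods below are hypothetical in-memory stand-ins, not the repositories from this post.

```java
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

class DelayUntilDemo {

    // Records the order in which the simulated inserts run.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for saving a book; emits the book name.
    static Mono<String> addBook(String bookName) {
        return Mono.fromCallable(() -> {
            log.add("book");
            return bookName;
        });
    }

    // Hypothetical stand-in for saving an author that refers to the saved book.
    static Mono<String> addAuthor(String authorName, String bookName) {
        return Mono.fromCallable(() -> {
            log.add("author");
            return authorName + " wrote " + bookName;
        });
    }

    // Emits the book only after the author Mono has terminated.
    static String run() {
        log.clear();
        return addBook("BCJASVM")
                .delayUntil(book -> addAuthor("FasCode", book))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run() + " " + log); // prints "BCJASVM [book, author]"
    }
}
```

Compared with flatMap followed by map, delayUntil keeps the original element without a second mapping step; which form reads better is a matter of taste.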

