Kotlin JDSL: Let's use Kotlin to easily write reactive Criteria API

This is the second post in a series about Kotlin JDSL. In the last post, Kotlin JDSL: Writing JPA Criteria API more easily with Kotlin, we looked into why we developed Kotlin JDSL and how we use it. In this post, we'll look at the Kotlin JDSL Reactive module. While many of you might assume that JPA cannot be reactive, the Hibernate team has released a reactive JPA library called Hibernate Reactive. If you would like to know more about reactive programming, you can read the Red Hat blog article 5 Things to Know About Reactive Programming and our own LINE Engineering Blog article Let's Play with Reactive Streams on Armeria - Part 1.

Before the release of Hibernate Reactive, we used R2DBC because there were no reactive libraries for JPA. However, R2DBC doesn't support object relations or automatic change detection, which are some of JPA's greatest advantages. Simply put, R2DBC is basically an SQL mapper. If you can't use R2DBC, you can use Reactor with synchronous JPA as an alternative by following the pattern in "How Do I Wrap a Synchronous, Blocking Call?" to achieve the same effect. If you're on the fence about adopting Kotlin JDSL Reactive, you can wrap the synchronous Kotlin JDSL in the way Reactor suggests.
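As a rough sketch of that wrapping pattern, the snippet below offloads a blocking call to a dedicated thread pool using only the JDK's CompletableFuture. Reactor's FAQ achieves the same effect with Mono.fromCallable and subscribeOn(Schedulers.boundedElastic()). BlockingOrderRepository, findPurchaserName, and the pool size are hypothetical stand-ins for a synchronous JPA repository and are not part of Kotlin JDSL.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Hypothetical blocking repository standing in for a synchronous JPA call.
class BlockingOrderRepository {
    fun findPurchaserName(orderId: Long): String {
        Thread.sleep(50) // simulates blocking database I/O
        return "purchaser-$orderId"
    }
}

// Dedicated pool (size chosen arbitrarily) so blocking calls stay off event-loop threads.
val blockingPool = Executors.newFixedThreadPool(4) { task ->
    Thread(task, "blocking-jpa").apply { isDaemon = true }
}

// Wraps the blocking call so that callers receive an asynchronous result.
fun findPurchaserNameAsync(
    repository: BlockingOrderRepository,
    orderId: Long
): CompletableFuture<String> =
    CompletableFuture.supplyAsync({ repository.findPurchaserName(orderId) }, blockingPool)
```

The caller's thread is never blocked by the repository call itself; only the threads of the dedicated pool are, which is the trade-off this pattern accepts.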

At the moment, there is no official standard for reactive JPA. After researching ways to support reactive JPA in Kotlin JDSL for a while, we decided to build the Kotlin JDSL Reactive implementation on Hibernate Reactive.

As some of you may not be entirely familiar with the topic of this post, I'd like to briefly go over what Hibernate Reactive is before we begin to look into more details.

Introducing Hibernate Reactive

Hibernate Reactive is currently the only reactive implementation of JPA. Below is an excerpt of the readme on the Hibernate Reactive GitHub repository.

A reactive API for Hibernate ORM, supporting non-blocking database drivers and a reactive style of interaction with the database.

Hibernate Reactive may be used in any plain Java program, but is especially targeted toward usage in reactive environments like Quarkus and Vert.x.

Currently PostgreSQL, MySQL, MariaDB, Db2, CockroachDB, MS SQL Server and Oracle are supported.

To communicate with databases, Hibernate Reactive uses the non-blocking Vert.x reactive SQL clients rather than JDBC drivers. Hibernate Reactive also has plans to eventually support H2 DB, which we frequently use in testing. Finally, Hibernate Reactive exposes its reactive API in two flavors: CompletionStage and CompletableFuture, which have been part of the JDK since version 8, and SmallRye's Mutiny.

Things to improve on Hibernate Reactive

Once you open a reactive session in Hibernate Reactive, you can only run queries on the thread associated with that session; you are prohibited from running queries on separate threads in parallel. Instead, SessionFactory provides methods such as withSession and withTransaction, which run queries within a limited scope.
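The scoping idea behind withSession can be illustrated with a toy model. ToySession and ToySessionFactory below are hypothetical and use the JDK's CompletionStage; they are not the Hibernate Reactive API, only a sketch of how a factory can open a session, lend it to a block, and close it once the asynchronous work finishes.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage

// Hypothetical session that refuses to run queries once it has been closed.
class ToySession {
    var isOpen = true
        private set

    fun find(id: Long): CompletionStage<String> {
        check(isOpen) { "session is closed" }
        return CompletableFuture.completedFuture("row-$id")
    }

    fun close() {
        isOpen = false
    }
}

// Hypothetical factory: the session only lives as long as the block's stage.
class ToySessionFactory {
    fun <T> withSession(work: (ToySession) -> CompletionStage<T>): CompletionStage<T> {
        val session = ToySession()
        val stage = try {
            work(session)
        } catch (e: Throwable) {
            session.close() // the block failed before returning a stage
            throw e
        }
        // Close the session when the asynchronous work completes, successfully or not.
        return stage.whenComplete { _, _ -> session.close() }
    }
}
```

Because the session is closed as soon as the returned stage completes, any query has to be issued inside the block, which mirrors the restriction described above.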

Below is an excerpt of Hibernate Reactive Java sample code. If you take a look at the source code, you can see that you can generate queries using JPA Criteria API.

SessionFactory factory =
        createEntityManagerFactory( persistenceUnitName( args ) )
                .unwrap(SessionFactory.class);
factory.withSession(
        // use a criteria query
        session -> {
            CriteriaQuery<Book> query = factory.getCriteriaBuilder().createQuery( Book.class );
            Root<Author> a = query.from( Author.class );
            Join<Author, Book> b = a.join( Author_.books );
            query.where( a.get( Author_.name ).in( "Neal Stephenson", "William Gibson" ) );
            query.select( b );
            return session.createQuery( query ).getResultList().invoke(
                    books -> books.forEach( book -> out.println( book.getTitle() ) )
            );
        }
).await().indefinitely();

However, the code above has the following disadvantages that make it difficult to use in actual production.

  • You can only write query code inside withSession and withTransaction.
  • All query results are returned as Mutiny or CompletionStage types, so they cannot be processed synchronously inside withSession and withTransaction. To obtain the results, you must await or block on the value returned by these two methods outside of withSession and withTransaction.

Kotlin JDSL Reactive was made to efficiently deal with the two problems above.

Introducing Kotlin JDSL Reactive

Kotlin JDSL Reactive provides reactive versions of the following.

  • QueryFactory for generating JPA queries
  • SpringDataQueryFactory for supporting Spring Data Commons Page, Pageable, Range.Bound

Hibernate Reactive mostly reuses the implementation of hibernate-core. If you take a look at ReactiveSessionFactoryImpl, which is an implementation of ReactiveSessionFactory used in Hibernate Reactive, you can see that it directly extends SessionFactoryImpl. As a result, you can use most of Hibernate's features in Hibernate Reactive to generate queries through JPA Criteria API. In other words, you can reuse 100% of Kotlin JDSL's existing query generation features.

Next, I'd like to cover the classes and interfaces that Kotlin JDSL Reactive provides for running reactive queries.

The classes and interfaces required for generating reactive queries

Kotlin JDSL Reactive is built on Hibernate Reactive's Mutiny API rather than its CompletionStage API. We determined that the more feature-rich Mutiny would be the more future-proof choice, so we support only Mutiny instead of supporting both reactive implementations.

HibernateMutinyReactiveQueryFactory is a QueryFactory that uses Mutiny to generate queries for Hibernate. As it's a class, the implementation is omitted from the sample code below. Because Kotlin JDSL Reactive uses ReactiveQueryFactory to write queries, we use the withFactory method instead of withSession, and transactionWithFactory instead of withTransaction. Inside withFactory and transactionWithFactory, you can write and run queries in a synchronous-looking style without chaining calls.

class HibernateMutinyReactiveQueryFactory {
    suspend fun <T> withFactory(block: suspend (Mutiny.Session, ReactiveQueryFactory) -> T): T {...}
    suspend fun <T> statelessWithFactory(block: suspend (ReactiveQueryFactory) -> T): T {...}
    suspend fun <T> withFactory(block: suspend (ReactiveQueryFactory) -> T): T {...}
    suspend fun <T> transactionWithFactory(block: suspend (ReactiveQueryFactory) -> T): T {...}
    suspend fun <T> transactionWithFactory(block: suspend (Mutiny.Session, ReactiveQueryFactory) -> T): T {...}
}

The ReactiveQueryFactory interface shown below is a QueryFactory that can generate SELECT, UPDATE, and DELETE queries as well as subqueries. The only difference from the existing synchronous QueryFactory is that SELECT, UPDATE, and DELETE queries are returned as ReactiveQuery.

interface ReactiveQueryFactory {
    fun <T> selectQuery(returnType: Class<T>, dsl: CriteriaQueryDsl<T>.() -> Unit): ReactiveQuery<T>
    fun <T : Any> updateQuery(target: KClass<T>, dsl: CriteriaUpdateQueryDsl.() -> Unit): ReactiveQuery<T>
    fun <T : Any> deleteQuery(target: KClass<T>, dsl: CriteriaDeleteQueryDsl.() -> Unit): ReactiveQuery<T>
    fun <T> subquery(returnType: Class<T>, dsl: SubqueryDsl<T>.() -> Unit): SubqueryExpressionSpec<T>
}

As seen above, ReactiveQueryFactory returns ReactiveQuery, the query interface produced by Kotlin JDSL's reactive QueryFactory. As there is no standard for reactive JPA, we created our own reactive query interface so that it can support multiple implementations without being limited to Hibernate.

ReactiveQuery is defined as follows. As Kotlin JDSL is a Kotlin-based library, we used suspend functions from Kotlin Coroutines so that you can easily get query results without dealing with any reactive types. You can tell what each method does just by looking at its name.

interface ReactiveQuery<R> {
    suspend fun singleResult(): R
    suspend fun resultList(): List<R>
    suspend fun singleResultOrNull(): R?
    suspend fun executeUpdate(): Int
    ... 
}
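To illustrate how a suspend function can hide an asynchronous result type, the sketch below bridges a JDK CompletionStage into a suspending call using only the Kotlin standard library. awaitResult, runSuspending, and findTitleAsync are hypothetical helpers written for this example. Kotlin JDSL Reactive itself works with Mutiny types, and in real code you would typically rely on kotlinx.coroutines rather than a hand-rolled runner.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends until the stage completes, then returns its value or rethrows its failure.
suspend fun <T> CompletionStage<T>.awaitResult(): T = suspendCoroutine { continuation ->
    whenComplete { value, error ->
        if (error != null) continuation.resumeWithException(error) else continuation.resume(value)
    }
}

// Minimal blocking runner for a suspend block (a stand-in for runBlocking).
fun <T> runSuspending(block: suspend () -> T): T {
    val outcome = CompletableFuture<T>()
    block.startCoroutine(
        Continuation<T>(EmptyCoroutineContext) { result ->
            result
                .onSuccess { outcome.complete(it) }
                .onFailure { outcome.completeExceptionally(it) }
        }
    )
    return outcome.get()
}

// Hypothetical asynchronous lookup standing in for a reactive query.
fun findTitleAsync(id: Long): CompletionStage<String> =
    CompletableFuture.supplyAsync { "Book #$id" }

// Reads like sequential code even though each lookup completes asynchronously.
fun loadTitles(): List<String> = runSuspending {
    val first = findTitleAsync(1L).awaitResult()
    val second = findTitleAsync(2L).awaitResult()
    listOf(first, second)
}
```

Calling loadTitles() returns the two titles in order, and the second lookup starts only after the first has completed.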

Kotlin JDSL Reactive vs Hibernate Reactive

In this section, let's compare the code you write for the same query with Kotlin JDSL Reactive and with Hibernate Reactive. Below is a sample query that you can run through Kotlin JDSL Reactive.

@Test
fun withFactoryMultipleOperation(): Unit = runBlocking {
    val sessionFactory = Persistence.createEntityManagerFactory("order").unwrap(Mutiny.SessionFactory::class.java)
    val order_5000 = Order(
        purchaserId = 5000,
        groups = setOf()
    )
    val order_3000 = Order(
        purchaserId = 3000,
        groups = setOf()
    )
 
    val queryFactory = HibernateMutinyReactiveQueryFactory(
        sessionFactory = sessionFactory, subqueryCreator = SubqueryCreatorImpl()
    )
    queryFactory.withFactory { session, factory ->
        session.persist(order_5000).awaitSuspending()
        session.persist(order_3000).awaitSuspending()
 
        val order5000 = factory.singleQuery<Order> {
            select(entity(Order::class))
            from(entity(Order::class))
            where(col(Order::purchaserId).equal(5000))
        }
 
        assertThat(order5000.id).isEqualTo(order_5000.id)
 
        val actualOrder_3000 = factory.singleQuery<Order> {
            select(entity(Order::class))
            from(entity(Order::class))
            where(col(Order::purchaserId).equal(3000))
        }
        assertThat(order_3000.id).isEqualTo(actualOrder_3000.id)
    }
}

If you look at the code, you can see that it first acquires a ReactiveQueryFactory through the withFactory method, calls singleQuery to fetch one order, and then calls singleQuery once again to fetch the other. The code reads much like typical synchronous code.

If you use Mutiny's native SessionFactory with JPA Criteria API to do the same thing, the code looks like this.

@Test
fun withNativeSessionFactoryMultipleOperation(): Unit = runBlocking {
    val sessionFactory = Persistence.createEntityManagerFactory("order").unwrap(Mutiny.SessionFactory::class.java)
    val order_5000 = Order(
        purchaserId = 5000,
        groups = setOf()
    )
    val order_3000 = Order(
        purchaserId = 3000,
        groups = setOf()
    )
     
    sessionFactory.withSession { session ->
        val criteriaBuilder = sessionFactory.criteriaBuilder
        val query = criteriaBuilder.createQuery(Order::class.java)
        val from = query.from(Order::class.java)
        session.persist(order_5000)
            .flatMap { session.persist(order_3000) }
            .flatMap {
                session.createQuery(
                    query.select(from)
                        .where(criteriaBuilder.equal(from.get<Long>(Order::purchaserId.name), 5000L))
                ).singleResult
            }.flatMap { order5000 ->
                assertThat(order5000.id).isEqualTo(order_5000.id)
                session.createQuery(
                    query.select(from)
                        .where(criteriaBuilder.equal(from.get<Long>(Order::purchaserId.name), 3000L))
                ).singleResult
            }.map { order3000 ->
                assertThat(order3000.id).isEqualTo(order_3000.id)
                order3000
            }
    }.awaitSuspending()
}

The code produces the same result, but you have to chain the calls together with flatMap and map.

If you want to simply run a single query through Kotlin JDSL Reactive, you can do something similar to the code below.

@Test
fun listQuery(): Unit = runBlocking {
    val order3000 = Order(purchaserId = 3000, ...)
    val order5000 = Order(purchaserId = 5000, ...)
 
    val factory = Persistence.createEntityManagerFactory("order").unwrap(Mutiny.SessionFactory::class.java)
    factory.withSession { session -> session.persist(order3000).flatMap { session.persist(order5000) } }.awaitSuspending()
   
    val queryFactory = HibernateMutinyReactiveQueryFactory(
        sessionFactory = factory,
        subqueryCreator = SubqueryCreatorImpl()
    )
    val orders = queryFactory.listQuery<Order> {
        select(entity(Order::class))
        from(entity(Order::class))
        fetch(Order::groups)
        fetch(OrderGroup::items)
        fetch(OrderGroup::address)
        where(col(Order::purchaserId).equal(5000))
    }
 
    assertThat(orders).containsExactly(order5000)
}

Below is a Hibernate Reactive implementation of the code above. Due to the complexity of Criteria API, the code is more difficult to write, and fetch joins are not as easy to express.

@Test
fun listQuery(): Unit = runBlocking {
    val order3000 = Order(purchaserId = 3000, ...)
    val order5000 = Order(purchaserId = 5000, ...)

    val sessionFactory = Persistence.createEntityManagerFactory("order").unwrap(Mutiny.SessionFactory::class.java)
    // Persist both orders, then run the fetch-join query in the same session.
    val orders = sessionFactory.withSession { session ->
        session.persist(order5000).flatMap { session.persist(order3000) }
            .flatMap {
                val criteriaBuilder = sessionFactory.criteriaBuilder
                val query = criteriaBuilder.createQuery(Order::class.java)
                val from = query.from(Order::class.java)
                val group = from.fetch<Order, OrderGroup>(Order::groups.name)
                group.fetch<OrderGroup, OrderItem>(OrderGroup::items.name)
                group.fetch<OrderGroup, OrderAddress>(OrderGroup::address.name)

                session.createQuery(
                    query.select(from)
                        .where(criteriaBuilder.equal(from.get<Long>(Order::purchaserId.name), 5000L))
                ).resultList
            }
    }.awaitSuspending()

    assertThat(orders).containsExactly(order5000)
}

As you can see, Hibernate Reactive is quite difficult to use because of the verbosity of Criteria API and the asynchronous, chained style of Hibernate Reactive code. Kotlin JDSL Reactive was designed to improve upon these shortcomings.

Conclusion

Until recently, I thought "JPA" and "reactive" were two concepts that couldn't coexist, but that is no longer the case. I look forward to the day when many of you will use Kotlin JDSL Reactive. There are many more examples and use cases documented on the Kotlin JDSL Reactive GitHub repository and more sample projects available on my personal GitHub repository, so please take a look if you're interested. Thank you for reading.