【jOOQ】R2DBC接続でDSLContextを上手く管理する方法を考える【Spring】

以下の記事を書いていて、「DSLContextを使い捨てたら非効率なのでは?」と感じたので、管理方法を考えてみたメモです。
その他の便利要件も含めています。

あくまでメモなので、これがどれ位性能に影響するか等は未検証です。

qiita.com


前提

ソースコード

管理用クラスは以下の通りです。

import io.r2dbc.spi.ConnectionFactory
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.ResultQuery
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import org.springframework.r2dbc.connection.ConnectionHolder
import org.springframework.stereotype.Component
import org.springframework.transaction.NoTransactionException
import org.springframework.transaction.reactive.TransactionSynchronizationManager
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

@Component
class JooqContext(private val cfi: ConnectionFactory) {
    companion object {
        // TransactionSynchronizationManagerのリソースにDSLContextをバインドする際に用いるキー
        private const val DSL_CONTEXT_KEY = "DSLContext"
    }

    // 読み込み時にトランザクションが開始されていなければ使うDSLコンテキスト
    private val defaultContextForRead = DSL.using(cfi, SQLDialect.H2).dsl()

    // DSLContextは一々インスタンスを使い捨てるとオーバーヘッドが大きそうなため、コンテキストにバインドしてそれを使い回す
    private fun getDslContext(manager: TransactionSynchronizationManager): DSLContext {
        manager.getResource(DSL_CONTEXT_KEY)?.apply { return (this@apply as DSLContext) }

        val connection = (manager.getResource(cfi) as ConnectionHolder).connection
        val dslContext = DSL.using(connection, SQLDialect.H2).dsl()

        manager.bindResource(DSL_CONTEXT_KEY, dslContext)

        return dslContext
    }

    // テストでモックするため公開
    fun getCurrentDsl(): Mono<DSLContext> = TransactionSynchronizationManager
        .forCurrentTransaction() // forCurrentTransactionはトランザクション非開始でエラーが生じる
        .map { getDslContext(it) }

    // readは基本的に1件以上の取得が期待されるため、fluxを返す
    fun <R : Record> read(query: DSLContext.() -> ResultQuery<R>): Flow<R> = getCurrentDsl()
        // read時はトランザクション非開始でデフォルトにフォールバック
        .onErrorReturn(NoTransactionException::class.java, defaultContextForRead)
        .flatMapMany { query(it) }
        .asFlow()

    // write時はトランザクションを強制するため、トランザクション非開始エラーを処理しない
    suspend fun <R : Record> write(query: DSLContext.() -> ResultQuery<R>): R = getCurrentDsl()
        .flatMap { query(it).toMono() }
        .awaitSingle()

    fun <R : Record> writeMany(query: DSLContext.() -> ResultQuery<R>): Flow<R> = getCurrentDsl()
        .flatMapMany { query(it) }
        .asFlow()
}

利用側は以下のようになります。
DSLContextMono/Fluxの生成処理が管理クラス側に隠ぺいできた分だけ利用側がすっきりしています。

import org.jooq.generated.tables.records.FooTableRecord
import org.jooq.generated.tables.references.FOO_TABLE
import org.springframework.stereotype.Repository

@Repository
class Repository(private val ctxt: JooqContext) {
    suspend fun save(value: String): FooTableRecord {
        return ctxt.write { insertInto(FOO_TABLE).values(value).returning() }
            .map { it.into(FOO_TABLE) }
    }
}

解説

追記: ConnectionFactoryUtils.getConnectionを用いる際のコネクションリークについて

以下の解説ではConnectionFactoryUtils.getConnectionの利用について触れていますが、これを使う際には適切にcloseしなければコネクションリークが発生する点にご注意下さい。
詳しくは以下にまとめています。

qiita.com


コネクションやDSLContextの管理はTransactionSynchronizationManagerを直接利用するようにしています。
この形にしている理由は以下の通りです。

  • トランザクション無しで書き込もうとしたらエラーにしたい」を実現するのに都合がいい
  • TransactionSynchronizationManagerにリソースをバインドすることで、DSLContextを使い回せる

特にコネクション取得について、Qiitaの方ではConnectionFactoryUtils.getConnectionを用いていましたが、例外もしっかりケアされていたため、安全性だけ考えるとこちらの方が良いと思います。
この記事で紹介したコードはあくまで自分のユースケースでちゃんと動きそうという程度しか見ていません。 docs.spring.io