【Spring WebFlux】jOOQで組み立てたクエリをConnectionFactoryへ直に発行する【R2DBC】

jOOQで組み立てたクエリをConnectionFactoryへ直発行する仕組みを作ったのでまとめます。
なお、これは事情が有って作ったものであり、本来一切推奨できないコードであることはご承知おき下さい。

コード全体は以下の通りです。
基本的にはSelectクエリでの利用を想定しています。

import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Readable
import io.r2dbc.spi.Result
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.jooq.AttachableQueryPart
import org.jooq.Field
import org.jooq.JSONB
import org.jooq.Select
import org.jooq.conf.ParamType
import org.reactivestreams.Publisher
import org.springframework.r2dbc.connection.ConnectionHolder
import org.springframework.transaction.NoTransactionException
import org.springframework.transaction.reactive.TransactionSynchronizationManager
import reactor.core.publisher.Flux

// コネクションに対するSQL発行
private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> =
    createStatement(sql.getSQL(ParamType.INLINED)).execute()

// 呼び出し用
fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> {
    // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する
    val flux = TransactionSynchronizationManager
        .forCurrentTransaction()
        .flatMapMany { manager ->
            val con = (manager.getResource(this) as ConnectionHolder).connection
            con.query(sql)
        }
        .onErrorResume(NoTransactionException::class.java) {
            // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より
            // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started
            Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close)
        }
    // クエリで指定されている名前と列番号の対応を記録
    val context = sql.select.withIndex().associate { it.value to it.index }

    return flux
        .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } }
        .asFlow()
}

// 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している
class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) {
    // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外
    fun findIndex(field: Field<*>): Int = context[field]
        ?: (context.entries.singleOrNull { it.key.name == field.name }?.value)
        ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません")

    // 最低限よく使いそうな型に対しての変換定義
    @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST")
    fun <T> read(index: Int, clazz: Class<T>): T = when {
        clazz.isEnum -> readable.get(index, String::class.java)?.let { value ->
            (clazz as Class<Enum<*>>).enumConstants.find { it.name == value }
                ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした")
        }
        clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) }
        else -> readable.get(index, clazz)
    } as T

    /**
     * Recordからの読み出しを簡単化するための関数のReadable版
     *
     * jOOQ同様の読み出しができるとは限らないため注意!
     * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと
     */
    inline fun <reified T> read(field: Field<*>): T {
        val index = findIndex(field)
        val clazz = T::class.java

        return read(index, clazz)
    }

    /**
     * fieldと同じ型で読み出してよい場合
     */
    inline operator fun <reified T> get(field: Field<T>): T = read(field)
}

利用する様子は以下のようになります。

val cfi: ConnectionFactory = ...
val query: Select<*> = ...

val result: Flow<Dto> = cfi.read(sql) { it ->
    Dto(
        it.read(FOO.C1), // カラムの読み出し方1
        it[FOO.C2], // カラムの読み出し方2
    )
}

以下解説します。

クエリ発行部

諸事情でパラメータ埋め込みされた状態でのクエリ発行が必要だったため、ParamType.INLINEDを指定しています。
運用の仕方によってはクエリログ経由での情報漏洩など有りうるので注意が必要です。

それ以外は通常のクエリ発行と同様になっています。

// コネクションに対するSQL発行
private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> =
    createStatement(sql.getSQL(ParamType.INLINED)).execute()

呼び出し用インターフェース(+ コネクション取得)部

この部分では、コネクション取得と結果マッピングに向けた準備、及び実クエリ発行・結果マッピング呼び出しを行っています。
結果マッピングに向けた準備部分に関する説明は次セクションで行います。

// 呼び出し用
fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> {
    // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する
    val flux = TransactionSynchronizationManager
        .forCurrentTransaction()
        .flatMapMany { manager ->
            val con = (manager.getResource(this) as ConnectionHolder).connection
            con.query(sql)
        }
        .onErrorResume(NoTransactionException::class.java) {
            // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より
            // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started
            Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close)
        }
    // クエリで指定されている名前と列番号の対応を記録
    val context = sql.select.withIndex().associate { it.value to it.index }

    return flux
        .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } }
        .asFlow()
}

コネクション取得部

TransactionSynchronizationManager...からFlux.usingWhen...までの間の処理には、トランザクションの有効・無効での処理切り替えが入っています。
コネクションリークを防ぐため、トランザクション非開始(= NoTransactionException発生)の場合のみclose処理を入れています。

jOOQトランザクション関係の話は以下に関連記事があります。

qiita.com

結果型

ConnectionFactoryから返ってくる結果はReadableという型に格納されており、ここから結果を読み出すのは、jOOQを使う場合とかなり勝手が違います。
ContextualReadableは、この差を吸収するために定義した型です。

// 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している
class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) {
    // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外
    fun findIndex(field: Field<*>): Int = context[field]
        ?: (context.entries.singleOrNull { it.key.name == field.name }?.value)
        ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません")

    // 最低限よく使いそうな型に対しての変換定義
    @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST")
    fun <T> read(index: Int, clazz: Class<T>): T = when {
        clazz.isEnum -> readable.get(index, String::class.java)?.let { value ->
            (clazz as Class<Enum<*>>).enumConstants.find { it.name == value }
                ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした")
        }
        clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) }
        else -> readable.get(index, clazz)
    } as T

    /**
     * Recordからの読み出しを簡単化するための関数のReadable版
     *
     * jOOQ同様の読み出しができるとは限らないため注意!
     * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと
     */
    inline fun <reified T> read(field: Field<*>): T {
        val index = findIndex(field)
        val clazz = T::class.java

        return read(index, clazz)
    }

    /**
     * fieldと同じ型で読み出してよい場合
     */
    inline operator fun <reified T> get(field: Field<T>): T = read(field)
}

jOOQのクエリと対応付けたReadableからの読み出し部

Readableにはnameで読み出すメソッドとindexで読み出すメソッドが定義されていますが、Readable上のnamejOOQfieldから取れるnameは食い違う場合が有りました。

javadoc.io

そこで、クエリ上で指定されているfieldと指定順の対応をcontextで保持しておき、Readableからはindexで読み出す形としています。
前述のマッピングに向けた準備はここで利用されています。

    // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外
    fun findIndex(field: Field<*>): Int = context[field]
        ?: (context.entries.singleOrNull { it.key.name == field.name }?.value)
        ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません")

実読み出し部

ReadablejOOQと関係無く定義されているため、jOOQJSONB型への変換などは提供されていません。
そこで、よく使う系の型への変換を定義したのが以下です(もしかするとR2DBC側に何らかのコーデック登録ポイントは有るかも?)。

    // 最低限よく使いそうな型に対しての変換定義
    @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST")
    fun <T> read(index: Int, clazz: Class<T>): T = when {
        clazz.isEnum -> readable.get(index, String::class.java)?.let { value ->
            (clazz as Class<Enum<*>>).enumConstants.find { it.name == value }
                ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした")
        }
        clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) }
        else -> readable.get(index, clazz)
    } as T

呼び出し用関数類

呼び出し用関数は、jOOQのインターフェースに近づけたもの(get関数)と、独自定義関数に寄せたもの(read関数)との2通り用意しました。

    /**
     * Recordからの読み出しを簡単化するための関数のReadable版
     *
     * jOOQ同様の読み出しができるとは限らないため注意!
     * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと
     */
    inline fun <reified T> read(field: Field<*>): T {
        val index = findIndex(field)
        val clazz = T::class.java

        return read(index, clazz)
    }

    /**
     * fieldと同じ型で読み出してよい場合
     */
    inline operator fun <reified T> get(field: Field<T>): T = read(field)

read関数に関しては以下をご覧下さい。

wrongwrong163377.hatenablog.com

その他注意点

jOOQでクエリを組み立てる際、単にDSLを使うとDB毎の方言に対応していないクエリが生成されてしまうことがあります。
val QUERY_BUILDER: DSLContext = DSL.using(SQLDialect.POSTGRES, ...のように、元々使っているのと同じ設定のDSLContextを作って使い回すことをお勧めします。