jOOQ
で組み立てたクエリをConnectionFactory
へ直発行する仕組みを作ったのでまとめます。
なお、これは事情が有って作ったものであり、本来一切推奨できないコードであることはご承知おき下さい。
コード全体は以下の通りです。
基本的にはSelect
クエリでの利用を想定しています。
import io.r2dbc.spi.Connection import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Readable import io.r2dbc.spi.Result import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.asFlow import org.jooq.AttachableQueryPart import org.jooq.Field import org.jooq.JSONB import org.jooq.Select import org.jooq.conf.ParamType import org.reactivestreams.Publisher import org.springframework.r2dbc.connection.ConnectionHolder import org.springframework.transaction.NoTransactionException import org.springframework.transaction.reactive.TransactionSynchronizationManager import reactor.core.publisher.Flux // コネクションに対するSQL発行 private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> = createStatement(sql.getSQL(ParamType.INLINED)).execute() // 呼び出し用 fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> { // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する val flux = TransactionSynchronizationManager .forCurrentTransaction() .flatMapMany { manager -> val con = (manager.getResource(this) as ConnectionHolder).connection con.query(sql) } .onErrorResume(NoTransactionException::class.java) { // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close) } // クエリで指定されている名前と列番号の対応を記録 val context = sql.select.withIndex().associate { it.value to it.index } return flux .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } } .asFlow() } // 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) { // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません") // 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T /** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field) }
利用する様子は以下のようになります。
val cfi: ConnectionFactory = ... val query: Select<*> = ... val result: Flow<Dto> = cfi.read(sql) { it -> Dto( it.read(FOO.C1), // カラムの読み出し方1 it[FOO.C2], // カラムの読み出し方2 ) }
以下解説します。
クエリ発行部
諸事情でパラメータ埋め込みされた状態でのクエリ発行が必要だったため、ParamType.INLINED
を指定しています。
運用の仕方によってはクエリログ経由での情報漏洩など有りうるので注意が必要です。
それ以外は通常のクエリ発行と同様になっています。
// コネクションに対するSQL発行 private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> = createStatement(sql.getSQL(ParamType.INLINED)).execute()
呼び出し用インターフェース(+ コネクション取得)部
この部分では、コネクション取得と結果マッピングに向けた準備、及び実クエリ発行・結果マッピング呼び出しを行っています。
結果マッピングに向けた準備部分に関する説明は次セクションで行います。
// 呼び出し用 fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> { // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する val flux = TransactionSynchronizationManager .forCurrentTransaction() .flatMapMany { manager -> val con = (manager.getResource(this) as ConnectionHolder).connection con.query(sql) } .onErrorResume(NoTransactionException::class.java) { // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close) } // クエリで指定されている名前と列番号の対応を記録 val context = sql.select.withIndex().associate { it.value to it.index } return flux .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } } .asFlow() }
コネクション取得部
TransactionSynchronizationManager...
からFlux.usingWhen...
までの間の処理には、トランザクションの有効・無効での処理切り替えが入っています。
コネクションリークを防ぐため、トランザクション非開始(= NoTransactionException
発生)の場合のみclose
処理を入れています。
jOOQ
とトランザクション関係の話は以下に関連記事があります。
結果型
ConnectionFactory
から返ってくる結果はReadable
という型に格納されており、ここから結果を読み出すのは、jOOQ
を使う場合とかなり勝手が違います。
ContextualReadable
は、この差を吸収するために定義した型です。
// 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) { // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません") // 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T /** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field) }
jOOQ
のクエリと対応付けたReadable
からの読み出し部
Readable
にはname
で読み出すメソッドとindex
で読み出すメソッドが定義されていますが、Readable
上のname
とjOOQ
のfield
から取れるname
は食い違う場合が有りました。
そこで、クエリ上で指定されているfield
と指定順の対応をcontext
で保持しておき、Readable
からはindex
で読み出す形としています。
前述のマッピングに向けた準備はここで利用されています。
// 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません")
実読み出し部
Readable
はjOOQ
と関係無く定義されているため、jOOQ
のJSONB
型への変換などは提供されていません。
そこで、よく使う系の型への変換を定義したのが以下です(もしかするとR2DBC
側に何らかのコーデック登録ポイントは有るかも?)。
// 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T
呼び出し用関数類
呼び出し用関数は、jOOQ
のインターフェースに近づけたもの(get
関数)と、独自定義関数に寄せたもの(read
関数)との2通り用意しました。
/** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field)
read
関数に関しては以下をご覧下さい。
wrongwrong163377.hatenablog.com
その他注意点
jOOQ
でクエリを組み立てる際、単にDSL
を使うとDB毎の方言に対応していないクエリが生成されてしまうことがあります。
val QUERY_BUILDER: DSLContext = DSL.using(SQLDialect.POSTGRES, ...
のように、元々使っているのと同じ設定のDSLContext
を作って使い回すことをお勧めします。