お詫び
過去公開していた記事では、Spring WebFlux
で複数リクエストを同時に処理する場合を考慮していませんでした。
JDBC
接続 x Spring WebFlux
でトランザクション管理をすると壊れる可能性が有るため、やらないことをお勧めします。
「R2DBC
接続でトランザクションが効かない!」という方は下記の記事が参考になるかもしれません。
何故壊れるのか
トランザクション管理にThreadLocal
が利用されているためです。
Spring WebMvc
では、リクエスト毎にスレッドが生成されて利用されるため、ThreadLocal
を使っても1リクエスト1スレッドで処理できます。
一方、Spring WebFlux
では、固定数のスレッドで全てのリクエストを処理します。
つまり、別リクエストの影響を受けて壊れる可能性が有るということです。
ThreadLocal
の利用に関してはProject Reactor
(Spring WebFlux
の基盤)のサイトでも言及があります。
TL;DR
- 登録関数とそれを呼び出す関数がそれぞれ
suspend
関数か否かに関わらず、ロールバックは成功するようだったH2
とPostgres
両方でこの結果となった
- ただし、呼び出し関数が
suspend
関数で、coroutineScope
を切って登録関数を呼び出した際に不可解な挙動が生じたため、注意が必要
文脈
jOOQ
/R2DBC
でロールバックできることが確認できたので、この文脈は読み飛ばして下さい。
自分が調査した限り、
Spring 2.7.2
で、jOOQ 3.16.4
をR2DBC
接続で利用する場合、Transactional
アノテーションを利用してのロールバックは効きません。 この問題は以前のバージョンにも存在しています。
原因に関して調査したものの、明確な解決策を見つけることはできませんでした。
そもそも原因がjOOQ
なのかすら分かっていませんが、一応jOOQ 3.18
に向けて幾つかの作業は進行中のようです。
jOOQ
は使いたい、でもTransacional
を使うならR2DBC
接続にはできない……ということで、JDBC
接続のjOOQ
をsuspend
関数から使った場合にどこまでトランザクションが効くか確認します。 この検証についてjOOQ
は多分あまり関係ないですが、一応自分の用途に合わせてjOOQ
を嚙ませています。
検証内容
検証に用いたプロジェクトは下記リポジトリに置いてあります。
検証内容としては、登録関数とそれを呼び出す関数がそれぞれsuspend
関数か否かでTransactional
の挙動がどう変化するかを確認します。
確認方法は以下の通りです。
- テーブルに適当な値を登録し、登録が成功したことを確認する
RuntimeException
をthrow
する(比較用にthrow
しないパターンも用意する)- レコードを取得してみて、ロールバックされているかを確認する
まず、登録関数と呼び出し関数のコードはそれぞれ以下の通りです。
// 登録関数 import org.jooq.DSLContext import org.jooq.generated.tables.references.FOO_TABLE import org.springframework.stereotype.Repository @Repository class Repository(private val create: DSLContext) { fun findAll() = create.selectFrom(FOO_TABLE).fetch() fun nonSuspendSave(value: String) = create.insertInto(FOO_TABLE).values(value).returning().fetchAnyInto(FOO_TABLE) suspend fun suspendSave(value: String) = create.insertInto(FOO_TABLE).values(value).returning().fetchAnyInto(FOO_TABLE) }
// 呼び出し関数 import kotlinx.coroutines.runBlocking import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional object TestEx : RuntimeException("fail for test") @Suppress("FunctionName") @Component @Transactional class Caller(private val repository: Repository) { private fun getFuncName() = Throwable().stackTrace.let { it[1].methodName } // region 呼び出し元が非suspend fun nonSuspend_nonSuspend() { val funcName = getFuncName() println("$funcName on save\n${repository.nonSuspendSave(funcName)}") } fun nonSuspend_nonSuspend_fail() { val funcName = getFuncName() println("$funcName on save\n${repository.nonSuspendSave(funcName)}") throw TestEx } fun nonSuspend_suspend() { val funcName = getFuncName() println("$funcName on save\n${runBlocking { repository.suspendSave(funcName) }}") } fun nonSuspend_suspend_fail() { val funcName = getFuncName() println("$funcName on save\n${runBlocking { repository.suspendSave(funcName) }}") throw TestEx } // endregion // region 呼び出し元がsuspend suspend fun suspend_nonSuspend() { val funcName = getFuncName() println("$funcName on save\n${repository.nonSuspendSave(funcName)}") } suspend fun suspend_nonSuspend_fail() { val funcName = getFuncName() println("$funcName on save\n${repository.nonSuspendSave(funcName)}") throw TestEx } suspend fun suspend_suspend() { val funcName = getFuncName() println("$funcName on save\n${repository.suspendSave(funcName)}") } suspend fun suspend_suspend_fail() { val funcName = getFuncName() println("$funcName on save\n${repository.suspendSave(funcName)}") throw TestEx } // endregion }
データベースはH2
を使っています。
検証結果
以下のテストから呼び出して確認を行いました。
import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest @SpringBootTest private class CallerTest @Autowired constructor( val caller: Caller, val repository: Repository ) { private fun <T> exec(thrower: () -> T?): Any? = try { thrower() } catch (e: TestEx) { e } @BeforeEach fun callSaves() { exec { caller.nonSuspend_nonSuspend() } exec { caller.nonSuspend_nonSuspend_fail() } exec { caller.nonSuspend_suspend() } exec { caller.nonSuspend_suspend_fail() } exec { runBlocking { caller.suspend_nonSuspend() } } exec { runBlocking { caller.suspend_nonSuspend_fail() } } exec { runBlocking { caller.suspend_suspend() } } exec { runBlocking { caller.suspend_suspend_fail() } } } @Test fun print() { println("result:\n${repository.findAll()}") } }
結果は以下のようになりました。
fail
(= throw
する)パターンは全てロールバックされていることが分かります。
result: +------------------------------+ |TEXT_VALUE | +------------------------------+ |nonSuspend_nonSuspend | |nonSuspend_suspend | |suspend_nonSuspend$suspendImpl| |suspend_suspend$suspendImpl | +------------------------------+
懸念点が無いでもありませんが、一応普通にやっている間はロールバックされること確認できました。
また、コードは載せられませんが、PostgreSQL
でやった場合にも同様の結果となりました。
補足: 呼び出し関数がsuspend
関数で、coroutineScope
を切って登録関数を呼び出した際の不可解な挙動について
登録関数を以下のように変更した所、2件登録したレコードがどちらもロールバックされないという結果になりました。
suspend fun suspend_suspend_fail() { val funcName = getFuncName() println("$funcName on save\n${repository.suspendSave(funcName)}") coroutineScope { launch { println("$funcName on save2\n${repository.suspendSave(funcName + "2")}") } }.join() throw TestEx }
coroutineScope
をrunBlocking
で動かすなどすれば2件ともロールバックされているようでした(= 呼び出し関数が非suspend
関数の場合にはrunBlocking
せざるを得ないため、この現象は発生しない)。
coroutineScope
の内側はともかく、外側までロールバックされなくなるというのは少し直感に反する挙動に感じましたが、coroutineScope
を切って操作するというのがトランザクション処理と相性が悪そうなのはそうなので、やらないよう気を付ける必要が有るかなと思っています。