【Spring】JDBC接続で、suspend関数に対してTransactionalアノテーションを利用してのロールバックが効くか確認する【Kotlin】

お詫び

過去公開していた記事では、Spring WebFluxで複数リクエストを同時に処理する場合を考慮していませんでした。
JDBC接続 x Spring WebFluxトランザクション管理をすると壊れる可能性が有るため、やらないことをお勧めします。

R2DBC接続でトランザクションが効かない!」という方は下記の記事が参考になるかもしれません。

qiita.com

何故壊れるのか

トランザクション管理にThreadLocalが利用されているためです。

github.com

Spring WebMvcでは、リクエスト毎にスレッドが生成されて利用されるため、ThreadLocalを使っても1リクエスト1スレッドで処理できます。
一方、Spring WebFluxでは、固定数のスレッドで全てのリクエストを処理します。
つまり、別リクエストの影響を受けて壊れる可能性が有るということです。

ThreadLocalの利用に関してはProject ReactorSpring WebFluxの基盤)のサイトでも言及があります。

projectreactor.io


TL;DR

  • 登録関数とそれを呼び出す関数がそれぞれsuspend関数か否かに関わらず、ロールバックは成功するようだった
    • H2Postgres両方でこの結果となった
  • ただし、呼び出し関数がsuspend関数で、coroutineScopeを切って登録関数を呼び出した際に不可解な挙動が生じたため、注意が必要

文脈

jOOQ/R2DBCロールバックできることが確認できたので、この文脈は読み飛ばして下さい。

qiita.com

自分が調査した限り、Spring 2.7.2で、jOOQ 3.16.4R2DBC接続で利用する場合、Transactionalアノテーションを利用してのロールバックは効きません。 この問題は以前のバージョンにも存在しています。

原因に関して調査したものの、明確な解決策を見つけることはできませんでした。 そもそも原因がjOOQなのかすら分かっていませんが、一応jOOQ 3.18に向けて幾つかの作業は進行中のようです。

github.com

jOOQは使いたい、でもTransacionalを使うならR2DBC接続にはできない……ということで、JDBC接続のjOOQsuspend関数から使った場合にどこまでトランザクションが効くか確認します。 この検証についてjOOQは多分あまり関係ないですが、一応自分の用途に合わせてjOOQを嚙ませています。

検証内容

検証に用いたプロジェクトは下記リポジトリに置いてあります。

github.com

検証内容としては、登録関数とそれを呼び出す関数がそれぞれsuspend関数か否かでTransactionalの挙動がどう変化するかを確認します。
確認方法は以下の通りです。

  1. テーブルに適当な値を登録し、登録が成功したことを確認する
  2. RuntimeExceptionthrowする(比較用にthrowしないパターンも用意する)
  3. レコードを取得してみて、ロールバックされているかを確認する

まず、登録関数と呼び出し関数のコードはそれぞれ以下の通りです。

// 登録関数
import org.jooq.DSLContext
import org.jooq.generated.tables.references.FOO_TABLE
import org.springframework.stereotype.Repository

@Repository
class Repository(private val create: DSLContext) {
    fun findAll() = create.selectFrom(FOO_TABLE).fetch()

    fun nonSuspendSave(value: String) = create.insertInto(FOO_TABLE).values(value).returning().fetchAnyInto(FOO_TABLE)

    suspend fun suspendSave(value: String) =
        create.insertInto(FOO_TABLE).values(value).returning().fetchAnyInto(FOO_TABLE)
}
// 呼び出し関数
import kotlinx.coroutines.runBlocking
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

object TestEx : RuntimeException("fail for test")

@Suppress("FunctionName")
@Component
@Transactional
class Caller(private val repository: Repository) {
    private fun getFuncName() = Throwable().stackTrace.let { it[1].methodName }

    // region 呼び出し元が非suspend
    fun nonSuspend_nonSuspend() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.nonSuspendSave(funcName)}")
    }

    fun nonSuspend_nonSuspend_fail() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.nonSuspendSave(funcName)}")
        throw TestEx
    }

    fun nonSuspend_suspend() {
        val funcName = getFuncName()
        println("$funcName on save\n${runBlocking { repository.suspendSave(funcName) }}")
    }

    fun nonSuspend_suspend_fail() {
        val funcName = getFuncName()
        println("$funcName on save\n${runBlocking { repository.suspendSave(funcName) }}")
        throw TestEx
    }
    // endregion

    // region 呼び出し元がsuspend
    suspend fun suspend_nonSuspend() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.nonSuspendSave(funcName)}")
    }

    suspend fun suspend_nonSuspend_fail() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.nonSuspendSave(funcName)}")
        throw TestEx
    }

    suspend fun suspend_suspend() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.suspendSave(funcName)}")
    }

    suspend fun suspend_suspend_fail() {
        val funcName = getFuncName()
        println("$funcName on save\n${repository.suspendSave(funcName)}")
        throw TestEx
    }
    // endregion
}

データベースはH2を使っています。

検証結果

以下のテストから呼び出して確認を行いました。

import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest

@SpringBootTest
private class CallerTest @Autowired constructor(
    val caller: Caller,
    val repository: Repository
) {
    private fun <T> exec(thrower: () -> T?): Any? = try { thrower() } catch (e: TestEx) { e }

    @BeforeEach
    fun callSaves() {
        exec { caller.nonSuspend_nonSuspend() }
        exec { caller.nonSuspend_nonSuspend_fail() }
        exec { caller.nonSuspend_suspend() }
        exec { caller.nonSuspend_suspend_fail() }

        exec { runBlocking { caller.suspend_nonSuspend() } }
        exec { runBlocking { caller.suspend_nonSuspend_fail() } }
        exec { runBlocking { caller.suspend_suspend() } }
        exec { runBlocking { caller.suspend_suspend_fail() } }
    }

    @Test
    fun print() {
        println("result:\n${repository.findAll()}")
    }
}

結果は以下のようになりました。
fail(= throwする)パターンは全てロールバックされていることが分かります。

result:
+------------------------------+
|TEXT_VALUE                    |
+------------------------------+
|nonSuspend_nonSuspend         |
|nonSuspend_suspend            |
|suspend_nonSuspend$suspendImpl|
|suspend_suspend$suspendImpl   |
+------------------------------+

懸念点が無いでもありませんが、一応普通にやっている間はロールバックされること確認できました。
また、コードは載せられませんが、PostgreSQLでやった場合にも同様の結果となりました。

補足: 呼び出し関数がsuspend関数で、coroutineScopeを切って登録関数を呼び出した際の不可解な挙動について

登録関数を以下のように変更した所、2件登録したレコードがどちらもロールバックされないという結果になりました。

suspend fun suspend_suspend_fail() {
    val funcName = getFuncName()
    println("$funcName on save\n${repository.suspendSave(funcName)}")
    coroutineScope {
        launch { println("$funcName on save2\n${repository.suspendSave(funcName + "2")}") }
    }.join()

    throw TestEx
}

coroutineScoperunBlockingで動かすなどすれば2件ともロールバックされているようでした(= 呼び出し関数が非suspend関数の場合にはrunBlockingせざるを得ないため、この現象は発生しない)。

coroutineScopeの内側はともかく、外側までロールバックされなくなるというのは少し直感に反する挙動に感じましたが、coroutineScopeを切って操作するというのがトランザクション処理と相性が悪そうなのはそうなので、やらないよう気を付ける必要が有るかなと思っています。