From 056b5a37c66067b4b707dd04ba79ef0a9d500b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:35:22 +0100 Subject: [PATCH 01/15] Add testcontainers-kafka to version catalog --- gradle/libs.versions.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7ec0898..ff5bd36 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -29,6 +29,7 @@ jacksonDatatypeJsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datat liquibaseCore = { module = "org.liquibase:liquibase-core", version.ref = "liquibase" } testcontainersPostgresql = { module = "org.testcontainers:postgresql", version.ref = "testcontainers" } testcontainersMysql = { module = "org.testcontainers:mysql", version.ref = "testcontainers" } +testcontainersKafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers" } postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" } mysql = { module = "com.mysql:mysql-connector-j", version.ref = "mysql" } kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" } From 880372c95ee3d632e2624739e8f910f56ba908fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:35:27 +0100 Subject: [PATCH 02/15] Add okapi-integration-tests module skeleton --- okapi-integration-tests/build.gradle.kts | 46 ++++++++++++++++++++++++ settings.gradle.kts | 1 + 2 files changed, 47 insertions(+) create mode 100644 okapi-integration-tests/build.gradle.kts diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts new file mode 100644 index 0000000..c01b778 --- /dev/null +++ b/okapi-integration-tests/build.gradle.kts @@ -0,0 +1,46 @@ +plugins { + id("buildsrc.convention.kotlin-jvm") +} + +dependencies { + // Okapi modules under test + testImplementation(project(":okapi-core")) + testImplementation(project(":okapi-postgres")) + testImplementation(project(":okapi-mysql")) + testImplementation(project(":okapi-kafka")) + testImplementation(project(":okapi-http")) + testImplementation(project(":okapi-spring-boot")) + + // Test framework + testImplementation(libs.kotestRunnerJunit5) + testImplementation(libs.kotestAssertionsCore) + + // Testcontainers + testImplementation(libs.testcontainersPostgresql) + testImplementation(libs.testcontainersMysql) + testImplementation(libs.testcontainersKafka) + + // DB drivers + testImplementation(libs.postgresql) + testImplementation(libs.mysql) + + // Exposed ORM (for transaction blocks and DB queries in tests) + testImplementation(libs.exposedCore) + testImplementation(libs.exposedJdbc) + testImplementation(libs.exposedJson) + testImplementation(libs.exposedJavaTime) + + // Liquibase (schema migrations in tests) + testImplementation(libs.liquibaseCore) + + // Kafka clients (consumer verification in tests) + testImplementation(libs.kafkaClients) + + // WireMock (HTTP E2E tests) + testImplementation(libs.wiremock) + + // Spring (for E2E tests that may need Spring context) + testImplementation(libs.springContext) + testImplementation(libs.springTx) + testImplementation(libs.springBootAutoconfigure) +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 50ffe11..1e1b7d3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -16,5 +16,6 @@ include("okapi-http") include("okapi-kafka") include("okapi-spring-boot") include("okapi-bom") +include("okapi-integration-tests") rootProject.name = "okapi" From 25e5556d6eccda82818793260cd9f65e5ac30aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:38:16 +0100 Subject: [PATCH 03/15] Add test support helpers (Postgres, MySQL, Kafka, RecordingMessageDeliverer) --- .../okapi/test/support/KafkaTestSupport.kt | 39 ++++++++++++++++ .../okapi/test/support/MysqlTestSupport.kt | 40 +++++++++++++++++ .../okapi/test/support/PostgresTestSupport.kt | 40 +++++++++++++++++ .../test/support/RecordingMessageDeliverer.kt | 44 +++++++++++++++++++ 4 files changed, 163 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/RecordingMessageDeliverer.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt new file mode 100644 index 0000000..5339876 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt @@ -0,0 +1,39 @@ +package com.softwaremill.okapi.test.support + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.testcontainers.kafka.KafkaContainer + +class KafkaTestSupport { + val container = KafkaContainer("apache/kafka:3.9.0") + + fun start() { + container.start() + } + + fun stop() { + container.stop() + } + + fun createProducer(): KafkaProducer = KafkaProducer( + mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to container.bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name, + ), + ) + + fun createConsumer(groupId: String = "test-group"): KafkaConsumer = KafkaConsumer( + mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to container.bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to groupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ), + ) +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt new file mode 100644 index 0000000..70944d8 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt @@ -0,0 +1,40 @@ +package com.softwaremill.okapi.test.support + +import liquibase.Liquibase +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection +import liquibase.resource.ClassLoaderResourceAccessor +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.testcontainers.containers.MySQLContainer +import java.sql.DriverManager + +class MysqlTestSupport { + val container = MySQLContainer("mysql:8.0") + + fun start() { + container.start() + Database.connect( + url = container.jdbcUrl, + driver = container.driverClassName, + user = container.username, + password = container.password, + ) + runLiquibase() + } + + fun stop() { + container.stop() + } + + fun truncate() { + transaction { exec("DELETE FROM outbox") } + } + + private fun runLiquibase() { + val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password) + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) + Liquibase("com/softwaremill/okapi/db/mysql/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } + connection.close() + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt new file mode 100644 index 0000000..3a54934 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt @@ -0,0 +1,40 @@ +package com.softwaremill.okapi.test.support + +import liquibase.Liquibase +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection +import liquibase.resource.ClassLoaderResourceAccessor +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.testcontainers.containers.PostgreSQLContainer +import java.sql.DriverManager + +class PostgresTestSupport { + val container = PostgreSQLContainer("postgres:16") + + fun start() { + container.start() + Database.connect( + url = container.jdbcUrl, + driver = container.driverClassName, + user = container.username, + password = container.password, + ) + runLiquibase() + } + + fun stop() { + container.stop() + } + + fun truncate() { + transaction { exec("TRUNCATE TABLE outbox") } + } + + private fun runLiquibase() { + val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password) + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) + Liquibase("com/softwaremill/okapi/db/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } + connection.close() + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/RecordingMessageDeliverer.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/RecordingMessageDeliverer.kt new file mode 100644 index 0000000..57ac50e --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/RecordingMessageDeliverer.kt @@ -0,0 +1,44 @@ +package com.softwaremill.okapi.test.support + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxId +import java.time.Instant +import java.util.Collections +import java.util.concurrent.ConcurrentHashMap + +data class DeliveryRecord( + val entry: OutboxEntry, + val threadName: String, + val timestamp: Instant, +) + +class RecordingMessageDeliverer( + private val resultProvider: (OutboxEntry) -> DeliveryResult = { DeliveryResult.Success }, +) : MessageDeliverer { + override val type: String = "recording" + + private val _deliveries = ConcurrentHashMap>() + + override fun deliver(entry: OutboxEntry): DeliveryResult { + _deliveries.computeIfAbsent(entry.outboxId) { + Collections.synchronizedList(mutableListOf()) + }.add(DeliveryRecord(entry, Thread.currentThread().name, Instant.now())) + return resultProvider(entry) + } + + val deliveries: Map> get() = _deliveries.toMap() + + fun deliveryCount(): Int = _deliveries.size + + fun assertNoAmplification() { + val amplified = _deliveries.filter { (_, records) -> records.size > 1 } + check(amplified.isEmpty()) { + val details = amplified.entries.joinToString("\n") { (id, records) -> + " $id delivered ${records.size} times by: ${records.map { it.threadName }}" + } + "Delivery amplification detected:\n$details" + } + } +} From bbed894371f6d712c20d195df9f7f434a1ddcd59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:43:53 +0100 Subject: [PATCH 04/15] Add OutboxStore contract tests for Postgres --- .../test/store/OutboxStoreContractTests.kt | 234 ++++++++++++++++++ .../test/store/PostgresOutboxStoreTest.kt | 20 ++ 2 files changed, 254 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt new file mode 100644 index 0000000..def3963 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt @@ -0,0 +1,234 @@ +package com.softwaremill.okapi.test.store + +import com.softwaremill.okapi.core.DeliveryInfo +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.maps.shouldContain +import io.kotest.matchers.shouldBe +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Instant + +private class StubDeliveryInfo( + override val type: String = "test", + private val metadata: String = """{"type":"test"}""", +) : DeliveryInfo { + override fun serialize(): String = metadata +} + +private fun createTestEntry( + now: Instant = Instant.parse("2024-01-01T00:00:00Z"), + messageType: String = "order.created", + payload: String = """{"orderId":"123"}""", + deliveryInfo: DeliveryInfo = StubDeliveryInfo(), +): OutboxEntry = OutboxEntry.createPending( + message = OutboxMessage(messageType = messageType, payload = payload), + deliveryInfo = deliveryInfo, + now = now, +) + +fun FunSpec.outboxStoreContractTests( + dbName: String, + storeFactory: () -> OutboxStore, + startDb: () -> Unit, + stopDb: () -> Unit, + truncate: () -> Unit, +) { + lateinit var store: OutboxStore + + beforeSpec { + startDb() + store = storeFactory() + } + + afterSpec { + stopDb() + } + + beforeEach { + truncate() + } + + test("[$dbName] persist and read back via claimPending") { + val entry = createTestEntry() + + transaction { store.persist(entry) } + + val claimed = transaction { store.claimPending(10) } + + claimed shouldHaveSize 1 + val found = claimed.first() + found.outboxId shouldBe entry.outboxId + found.messageType shouldBe entry.messageType + found.payload shouldBe entry.payload + found.status shouldBe OutboxStatus.PENDING + found.retries shouldBe 0 + found.deliveryType shouldBe "test" + found.deliveryMetadata.replace(" ", "") shouldBe """{"type":"test"}""" + } + + test("[$dbName] claimPending returns entries ordered by created_at ASC") { + val t1 = Instant.parse("2024-01-01T00:00:00Z") + val t2 = Instant.parse("2024-01-02T00:00:00Z") + val t3 = Instant.parse("2024-01-03T00:00:00Z") + + // Insert in non-sequential order to verify ordering + val e2 = createTestEntry(now = t2, messageType = "type.second") + val e3 = createTestEntry(now = t3, messageType = "type.third") + val e1 = createTestEntry(now = t1, messageType = "type.first") + + transaction { + store.persist(e2) + store.persist(e3) + store.persist(e1) + } + + val claimed = transaction { store.claimPending(10) } + + claimed shouldHaveSize 3 + claimed[0].messageType shouldBe "type.first" + claimed[1].messageType shouldBe "type.second" + claimed[2].messageType shouldBe "type.third" + } + + test("[$dbName] claimPending respects limit") { + transaction { + repeat(5) { i -> + val entry = createTestEntry( + now = Instant.parse("2024-01-01T00:00:00Z").plusSeconds(i.toLong()), + messageType = "type.$i", + ) + store.persist(entry) + } + } + + val claimed = transaction { store.claimPending(2) } + + claimed shouldHaveSize 2 + } + + test("[$dbName] claimPending ignores non-PENDING entries") { + val pendingEntry = createTestEntry( + now = Instant.parse("2024-01-01T00:00:00Z"), + messageType = "type.pending", + ) + val toBeDelivered = createTestEntry( + now = Instant.parse("2024-01-01T01:00:00Z"), + messageType = "type.delivered", + ) + + transaction { + store.persist(pendingEntry) + store.persist(toBeDelivered) + } + + // Claim the second entry and mark it delivered + transaction { + val claimed = store.claimPending(10) + val deliveredCandidate = claimed.first { it.outboxId == toBeDelivered.outboxId } + store.updateAfterProcessing(deliveredCandidate.toDelivered(Instant.parse("2024-01-02T00:00:00Z"))) + } + + val claimed = transaction { store.claimPending(10) } + + claimed shouldHaveSize 1 + claimed.first().messageType shouldBe "type.pending" + } + + test("[$dbName] updateAfterProcessing persists status change") { + val entry = createTestEntry() + + transaction { store.persist(entry) } + + transaction { + val claimed = store.claimPending(10) + val delivered = claimed.first().toDelivered(Instant.parse("2024-01-02T00:00:00Z")) + store.updateAfterProcessing(delivered) + } + + val counts = transaction { store.countByStatuses() } + + counts shouldContain (OutboxStatus.DELIVERED to 1L) + counts shouldContain (OutboxStatus.PENDING to 0L) + } + + test("[$dbName] removeDeliveredBefore deletes old delivered entries") { + val oldEntry = createTestEntry( + now = Instant.parse("2024-01-01T00:00:00Z"), + messageType = "type.old", + ) + val recentEntry = createTestEntry( + now = Instant.parse("2024-01-10T00:00:00Z"), + messageType = "type.recent", + ) + + transaction { + store.persist(oldEntry) + store.persist(recentEntry) + } + + // Mark both as delivered with different lastAttempt timestamps + transaction { + val claimed = store.claimPending(10) + val old = claimed.first { it.outboxId == oldEntry.outboxId } + val recent = claimed.first { it.outboxId == recentEntry.outboxId } + store.updateAfterProcessing(old.toDelivered(Instant.parse("2024-01-02T00:00:00Z"))) + store.updateAfterProcessing(recent.toDelivered(Instant.parse("2024-01-11T00:00:00Z"))) + } + + // Remove delivered before Jan 5 — should delete old (lastAttempt=Jan 2) but keep recent (lastAttempt=Jan 11) + transaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z")) } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.DELIVERED to 1L) + } + + test("[$dbName] countByStatuses returns correct counts") { + val pending1 = createTestEntry( + now = Instant.parse("2024-01-01T00:00:00Z"), + messageType = "type.pending1", + ) + val pending2 = createTestEntry( + now = Instant.parse("2024-01-01T01:00:00Z"), + messageType = "type.pending2", + ) + val toDeliver = createTestEntry( + now = Instant.parse("2024-01-01T02:00:00Z"), + messageType = "type.delivered", + ) + val toFail = createTestEntry( + now = Instant.parse("2024-01-01T03:00:00Z"), + messageType = "type.failed", + ) + + transaction { + store.persist(pending1) + store.persist(pending2) + store.persist(toDeliver) + store.persist(toFail) + } + + transaction { + val claimed = store.claimPending(10) + val deliverEntry = claimed.first { it.outboxId == toDeliver.outboxId } + val failEntry = claimed.first { it.outboxId == toFail.outboxId } + store.updateAfterProcessing(deliverEntry.toDelivered(Instant.parse("2024-01-02T00:00:00Z"))) + store.updateAfterProcessing(failEntry.toFailed(Instant.parse("2024-01-02T00:00:00Z"), "some error")) + } + + val counts = transaction { store.countByStatuses() } + + counts shouldContain (OutboxStatus.PENDING to 2L) + counts shouldContain (OutboxStatus.DELIVERED to 1L) + counts shouldContain (OutboxStatus.FAILED to 1L) + } + + test("[$dbName] claimPending returns empty when no PENDING entries") { + val claimed = transaction { store.claimPending(10) } + + claimed shouldHaveSize 0 + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt new file mode 100644 index 0000000..1c77bf1 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.test.store + +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.test.support.PostgresTestSupport +import io.kotest.core.spec.style.FunSpec +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset + +class PostgresOutboxStoreTest : FunSpec({ + val db = PostgresTestSupport() + + outboxStoreContractTests( + dbName = "postgres", + storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + startDb = { db.start() }, + stopDb = { db.stop() }, + truncate = { db.truncate() }, + ) +}) From 746aff5f7206b3a9edc43c112fdfd0f192d59672 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:43:57 +0100 Subject: [PATCH 05/15] Add OutboxStore contract tests for MySQL --- .../okapi/test/store/MysqlOutboxStoreTest.kt | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt new file mode 100644 index 0000000..c2d4c51 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.test.store + +import com.softwaremill.okapi.mysql.MysqlOutboxStore +import com.softwaremill.okapi.test.support.MysqlTestSupport +import io.kotest.core.spec.style.FunSpec +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset + +class MysqlOutboxStoreTest : FunSpec({ + val db = MysqlTestSupport() + + outboxStoreContractTests( + dbName = "mysql", + storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + startDb = { db.start() }, + stopDb = { db.stop() }, + truncate = { db.truncate() }, + ) +}) From 498dd70baa346644fbce07aa0ea57099022e0c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 12:14:36 +0100 Subject: [PATCH 06/15] Add concurrency tests (SKIP LOCKED) for Postgres Shared ConcurrentClaimTests extension function with two tests: - Deterministic: verifies disjoint claim sets when one processor holds locks while another claims concurrently - Realistic: 5 virtual threads process 50 entries with no delivery amplification Also adds (status, created_at) index migration for Postgres to improve claimPending query performance. --- .../test/concurrency/ConcurrentClaimTests.kt | 156 ++++++++++++++++++ .../PostgresConcurrentClaimTest.kt | 20 +++ .../okapi/db/004__add_claim_index.sql | 4 + .../com/softwaremill/okapi/db/changelog.xml | 1 + 4 files changed, 181 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt create mode 100644 okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt new file mode 100644 index 0000000..3500121 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt @@ -0,0 +1,156 @@ +package com.softwaremill.okapi.test.concurrency + +import com.softwaremill.okapi.core.DeliveryInfo +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxId +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.test.support.RecordingMessageDeliverer +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.maps.shouldContain +import io.kotest.matchers.shouldBe +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.sql.Connection +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +private class StubDeliveryInfo( + override val type: String = "recording", + private val metadata: String = """{"type":"recording"}""", +) : DeliveryInfo { + override fun serialize(): String = metadata +} + +private fun createTestEntry(index: Int, now: Instant = Instant.parse("2024-01-01T00:00:00Z")): OutboxEntry = OutboxEntry.createPending( + message = OutboxMessage(messageType = "concurrent.test", payload = """{"index":$index}"""), + deliveryInfo = StubDeliveryInfo(), + now = now.plusSeconds(index.toLong()), +) + +fun FunSpec.concurrentClaimTests( + dbName: String, + storeFactory: () -> OutboxStore, + startDb: () -> Unit, + stopDb: () -> Unit, + truncate: () -> Unit, +) { + lateinit var store: OutboxStore + + beforeSpec { + startDb() + store = storeFactory() + } + + afterSpec { + stopDb() + } + + beforeEach { + truncate() + } + + test("[$dbName] concurrent claimPending with held locks produces disjoint sets") { + // Insert 20 entries + val allIds = transaction { + (0 until 20).map { i -> + val entry = createTestEntry(i) + store.persist(entry) + entry.outboxId + } + } + + val lockAcquired = CountDownLatch(1) + val canCommit = CountDownLatch(1) + val claimedByA = CompletableFuture>() + + // Thread A: claim entries and hold the transaction open. + // READ_COMMITTED is required for MySQL — under REPEATABLE_READ, InnoDB's + // next-key locks cause SKIP LOCKED to skip more rows than actually locked. + val threadA = Thread.ofVirtual().name("processor-A").start { + try { + transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + val claimed = store.claimPending(10) + claimedByA.complete(claimed.map { it.outboxId }) + lockAcquired.countDown() + // Hold locks open until main thread signals + canCommit.await(10, TimeUnit.SECONDS) + } + } catch (e: Exception) { + claimedByA.completeExceptionally(e) + } + } + + // Wait for Thread A to acquire locks + lockAcquired.await(10, TimeUnit.SECONDS) shouldBe true + + // Main thread: claim remaining entries (SKIP LOCKED should skip A's locked rows) + val idsA = claimedByA.get(10, TimeUnit.SECONDS) + val idsB = transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + store.claimPending(10) + }.map { it.outboxId } + + // Assert disjoint + val intersection = idsA.toSet().intersect(idsB.toSet()) + intersection shouldHaveSize 0 + + // Together they cover all 20 entries + val union = (idsA + idsB).toSet() + union shouldHaveSize 20 + union shouldBe allIds.toSet() + + // Let Thread A commit and finish + canCommit.countDown() + threadA.join(10_000) + } + + test("[$dbName] concurrent processors cause no delivery amplification") { + val fixedClock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC) + + // Insert 50 entries + transaction { + (0 until 50).forEach { i -> store.persist(createTestEntry(i)) } + } + + val recorder = RecordingMessageDeliverer() + val entryProcessor = OutboxEntryProcessor(recorder, RetryPolicy(maxRetries = 0), fixedClock) + + val barrier = CyclicBarrier(5) + val executor = Executors.newVirtualThreadPerTaskExecutor() + + val futures = (1..5).map { + CompletableFuture.supplyAsync( + { + barrier.await(10, TimeUnit.SECONDS) + transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + OutboxProcessor(store, entryProcessor).processNext(limit = 50) + } + }, + executor, + ) + } + + // Wait for all threads to complete + CompletableFuture.allOf(*futures.toTypedArray()).get(30, TimeUnit.SECONDS) + executor.shutdown() + + // Verify no amplification + recorder.assertNoAmplification() + recorder.deliveryCount() shouldBe 50 + + // Verify DB state + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.DELIVERED to 50L) + counts shouldContain (OutboxStatus.PENDING to 0L) + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt new file mode 100644 index 0000000..b4b1c96 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.test.concurrency + +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.test.support.PostgresTestSupport +import io.kotest.core.spec.style.FunSpec +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset + +class PostgresConcurrentClaimTest : FunSpec({ + val db = PostgresTestSupport() + + concurrentClaimTests( + dbName = "postgres", + storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + startDb = { db.start() }, + stopDb = { db.stop() }, + truncate = { db.truncate() }, + ) +}) diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql new file mode 100644 index 0000000..9925c9a --- /dev/null +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:003 + +CREATE INDEX IF NOT EXISTS idx_outbox_status_created_at ON outbox (status, created_at); diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml index d0497e3..4ccc4e9 100644 --- a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml @@ -6,4 +6,5 @@ + From 3706b82ca78234a0809bac1522d0d4806065b4d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 12:14:48 +0100 Subject: [PATCH 07/15] Add concurrency tests (SKIP LOCKED) for MySQL Adds MysqlConcurrentClaimTest reusing the shared test suite. Fixes MySQL claimPending to use FORCE INDEX on (status, created_at) so that FOR UPDATE SKIP LOCKED only row-locks the rows returned by LIMIT. Without the index hint, InnoDB locks all rows matching the WHERE clause during a full table scan, preventing concurrent processors from claiming disjoint sets. Changes: - Add (status, created_at) index to OutboxTable and Liquibase migration - Add FORCE INDEX hint to MysqlOutboxStore.claimPending() --- .../concurrency/MysqlConcurrentClaimTest.kt | 20 +++++++++++++++++++ .../okapi/mysql/MysqlOutboxStore.kt | 4 ++++ .../softwaremill/okapi/mysql/OutboxTable.kt | 4 ++++ .../okapi/db/mysql/003__add_claim_index.sql | 4 ++++ .../softwaremill/okapi/db/mysql/changelog.xml | 1 + 5 files changed, 33 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt create mode 100644 okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt new file mode 100644 index 0000000..7ac7287 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.test.concurrency + +import com.softwaremill.okapi.mysql.MysqlOutboxStore +import com.softwaremill.okapi.test.support.MysqlTestSupport +import io.kotest.core.spec.style.FunSpec +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset + +class MysqlConcurrentClaimTest : FunSpec({ + val db = MysqlTestSupport() + + concurrentClaimTests( + dbName = "mysql", + storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + startDb = { db.start() }, + stopDb = { db.stop() }, + truncate = { db.truncate() }, + ) +}) diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 85cef79..424f938 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -39,8 +39,12 @@ class MysqlOutboxStore( } override fun claimPending(limit: Int): List { + // FORCE INDEX ensures InnoDB walks the (status, created_at) index so + // that FOR UPDATE SKIP LOCKED only row-locks the rows actually returned + // by LIMIT, rather than every row matching the WHERE clause. val nativeQuery = "SELECT * FROM ${OutboxTable.tableName}" + + " FORCE INDEX (idx_outbox_status_created_at)" + " WHERE ${OutboxTable.status.name} = '${OutboxStatus.PENDING}'" + " ORDER BY ${OutboxTable.createdAt.name} ASC" + " LIMIT $limit FOR UPDATE SKIP LOCKED" diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt index 4aacee7..a525706 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt @@ -24,4 +24,8 @@ internal object OutboxTable : Table("outbox") { val deliveryMetadata = json("delivery_metadata", { it }, { it }) override val primaryKey = PrimaryKey(id) + + init { + index("idx_outbox_status_created_at", isUnique = false, status, createdAt) + } } diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql new file mode 100644 index 0000000..53e6856 --- /dev/null +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:002 + +CREATE INDEX idx_outbox_status_created_at ON outbox (status, created_at); diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml index fe5d7b0..126d56b 100644 --- a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml @@ -5,4 +5,5 @@ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + From ebde3147f71ebb4d1686aed8062b5bb149130c77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 12:56:17 +0100 Subject: [PATCH 08/15] Add Kafka transport integration tests with Testcontainers --- okapi-integration-tests/build.gradle.kts | 3 + .../okapi/test/support/KafkaTestSupport.kt | 2 +- .../KafkaTransportIntegrationTest.kt | 124 ++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index c01b778..115de1a 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -36,6 +36,9 @@ dependencies { // Kafka clients (consumer verification in tests) testImplementation(libs.kafkaClients) + // SLF4J for Testcontainers logging + testRuntimeOnly("org.slf4j:slf4j-simple:2.0.13") + // WireMock (HTTP E2E tests) testImplementation(libs.wiremock) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt index 5339876..209d555 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/KafkaTestSupport.kt @@ -9,7 +9,7 @@ import org.apache.kafka.common.serialization.StringSerializer import org.testcontainers.kafka.KafkaContainer class KafkaTestSupport { - val container = KafkaContainer("apache/kafka:3.9.0") + val container = KafkaContainer("apache/kafka:3.8.1") fun start() { container.start() diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt new file mode 100644 index 0000000..d591cf9 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt @@ -0,0 +1,124 @@ +package com.softwaremill.okapi.test.transport + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.kafka.KafkaDeliveryInfo +import com.softwaremill.okapi.kafka.KafkaMessageDeliverer +import com.softwaremill.okapi.kafka.kafkaDeliveryInfo +import com.softwaremill.okapi.test.support.KafkaTestSupport +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import org.apache.kafka.clients.producer.KafkaProducer +import java.time.Duration +import java.time.Instant +import java.util.UUID + +class KafkaTransportIntegrationTest : FunSpec({ + val kafka = KafkaTestSupport() + var producer: KafkaProducer? = null + lateinit var deliverer: KafkaMessageDeliverer + + beforeSpec { + kafka.start() + producer = kafka.createProducer() + deliverer = KafkaMessageDeliverer(producer!!) + } + + afterSpec { + producer?.close() + kafka.stop() + } + + fun entryWithInfo( + topic: String, + payload: String = """{"orderId":"abc-123"}""", + partitionKey: String? = null, + headers: Map = emptyMap(), + ): OutboxEntry { + val info = kafkaDeliveryInfo { + this.topic = topic + this.partitionKey = partitionKey + headers.forEach { (k, v) -> header(k, v) } + } + return OutboxEntry.createPending( + message = OutboxMessage(messageType = "test.event", payload = payload), + deliveryInfo = info, + now = Instant.now(), + ) + } + + test("deliver sends message to correct topic") { + val entry = entryWithInfo(topic = "orders") + deliverer.deliver(entry) + + val consumer = kafka.createConsumer(groupId = "test-topic-${UUID.randomUUID()}") + consumer.subscribe(listOf("orders")) + val records = consumer.poll(Duration.ofSeconds(10)) + consumer.close() + + records.count() shouldBe 1 + val record = records.first() + record.topic() shouldBe "orders" + record.value() shouldBe """{"orderId":"abc-123"}""" + } + + test("deliver preserves headers") { + val entry = entryWithInfo( + topic = "header-topic-${UUID.randomUUID()}", + headers = mapOf("traceId" to "trace-abc", "source" to "okapi"), + ) + deliverer.deliver(entry) + + val consumer = kafka.createConsumer(groupId = "test-headers-${UUID.randomUUID()}") + consumer.subscribe(listOf(entry.let { KafkaDeliveryInfo.deserialize(it.deliveryMetadata).topic })) + val records = consumer.poll(Duration.ofSeconds(10)) + consumer.close() + + records.count() shouldBe 1 + val record = records.first() + val headerMap = record.headers().associate { it.key() to String(it.value()) } + headerMap["traceId"] shouldBe "trace-abc" + headerMap["source"] shouldBe "okapi" + } + + test("deliver uses partition key") { + val entry = entryWithInfo( + topic = "key-topic-${UUID.randomUUID()}", + partitionKey = "user-42", + ) + deliverer.deliver(entry) + + val consumer = kafka.createConsumer(groupId = "test-key-${UUID.randomUUID()}") + consumer.subscribe(listOf(entry.let { KafkaDeliveryInfo.deserialize(it.deliveryMetadata).topic })) + val records = consumer.poll(Duration.ofSeconds(10)) + consumer.close() + + records.count() shouldBe 1 + records.first().key() shouldBe "user-42" + } + + test("deliver without partition key sends null key") { + val entry = entryWithInfo( + topic = "nullkey-topic-${UUID.randomUUID()}", + partitionKey = null, + ) + deliverer.deliver(entry) + + val consumer = kafka.createConsumer(groupId = "test-nullkey-${UUID.randomUUID()}") + consumer.subscribe(listOf(entry.let { KafkaDeliveryInfo.deserialize(it.deliveryMetadata).topic })) + val records = consumer.poll(Duration.ofSeconds(10)) + consumer.close() + + records.count() shouldBe 1 + records.first().key().shouldBeNull() + } + + test("deliver returns Success on successful send") { + val entry = entryWithInfo(topic = "success-topic-${UUID.randomUUID()}") + val result = deliverer.deliver(entry) + + result shouldBe DeliveryResult.Success + } +}) From 7c70520d0c1382f6716c58b321b96435e7dc2e30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 13:02:35 +0100 Subject: [PATCH 09/15] Add refactored HTTP E2E tests (Postgres, FunSpec) --- .../okapi/test/e2e/HttpEndToEndTest.kt | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt new file mode 100644 index 0000000..d6832ba --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt @@ -0,0 +1,170 @@ +package com.softwaremill.okapi.test.e2e + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.equalTo +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.github.tomakehurst.wiremock.http.Fault +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.http.HttpMessageDeliverer +import com.softwaremill.okapi.http.ServiceUrlResolver +import com.softwaremill.okapi.http.httpDeliveryInfo +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.test.support.PostgresTestSupport +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.maps.shouldContain +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Clock + +class HttpEndToEndTest : FunSpec({ + val db = PostgresTestSupport() + val wiremock = WireMockServer(wireMockConfig().dynamicPort()) + + beforeSpec { + db.start() + wiremock.start() + } + + afterSpec { + wiremock.stop() + db.stop() + } + + beforeEach { + wiremock.resetAll() + db.truncate() + } + + fun buildPipeline(maxRetries: Int = 3): Triple { + val clock = Clock.systemUTC() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor( + HttpMessageDeliverer(urlResolver), + RetryPolicy(maxRetries = maxRetries), + clock, + ) + return Triple(publisher, OutboxProcessor(store, entryProcessor), store) + } + + fun deliveryInfo() = httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + } + + test("HTTP 200 - message delivered, payload matches") { + val (publisher, processor, store) = buildPipeline() + val payload = """{"orderId":"abc-123"}""" + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(200)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + transaction { processor.processNext() } + + wiremock.verify( + postRequestedFor(urlEqualTo("/api/notify")) + .withRequestBody(equalTo(payload)), + ) + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.DELIVERED to 1L) + } + + test("HTTP 500 - retriable failure, stays PENDING") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(500)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 1L) + counts shouldContain (OutboxStatus.DELIVERED to 0L) + } + + test("HTTP 400 - permanent failure, immediately FAILED") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(400)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.FAILED to 1L) + counts shouldContain (OutboxStatus.PENDING to 0L) + } + + test("connection reset - retriable, stays PENDING") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 1L) + } + + test("transaction rollback - entry not persisted") { + val (publisher, _, store) = buildPipeline() + + runCatching { + transaction { + publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) + error("Simulated business logic failure") + } + } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 0L) + } + + test("retry exhaustion - retries then FAILED") { + val (publisher, processor, store) = buildPipeline(maxRetries = 3) + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(500)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + + // First 3 processNext calls: retries 0->1, 1->2, 2->3 — stays PENDING + repeat(3) { + transaction { processor.processNext() } + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 1L) + } + + // 4th processNext: retries==3, shouldRetry(3) returns false -> FAILED + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.FAILED to 1L) + counts shouldContain (OutboxStatus.PENDING to 0L) + } +}) From 9ce52ea7ca16b905d4502e3b3fe6721360d7824f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 13:02:40 +0100 Subject: [PATCH 10/15] Add MySQL HTTP E2E tests (FunSpec) --- .../okapi/test/e2e/MysqlHttpEndToEndTest.kt | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt new file mode 100644 index 0000000..99d6d22 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt @@ -0,0 +1,131 @@ +package com.softwaremill.okapi.test.e2e + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.equalTo +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.github.tomakehurst.wiremock.http.Fault +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.http.HttpMessageDeliverer +import com.softwaremill.okapi.http.ServiceUrlResolver +import com.softwaremill.okapi.http.httpDeliveryInfo +import com.softwaremill.okapi.mysql.MysqlOutboxStore +import com.softwaremill.okapi.test.support.MysqlTestSupport +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.maps.shouldContain +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Clock + +class MysqlHttpEndToEndTest : FunSpec({ + val db = MysqlTestSupport() + val wiremock = WireMockServer(wireMockConfig().dynamicPort()) + + beforeSpec { + db.start() + wiremock.start() + } + + afterSpec { + wiremock.stop() + db.stop() + } + + beforeEach { + wiremock.resetAll() + db.truncate() + } + + fun buildPipeline(): Triple { + val clock = Clock.systemUTC() + val store = MysqlOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor( + HttpMessageDeliverer(urlResolver), + RetryPolicy(maxRetries = 3), + clock, + ) + return Triple(publisher, OutboxProcessor(store, entryProcessor), store) + } + + fun deliveryInfo() = httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + } + + test("HTTP 200 - message delivered") { + val (publisher, processor, store) = buildPipeline() + val payload = """{"orderId":"abc-123"}""" + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(200)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + transaction { processor.processNext() } + + wiremock.verify( + postRequestedFor(urlEqualTo("/api/notify")) + .withRequestBody(equalTo(payload)), + ) + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.DELIVERED to 1L) + } + + test("HTTP 500 - retriable, PENDING") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(500)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 1L) + counts shouldContain (OutboxStatus.DELIVERED to 0L) + } + + test("HTTP 400 - permanent, FAILED") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(400)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.FAILED to 1L) + counts shouldContain (OutboxStatus.PENDING to 0L) + } + + test("connection reset - retriable, PENDING") { + val (publisher, processor, store) = buildPipeline() + + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.PENDING to 1L) + } +}) From b1d4f3b591ca99ee28f34344e9533f80e5178727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 13:02:44 +0100 Subject: [PATCH 11/15] Add Kafka E2E test (Postgres + Kafka full pipeline) --- .../okapi/test/e2e/KafkaEndToEndTest.kt | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt new file mode 100644 index 0000000..02bee9f --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt @@ -0,0 +1,75 @@ +package com.softwaremill.okapi.test.e2e + +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.kafka.KafkaMessageDeliverer +import com.softwaremill.okapi.kafka.kafkaDeliveryInfo +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.test.support.KafkaTestSupport +import com.softwaremill.okapi.test.support.PostgresTestSupport +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.maps.shouldContain +import io.kotest.matchers.shouldBe +import org.apache.kafka.clients.producer.KafkaProducer +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Clock +import java.time.Duration +import java.util.UUID + +class KafkaEndToEndTest : FunSpec({ + val db = PostgresTestSupport() + val kafka = KafkaTestSupport() + var producer: KafkaProducer? = null + + beforeSpec { + db.start() + kafka.start() + producer = kafka.createProducer() + } + + afterSpec { + producer?.close() + kafka.stop() + db.stop() + } + + beforeEach { + db.truncate() + } + + test("full pipeline: publish to outbox -> processNext -> message on Kafka topic") { + val clock = Clock.systemUTC() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val deliverer = KafkaMessageDeliverer(producer!!) + val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor) + + val topic = "orders-${UUID.randomUUID()}" + val payload = """{"orderId":"order-42"}""" + val info = kafkaDeliveryInfo { + this.topic = topic + partitionKey = "user-1" + } + + transaction { publisher.publish(OutboxMessage("order.created", payload), info) } + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + counts shouldContain (OutboxStatus.DELIVERED to 1L) + + val consumer = kafka.createConsumer(groupId = "e2e-test-${UUID.randomUUID()}") + consumer.subscribe(listOf(topic)) + val records = consumer.poll(Duration.ofSeconds(10)) + consumer.close() + + records.count() shouldBe 1 + val record = records.first() + record.value() shouldBe payload + record.key() shouldBe "user-1" + } +}) From 52a4094558cd673af0168b0a56dab6e1dac18b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 13:06:41 +0100 Subject: [PATCH 12/15] Remove old E2E tests, clean up test dependencies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrated to okapi-integration-tests module: - OutboxEndToEndTest (Postgres+HTTP) → HttpEndToEndTest - MysqlOutboxEndToEndTest (MySQL+HTTP) → MysqlHttpEndToEndTest Removed Testcontainers/WireMock/Liquibase test deps from okapi-spring-boot and okapi-mysql modules. --- okapi-mysql/build.gradle.kts | 3 - .../okapi/mysql/MysqlOutboxEndToEndTest.kt | 195 ------------------ okapi-spring-boot/build.gradle.kts | 2 +- .../okapi/springboot/OutboxEndToEndTest.kt | 192 ----------------- 4 files changed, 1 insertion(+), 391 deletions(-) delete mode 100644 okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxEndToEndTest.kt delete mode 100644 okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxEndToEndTest.kt diff --git a/okapi-mysql/build.gradle.kts b/okapi-mysql/build.gradle.kts index e10a089..e8626ed 100644 --- a/okapi-mysql/build.gradle.kts +++ b/okapi-mysql/build.gradle.kts @@ -23,7 +23,4 @@ dependencies { testImplementation(libs.exposedJdbc) testImplementation(libs.exposedJson) testImplementation(libs.exposedJavaTime) - testImplementation(libs.liquibaseCore) - testImplementation(libs.wiremock) - testImplementation(project(":okapi-http")) } diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxEndToEndTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxEndToEndTest.kt deleted file mode 100644 index 412e47f..0000000 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxEndToEndTest.kt +++ /dev/null @@ -1,195 +0,0 @@ -package com.softwaremill.okapi.mysql - -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock.aResponse -import com.github.tomakehurst.wiremock.client.WireMock.post -import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor -import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo -import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig -import com.softwaremill.okapi.core.OutboxEntryProcessor -import com.softwaremill.okapi.core.OutboxMessage -import com.softwaremill.okapi.core.OutboxProcessor -import com.softwaremill.okapi.core.OutboxPublisher -import com.softwaremill.okapi.core.OutboxStatus -import com.softwaremill.okapi.core.RetryPolicy -import com.softwaremill.okapi.http.HttpMessageDeliverer -import com.softwaremill.okapi.http.ServiceUrlResolver -import com.softwaremill.okapi.http.httpDeliveryInfo -import io.kotest.core.spec.style.BehaviorSpec -import io.kotest.matchers.shouldBe -import liquibase.Liquibase -import liquibase.database.DatabaseFactory -import liquibase.database.jvm.JdbcConnection -import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction -import org.testcontainers.containers.MySQLContainer -import java.sql.DriverManager -import java.time.Clock - -class MysqlOutboxEndToEndTest : - BehaviorSpec({ - val mysql = MySQLContainer("mysql:8.0") - val wiremock = WireMockServer(wireMockConfig().dynamicPort()) - - lateinit var store: MysqlOutboxStore - lateinit var publisher: OutboxPublisher - lateinit var processor: OutboxProcessor - - beforeSpec { - mysql.start() - wiremock.start() - - Database.connect( - url = mysql.jdbcUrl, - driver = mysql.driverClassName, - user = mysql.username, - password = mysql.password, - ) - - val connection = DriverManager.getConnection(mysql.jdbcUrl, mysql.username, mysql.password) - val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) - Liquibase("com/softwaremill/okapi/db/mysql/changelog.xml", ClassLoaderResourceAccessor(), db) - .use { it.update("") } - connection.close() - - val clock = Clock.systemUTC() - store = MysqlOutboxStore(clock) - publisher = OutboxPublisher(store, clock) - - val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } - val deliverer = HttpMessageDeliverer(urlResolver) - val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) - processor = OutboxProcessor(store, entryProcessor) - } - - afterSpec { - wiremock.stop() - mysql.stop() - } - - beforeEach { - wiremock.resetAll() - transaction { exec("DELETE FROM outbox") } - } - - given("a message published within a transaction") { - `when`("the HTTP endpoint returns 200") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(200)), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"abc-123"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify"))) - val counts = transaction { store.countByStatuses() } - - then("WireMock receives exactly one POST request") { - requests.size shouldBe 1 - } - then("request body matches the published payload") { - requests.first().bodyAsString shouldBe """{"orderId":"abc-123"}""" - } - then("entry is marked as DELIVERED") { - counts[OutboxStatus.DELIVERED] shouldBe 1L - } - } - - `when`("the HTTP endpoint returns 500") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(500).withBody("Internal Server Error")), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"xyz-456"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry stays PENDING (retriable failure, retries remaining)") { - counts[OutboxStatus.PENDING] shouldBe 1L - } - then("no DELIVERED entries") { - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - - `when`("the HTTP endpoint returns 400") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(400).withBody("Bad Request")), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"err-789"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry is immediately FAILED (permanent failure)") { - counts[OutboxStatus.FAILED] shouldBe 1L - } - then("no PENDING or DELIVERED entries") { - counts[OutboxStatus.PENDING] shouldBe 0L - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - - `when`("the endpoint is unreachable") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn( - aResponse().withFault( - com.github.tomakehurst.wiremock.http.Fault.CONNECTION_RESET_BY_PEER, - ), - ), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"net-000"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry stays PENDING (retriable network failure)") { - counts[OutboxStatus.PENDING] shouldBe 1L - } - } - } - }) diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 584286e..78e229a 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -21,13 +21,13 @@ dependencies { testImplementation(libs.kotestAssertionsCore) testImplementation(libs.springContext) testImplementation(libs.springTx) - testImplementation(libs.exposedCore) testImplementation(libs.springBootAutoconfigure) testImplementation(libs.springBootTest) testImplementation(libs.assertjCore) testImplementation(project(":okapi-postgres")) testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) + testImplementation(libs.exposedCore) testImplementation(libs.exposedJdbc) testImplementation(libs.exposedJson) testImplementation(libs.exposedJavaTime) diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxEndToEndTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxEndToEndTest.kt deleted file mode 100644 index 972d5db..0000000 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxEndToEndTest.kt +++ /dev/null @@ -1,192 +0,0 @@ -package com.softwaremill.okapi.springboot - -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock.aResponse -import com.github.tomakehurst.wiremock.client.WireMock.post -import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor -import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo -import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig -import com.softwaremill.okapi.core.OutboxEntryProcessor -import com.softwaremill.okapi.core.OutboxMessage -import com.softwaremill.okapi.core.OutboxProcessor -import com.softwaremill.okapi.core.OutboxPublisher -import com.softwaremill.okapi.core.OutboxStatus -import com.softwaremill.okapi.core.RetryPolicy -import com.softwaremill.okapi.http.HttpMessageDeliverer -import com.softwaremill.okapi.http.ServiceUrlResolver -import com.softwaremill.okapi.http.httpDeliveryInfo -import com.softwaremill.okapi.postgres.PostgresOutboxStore -import io.kotest.core.spec.style.BehaviorSpec -import io.kotest.matchers.shouldBe -import liquibase.Liquibase -import liquibase.database.DatabaseFactory -import liquibase.database.jvm.JdbcConnection -import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction -import org.testcontainers.containers.PostgreSQLContainer -import java.sql.DriverManager -import java.time.Clock - -class OutboxEndToEndTest : - BehaviorSpec({ - val postgres = PostgreSQLContainer("postgres:16") - val wiremock = WireMockServer(wireMockConfig().dynamicPort()) - - lateinit var store: PostgresOutboxStore - lateinit var publisher: OutboxPublisher - lateinit var processor: OutboxProcessor - - beforeSpec { - postgres.start() - wiremock.start() - - Database.connect( - url = postgres.jdbcUrl, - driver = postgres.driverClassName, - user = postgres.username, - password = postgres.password, - ) - - val connection = DriverManager.getConnection(postgres.jdbcUrl, postgres.username, postgres.password) - val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) - Liquibase("com/softwaremill/okapi/db/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } - connection.close() - - val clock = Clock.systemUTC() - store = PostgresOutboxStore(clock) - publisher = OutboxPublisher(store, clock) - - val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } - val deliverer = HttpMessageDeliverer(urlResolver) - val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) - processor = OutboxProcessor(store, entryProcessor) - } - - afterSpec { - wiremock.stop() - postgres.stop() - } - - beforeEach { - wiremock.resetAll() - transaction { exec("TRUNCATE TABLE outbox") } - } - - given("a message published within a transaction") { - `when`("the HTTP endpoint returns 200") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(200)), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"abc-123"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify"))) - val counts = transaction { store.countByStatuses() } - - then("WireMock receives exactly one POST request") { - requests.size shouldBe 1 - } - then("request body matches the published payload") { - requests.first().bodyAsString shouldBe """{"orderId":"abc-123"}""" - } - then("entry is marked as DELIVERED") { - counts[OutboxStatus.DELIVERED] shouldBe 1L - } - } - - `when`("the HTTP endpoint returns 500") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(500).withBody("Internal Server Error")), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"xyz-456"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry stays PENDING (retriable failure, retries remaining)") { - counts[OutboxStatus.PENDING] shouldBe 1L - } - then("no DELIVERED entries") { - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - - `when`("the HTTP endpoint returns 400") { - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withStatus(400).withBody("Bad Request")), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"err-789"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry is immediately FAILED (permanent failure)") { - counts[OutboxStatus.FAILED] shouldBe 1L - } - then("no PENDING or DELIVERED entries") { - counts[OutboxStatus.PENDING] shouldBe 0L - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - - `when`("the endpoint is unreachable") { - // No stub — WireMock rejects with connection refused on unmapped path - wiremock.stubFor( - post(urlEqualTo("/api/notify")) - .willReturn(aResponse().withFault(com.github.tomakehurst.wiremock.http.Fault.CONNECTION_RESET_BY_PEER)), - ) - - transaction { - publisher.publish( - OutboxMessage("order.created", """{"orderId":"net-000"}"""), - httpDeliveryInfo { - serviceName = "notification-service" - endpointPath = "/api/notify" - }, - ) - } - - transaction { processor.processNext() } - - val counts = transaction { store.countByStatuses() } - - then("entry stays PENDING (retriable network failure)") { - counts[OutboxStatus.PENDING] shouldBe 1L - } - } - } - }) From 8ef5f1b81b4738ff17fd2840b0db53feef930969 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 14:26:56 +0100 Subject: [PATCH 13/15] Refactor PostgresOutboxStore.claimPending to use Exposed DSL forUpdate(SKIP_LOCKED) Replace raw SQL string + manual ResultSet mapping with type-safe Exposed DSL query and ForUpdateOption.PostgreSQL.SKIP_LOCKED. Eliminates mapFromResultSet() in favor of ResultRow.toOutboxEntry(). MySQL keeps raw SQL because InnoDB requires FORCE INDEX hint for correct SKIP LOCKED behavior, which Exposed DSL cannot express. --- .../okapi/postgres/PostgresOutboxStore.kt | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index a049081..dc8b49e 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -1,21 +1,21 @@ package com.softwaremill.okapi.postgres import com.softwaremill.okapi.core.OutboxEntry -import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import org.jetbrains.exposed.v1.core.IntegerColumnType +import org.jetbrains.exposed.v1.core.ResultRow +import org.jetbrains.exposed.v1.core.SortOrder import org.jetbrains.exposed.v1.core.alias import org.jetbrains.exposed.v1.core.count import org.jetbrains.exposed.v1.core.inList import org.jetbrains.exposed.v1.core.min +import org.jetbrains.exposed.v1.core.vendors.ForUpdateOption import org.jetbrains.exposed.v1.jdbc.select import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.upsert -import java.sql.ResultSet import java.time.Clock import java.time.Instant -import java.util.UUID /** PostgreSQL [OutboxStore] implementation using Exposed. */ class PostgresOutboxStore( @@ -39,17 +39,13 @@ class PostgresOutboxStore( } override fun claimPending(limit: Int): List { - val nativeQuery = - "SELECT * FROM ${OutboxTable.tableName}" + - " WHERE ${OutboxTable.status.name} = '${OutboxStatus.PENDING}'" + - " ORDER BY ${OutboxTable.createdAt.name} ASC" + - " LIMIT $limit FOR UPDATE SKIP LOCKED" - - return TransactionManager.current().exec(nativeQuery) { rs -> - generateSequence { - if (rs.next()) mapFromResultSet(rs) else null - }.toList() - } ?: emptyList() + return OutboxTable + .select(OutboxTable.columns) + .where { OutboxTable.status eq OutboxStatus.PENDING.name } + .orderBy(OutboxTable.createdAt to SortOrder.ASC) + .limit(limit) + .forUpdate(ForUpdateOption.PostgreSQL.ForUpdate(mode = ForUpdateOption.PostgreSQL.MODE.SKIP_LOCKED)) + .map { it.toOutboxEntry() } } override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) @@ -100,17 +96,17 @@ class PostgresOutboxStore( return OutboxStatus.entries.associateWith { status -> counts[status] ?: 0L } } - private fun mapFromResultSet(rs: ResultSet): OutboxEntry = OutboxEntry( - outboxId = OutboxId(UUID.fromString(rs.getString("id"))), - messageType = rs.getString("message_type"), - payload = rs.getString("payload"), - deliveryType = rs.getString("delivery_type"), - status = OutboxStatus.from(rs.getString("status")), - createdAt = rs.getTimestamp("created_at").toInstant(), - updatedAt = rs.getTimestamp("updated_at").toInstant(), - retries = rs.getInt("retries"), - lastAttempt = rs.getTimestamp("last_attempt")?.toInstant(), - lastError = rs.getString("last_error"), - deliveryMetadata = rs.getString("delivery_metadata"), + private fun ResultRow.toOutboxEntry(): OutboxEntry = OutboxEntry( + outboxId = this[OutboxTable.id], + messageType = this[OutboxTable.messageType], + payload = this[OutboxTable.payload], + deliveryType = this[OutboxTable.deliveryType], + status = OutboxStatus.from(this[OutboxTable.status]), + createdAt = this[OutboxTable.createdAt], + updatedAt = this[OutboxTable.updatedAt], + retries = this[OutboxTable.retries], + lastAttempt = this[OutboxTable.lastAttempt], + lastError = this[OutboxTable.lastError], + deliveryMetadata = this[OutboxTable.deliveryMetadata], ) } From 21cffbdfa951cdba91adbfdb93bdb796fc3647a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 14:57:32 +0100 Subject: [PATCH 14/15] Fix compilation after rebase onto feat102 - Add missing `eq` import in PostgresOutboxStore (lost during merge) - Update removeDeliveredBefore call to match new signature (time, limit) --- .../softwaremill/okapi/test/store/OutboxStoreContractTests.kt | 2 +- .../com/softwaremill/okapi/postgres/PostgresOutboxStore.kt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt index def3963..58a0260 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt @@ -180,7 +180,7 @@ fun FunSpec.outboxStoreContractTests( } // Remove delivered before Jan 5 — should delete old (lastAttempt=Jan 2) but keep recent (lastAttempt=Jan 11) - transaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z")) } + transaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z"), limit = 100) } val counts = transaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index dc8b49e..68fa2a6 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -8,6 +8,7 @@ import org.jetbrains.exposed.v1.core.ResultRow import org.jetbrains.exposed.v1.core.SortOrder import org.jetbrains.exposed.v1.core.alias import org.jetbrains.exposed.v1.core.count +import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList import org.jetbrains.exposed.v1.core.min import org.jetbrains.exposed.v1.core.vendors.ForUpdateOption From cc008eac62eeb61a980c4480bb49e68401ac0dce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 31 Mar 2026 10:47:28 +0200 Subject: [PATCH 15/15] Fix Liquibase changeset IDs to match migration file numbers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changeset IDs were not updated when migration files were renumbered during rebase (003→004 for Postgres, 002→003 for MySQL). --- .../com/softwaremill/okapi/db/mysql/003__add_claim_index.sql | 2 +- .../com/softwaremill/okapi/db/004__add_claim_index.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql index 53e6856..70ca899 100644 --- a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_claim_index.sql @@ -1,4 +1,4 @@ --liquibase formatted sql ---changeset outbox:002 +--changeset outbox:003 CREATE INDEX idx_outbox_status_created_at ON outbox (status, created_at); diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql index 9925c9a..2e4bafc 100644 --- a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_claim_index.sql @@ -1,4 +1,4 @@ --liquibase formatted sql ---changeset outbox:003 +--changeset outbox:004 CREATE INDEX IF NOT EXISTS idx_outbox_status_created_at ON outbox (status, created_at);