Skip to content

Commit

Permalink
Merge pull request #114 from jillesvangurp/make-retry-delay-configurable
Browse files Browse the repository at this point in the history
make retry delay configurable
  • Loading branch information
jillesvangurp authored Feb 9, 2024
2 parents 5e0f990 + bd64497 commit d42909a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import com.jillesvangurp.ktsearch.repository.repository
import com.jillesvangurp.searchdsls.querydsl.match
import com.jillesvangurp.searchdsls.querydsl.matchAll
import documentation.manual.ManualPages
import documentation.manual.bulk.bulkMd
import documentation.mdLink
import documentation.sourceGitRepository
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.Serializable
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -170,9 +168,9 @@ val indexRepoMd = sourceGitRepository.md {
""".trimIndent()
suspendingExample(false) {
val id = repo.index(TestDoc("A document")).id
repo.update(id, maxRetries = 2) { oldVersion ->
repo.update(id, maxRetries = 2, block = { oldVersion ->
oldVersion.copy(message = "An updated document")
}
}, retryDelay = 2.seconds)
}
+"""
This fetches the document and its `primary_term` and `seq_no` values, applies your update function,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class RetryingBulkHandler<T : Any>(
private val indexRepository: IndexRepository<T>,
private val parentBulkItemCallBack: BulkItemCallBack? = null,
private val maxRetries: Int = 2,
private val retryDelay: Duration = 2.seconds,
private val updateScope: CoroutineScope = CoroutineScope(CoroutineName("bulk-update"))
) : BulkItemCallBack {
private var count = 0
Expand All @@ -31,7 +32,7 @@ internal class RetryingBulkHandler<T : Any>(
val job = updateScope.launch {
try {
val (_, result) = indexRepository.update(
item.id, maxRetries = maxRetries, block = updateFunction
item.id, maxRetries = maxRetries, block = updateFunction, retryDelay = retryDelay
)
itemOk(
operationType, BulkResponse.ItemDetails(
Expand Down Expand Up @@ -300,12 +301,13 @@ class IndexRepository<T : Any>(

suspend fun update(
id: String,
maxRetries: Int = 3,
maxRetries: Int = 5,
pipeline: String? = null,
refresh: Refresh? = null,
routing: String? = null,
timeout: Duration? = null,
block: (T) -> T
retryDelay: Duration = 2.seconds,
block: (T) -> T,
): Pair<T, DocumentIndexResponse> =
update(
id = id,
Expand All @@ -315,17 +317,19 @@ class IndexRepository<T : Any>(
refresh = refresh ?: defaultRefresh,
routing = routing,
timeout = timeout ?: defaultTimeout,
retryDelay = retryDelay,
block = block
)

private suspend fun update(
id: String,
attempt: Int = 0,
maxRetries: Int = 3,
pipeline: String? = null,
refresh: Refresh? = null,
routing: String? = null,
timeout: Duration? = null,
pipeline: String?,
refresh: Refresh?,
routing: String?,
timeout: Duration?,
retryDelay: Duration,
block: (T) -> T
): Pair<T, DocumentIndexResponse> {
val (original, resp) = get(id, extraParameters = defaultParameters)
Expand All @@ -352,9 +356,20 @@ class IndexRepository<T : Any>(
}
// 429 means we're triggering a circuit breaker, so back off before retrying
// we've seen this kind of failure a few times.
delay(1.seconds)
delay(retryDelay)
}
return update(id = id, attempt = attempt + 1, maxRetries = maxRetries, block = block).also {
return update(
id = id,
attempt = attempt+1,
maxRetries = maxRetries,
pipeline = pipeline,
refresh = refresh ?: defaultRefresh,
routing = routing,
timeout = timeout ?: defaultTimeout,
retryDelay = 1.seconds,
block = block
)
.also {
if(attempt>0 && logging) {
logger.info { "update succeeded on attempt $attempt" }
}
Expand Down Expand Up @@ -406,12 +421,19 @@ class IndexRepository<T : Any>(
failOnFirstError: Boolean = false,
callBack: BulkItemCallBack? = null,
maxRetries: Int = 2,
retryDelay: Duration = 2.seconds,
retryTimeout: Duration = 1.minutes,
block: suspend TypedDocumentIBulkSession<T>.() -> Unit
) {

val updateFunctions = mutableMapOf<String, (T) -> T>()
val retryCallback = RetryingBulkHandler(updateFunctions, this, callBack, maxRetries)
val retryCallback = RetryingBulkHandler(
updateFunctions = updateFunctions,
indexRepository = this,
parentBulkItemCallBack = callBack,
maxRetries = maxRetries,
retryDelay = retryDelay,
)
val session = DefaultBulkSession(
searchClient = client,
failOnFirstError = failOnFirstError,
Expand Down Expand Up @@ -563,7 +585,9 @@ class IndexRepository<T : Any>(
typedKeys: Boolean? = null,
version: Boolean? = null,
extraParameters: Map<String, String>? = null,
): SearchResponse {
retries: Int = 3,
retryDelay: Duration = 2.seconds,
): SearchResponse {
return client.search(
target = indexReadAlias,
rawJson = rawJson,
Expand Down Expand Up @@ -609,7 +633,9 @@ class IndexRepository<T : Any>(
trackTotalHits = trackTotalHits,
typedKeys = typedKeys,
version = version,
extraParameters = extraParameters
extraParameters = extraParameters,
retries = retries,
retryDelay = retryDelay
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import mu.KotlinLogging
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
private val logger = KotlinLogging.logger { }

private val logger = KotlinLogging.logger { }

suspend fun SearchClient.search(
target: String?,
Expand Down Expand Up @@ -165,7 +166,10 @@ suspend fun SearchClient.search(
typedKeys: Boolean? = null,
version: Boolean? = null,
extraParameters: Map<String, String>? = null,
) =
retries: Int = 3,
retryDelay: Duration = 2.seconds,

) =
search(
target = target,
rawJson = dsl.json(),
Expand Down Expand Up @@ -211,7 +215,9 @@ suspend fun SearchClient.search(
trackTotalHits = trackTotalHits,
typedKeys = typedKeys,
version = version,
extraParameters = extraParameters
extraParameters = extraParameters,
retries = retries,
retryDelay = retryDelay,
)

enum class SearchOperator { AND, OR }
Expand Down Expand Up @@ -269,6 +275,7 @@ suspend fun SearchClient.search(
version: Boolean? = null,
extraParameters: Map<String, String>? = null,
retries: Int = 3,
retryDelay: Duration = 2.seconds,
): SearchResponse {
return try {
restClient.post {
Expand Down Expand Up @@ -323,11 +330,11 @@ suspend fun SearchClient.search(
}
}.parse(SearchResponse.serializer(), json)
} catch (e: RestException) {
if(e.status == 429 && retries >0) {
if (e.status == 429 && retries > 0) {
// we're tripping up a circuit breaker so sleep and retry
delay(1.seconds)
delay(retryDelay)
logger.warn { "Circuit breaker tripped (429), retrying, attempts remaining: $retries: ${e.message}" }
search(
search(
target = target,
rawJson = rawJson,
allowNoIndices = allowNoIndices,
Expand Down Expand Up @@ -373,7 +380,8 @@ suspend fun SearchClient.search(
typedKeys = typedKeys,
version = version,
extraParameters = extraParameters,
retries = retries-1
retries = retries - 1,
retryDelay = retryDelay
)
} else {
throw e
Expand Down Expand Up @@ -529,7 +537,10 @@ suspend fun SearchClient.searchAfter(
keepAlive: Duration,
query: SearchDSL,
optInToCustomSort: Boolean = false,
): Pair<SearchResponse, Flow<SearchResponse.Hit>> {
retries: Int = 3,
retryDelay: Duration = 2.seconds,

): Pair<SearchResponse, Flow<SearchResponse.Hit>> {
validateEngine(
"search_after and pit api work slightly different on Opensearch 2.x and not at all on OS1",
SearchEngineVariant.ES7,
Expand Down Expand Up @@ -558,7 +569,12 @@ suspend fun SearchClient.searchAfter(
}
}

val response = search(null, query.json())
val response = search(
null,
query.json(),
retries = retries,
retryDelay = retryDelay
)

val hitFlow = flow {
var resp: SearchResponse = response
Expand All @@ -574,7 +590,12 @@ suspend fun SearchClient.searchAfter(
this["keep_alive"] = "${keepAlive.inWholeSeconds}s"
}
}
resp = search(null, query.json())
resp = search(
null,
query.json(),
retries = retries,
retryDelay = retryDelay
)
emit(resp)
}
}.flatMapConcat { it.searchHits.asFlow() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.jillesvangurp.ktsearch.total
import io.kotest.matchers.ints.shouldBeGreaterThan
import io.kotest.matchers.shouldBe
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds

class IndexRepositoryTest : SearchTestBase() {

Expand All @@ -17,9 +18,9 @@ class IndexRepositoryTest : SearchTestBase() {
val (doc,_)= repo.get(d.id)
doc.name shouldBe "1"

val (doc2,_) =repo.update(d.id) {
val (doc2,_) =repo.update(d.id, block = {
it.copy(name="2")
}
}, retryDelay = 1.seconds)
doc2.name shouldBe "2"
val (doc3,_)= repo.get(d.id)
doc3 shouldBe doc2
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pluginManagement {

plugins {
id("de.fayard.refreshVersions") version "0.60.3"
//// # available:"0.60.4"
//// # available:"0.60.5"
}

refreshVersions {
Expand Down
21 changes: 13 additions & 8 deletions versions.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ plugin.org.jetbrains.dokka=1.9.10

version.ch.qos.logback..logback-classic=1.4.14

version.com.github.doyaaaaaken..kotlin-csv=1.9.2
version.com.github.doyaaaaaken..kotlin-csv=1.9.3

version.com.github.jillesvangurp..kotlin4example=1.1.1
version.com.github.jillesvangurp..kotlin4example=1.1.2

version.io.github.microutils..kotlin-logging=3.0.5
## # available=4.0.0-beta-1
## # available=4.0.0-beta-2

version.junit.jupiter=5.10.1
version.junit.jupiter=5.10.2

version.kotest=5.8.0

version.kotlin=1.9.22
## # available=2.0.0-Beta1
## # available=2.0.0-Beta2
## # available=2.0.0-Beta3

version.kotlinx.coroutines=1.7.3
## # available=1.8.0-RC
Expand All @@ -37,21 +38,25 @@ version.kotlinx.datetime=0.5.0

version.kotlinx.serialization=1.6.2

version.ktor=2.3.7
version.ktor=2.3.8
### available=3.0.0-beta-1

version.org.apache.logging.log4j..log4j-to-slf4j=2.22.1
## # available=3.0.0-alpha1
## # available=3.0.0-beta1

version.org.slf4j..jcl-over-slf4j=2.0.10
version.org.slf4j..jcl-over-slf4j=2.0.12
## # available=2.1.0-alpha0
## # available=2.1.0-alpha1

version.org.slf4j..jul-to-slf4j=2.0.10
version.org.slf4j..jul-to-slf4j=2.0.12
## # available=2.1.0-alpha0
## # available=2.1.0-alpha1

version.org.slf4j..log4j-over-slf4j=2.0.10
version.org.slf4j..log4j-over-slf4j=2.0.12
## # available=2.1.0-alpha0
## # available=2.1.0-alpha1

version.org.slf4j..slf4j-api=2.0.10
version.org.slf4j..slf4j-api=2.0.12
## # available=2.1.0-alpha0
## # available=2.1.0-alpha1

0 comments on commit d42909a

Please sign in to comment.