Skip to content

Commit

Permalink
Retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
patxibocos committed Jan 16, 2023
1 parent 1657f86 commit 5d6cc01
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ import io.ktor.client.plugins.auth.Auth
import io.ktor.client.plugins.auth.providers.BearerTokens
import io.ktor.client.plugins.auth.providers.bearer
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.client.request.forms.submitForm
import io.ktor.client.request.request
import io.ktor.client.request.url
import io.ktor.client.statement.HttpResponse
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.Parameters
import io.ktor.http.isSuccess
import io.ktor.serialization.kotlinx.json.json
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down Expand Up @@ -121,3 +128,22 @@ internal fun oneDriveHttpClient(clientId: String, clientSecret: String, refreshT
"refresh_token",
refreshToken,
)

suspend fun HttpClient.requestWithRetry(
urlString: String,
method: HttpMethod,
dontRetryFor: List<HttpStatusCode> = emptyList(),
maxRetries: Int = 3,
block: HttpRequestBuilder.() -> Unit = {},
): HttpResponse {
val response = request(HttpRequestBuilder().apply { url(urlString); block(); this.method = method })
val shouldRetry = !response.status.isSuccess() && !dontRetryFor.contains(response.status)
if (shouldRetry) {
if (maxRetries > 0) {
return requestWithRetry(urlString, method, dontRetryFor, maxRetries - 1, block)
} else {
throw Exception("Request failed (status ${response.status}): ${response.body<String>()}")
}
}
return response
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.patxibocos.googlephotosexporter

import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.get
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.isSuccess
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -66,7 +66,7 @@ class GooglePhotosRepository(
else -> IllegalStateException("Unknown type, is this MediaItem a video or a photo?")
}
val fullSizeUrl = "${mediaItem.baseUrl}=$suffix"
val response = httpClient.get(fullSizeUrl)
val response = httpClient.requestWithRetry(fullSizeUrl, HttpMethod.Get)
if (response.status == HttpStatusCode.Forbidden) {
throw GooglePhotosItemForbidden
}
Expand All @@ -90,7 +90,10 @@ class GooglePhotosRepository(
private suspend fun fetchItems(
nextPageToken: String,
): ListMediaItemsResponse {
val response = httpClient.get("$BASE_PATH/v1/mediaItems?pageSize=100&pageToken=$nextPageToken")
val response = httpClient.requestWithRetry(
"$BASE_PATH/v1/mediaItems?pageSize=100&pageToken=$nextPageToken",
HttpMethod.Get,
)
return response.body()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package io.github.patxibocos.googlephotosexporter.exporters

import io.github.patxibocos.googlephotosexporter.requestWithRetry
import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.forms.MultiPartFormDataContent
import io.ktor.client.request.forms.formData
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.post
import io.ktor.client.request.put
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.http.ContentType
import io.ktor.http.Headers
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.contentType
import io.ktor.http.isSuccess
Expand All @@ -22,6 +21,7 @@ import mu.KotlinLogging
import org.slf4j.Logger
import java.security.MessageDigest
import java.util.*
import kotlin.collections.set

internal class BoxExporter(
private val httpClient: HttpClient,
Expand All @@ -38,10 +38,7 @@ internal class BoxExporter(
val folder = getFolderForPath("$prefixPath/$filePath", false) ?: return null
val fileName = filePath.split("/").last()
val file = folder.itemCollection.entries.find { it.name == fileName && it.type == "file" } ?: return null
val response = httpClient.get("$foldersPath/files/${file.id}/content")
if (!response.status.isSuccess()) {
throw Exception("Failed to get file $filePath: ${response.body<String>()}")
}
val response = httpClient.requestWithRetry("$foldersPath/files/${file.id}/content", HttpMethod.Get)
return response.body()
}

Expand Down Expand Up @@ -90,7 +87,7 @@ internal class BoxExporter(

private suspend fun getFolderForPath(filePath: String, createIfNotExists: Boolean): Folder? {
suspend fun createFolder(parentFolder: Folder, name: String): Folder {
return httpClient.post("$foldersPath/folders") {
return httpClient.requestWithRetry("$foldersPath/folders", HttpMethod.Post) {
contentType(ContentType.Application.Json)
setBody("""{"name":"$name","parent":{"id":"${parentFolder.id}"}}""")
}.body<Folder>().also {
Expand All @@ -100,7 +97,7 @@ internal class BoxExporter(
}

suspend fun getFolder(id: String): Folder {
return foldersCache[id] ?: httpClient.get("$foldersPath/folders/$id") {
return foldersCache[id] ?: httpClient.requestWithRetry("$foldersPath/folders/$id", HttpMethod.Get) {
contentType(ContentType.Application.Json)
}.body<Folder>().also {
foldersCache[id] = it
Expand All @@ -122,7 +119,7 @@ internal class BoxExporter(

override suspend fun upload(data: ByteArray, name: String, filePath: String, overrideContent: Boolean) {
suspend fun uploadFile(path: String, folderId: String, fileName: String): HttpResponse {
return httpClient.post(path) {
return httpClient.requestWithRetry(path, HttpMethod.Post, dontRetryFor = listOf(HttpStatusCode.Conflict)) {
setBody(
MultiPartFormDataContent(
formData {
Expand All @@ -146,10 +143,7 @@ internal class BoxExporter(
if (response.status == HttpStatusCode.Conflict && response.body<UploadResponse>().code == "item_name_in_use") {
if (overrideContent) {
val fileId = response.body<UploadResponse>().contextInfo?.conflicts?.id
val uploadVersionResponse = uploadFile("$filesPath/files/$fileId/content", folder.id, fileName)
if (!uploadVersionResponse.status.isSuccess()) {
throw Exception("Box upload (version) failed: ${response.body<String>()}")
}
uploadFile("$filesPath/files/$fileId/content", folder.id, fileName)
} else {
logger.warn("File $filePath already exists")
}
Expand All @@ -162,7 +156,11 @@ internal class BoxExporter(
// Create upload session -> POST /files/upload_sessions
// If conflict && overrideContent -> POST /files/:id/upload_sessions
// Upload each of the parts
var uploadSessionResponse = httpClient.post("$filesPath/files/upload_sessions") {
var uploadSessionResponse = httpClient.requestWithRetry(
"$filesPath/files/upload_sessions",
HttpMethod.Post,
dontRetryFor = listOf(HttpStatusCode.Conflict),
) {
contentType(ContentType.Application.Json)
setBody("""{"file_name":"$fileName","folder_id":"${folder.id}","file_size":${data.size}}""")
}
Expand All @@ -173,14 +171,11 @@ internal class BoxExporter(
}
val fileId = uploadSessionResponse.body<UploadResponse>().contextInfo?.conflicts?.id
uploadSessionResponse =
httpClient.post("$filesPath/files/$fileId/upload_sessions") {
httpClient.requestWithRetry("$filesPath/files/$fileId/upload_sessions", HttpMethod.Post) {
contentType(ContentType.Application.Json)
setBody("""{"file_size":${data.size}}""")
}
}
if (!uploadSessionResponse.status.isSuccess()) {
throw Exception("Box upload session failed: ${uploadSessionResponse.body<String>()}")
}
val body = uploadSessionResponse.body<UploadSessionResponse>()
data.inputStream().use { inputStream ->
val chunks = inputStream.buffered().iterator().asSequence().chunked(body.partSize)
Expand All @@ -189,7 +184,7 @@ internal class BoxExporter(
val chunk = bytes.toByteArray()
val sha = Base64.getEncoder().encode(MessageDigest.getInstance("SHA-1").digest(chunk))
.toString(Charsets.UTF_8)
val response = httpClient.put(body.sessionEndpoints.uploadPart) {
val response = httpClient.requestWithRetry(body.sessionEndpoints.uploadPart, HttpMethod.Put) {
contentType(ContentType.Application.OctetStream)
setBody(chunk)
header("Digest", "sha=$sha")
Expand All @@ -203,7 +198,7 @@ internal class BoxExporter(
.toString(Charsets.UTF_8)
val requestBody =
"""{"parts":[${partResponses.joinToString(",") { """{"part_id":"${it.part.partId}","offset":${it.part.offset},"size":${it.part.size},"sha1":"${it.part.sha1}"}""" }}]}"""
httpClient.post(body.sessionEndpoints.commit) {
httpClient.requestWithRetry(body.sessionEndpoints.commit, HttpMethod.Post) {
header("Digest", "sha=$sha")
contentType(ContentType.Application.Json)
setBody(requestBody)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.github.patxibocos.googlephotosexporter.exporters

import io.github.patxibocos.googlephotosexporter.requestWithRetry
import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.http.ContentType
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.contentType
import io.ktor.http.isSuccess
Expand Down Expand Up @@ -39,13 +39,14 @@ internal class DropboxExporter(
private data class UploadSessionResponse(@SerialName("session_id") val sessionId: String)

override suspend fun get(filePath: String): ByteArray? {
val response = httpClient.get("$basePath/files/download") {
val response = httpClient.requestWithRetry(
"$basePath/files/download",
HttpMethod.Get,
dontRetryFor = listOf(HttpStatusCode.Conflict),
) {
contentType(ContentType.Application.OctetStream)
header("Dropbox-API-Arg", """{"path": "/$prefixPath/$filePath"}""")
}
if (response.status.isSuccess()) {
return response.body()
}
if (response.status == HttpStatusCode.Conflict && response.body<DownloadResponse>().error.path.tag == "not_found") {
return null
}
Expand All @@ -55,12 +56,10 @@ internal class DropboxExporter(
override suspend fun upload(data: ByteArray, name: String, filePath: String, overrideContent: Boolean) {
// If data is larger than 150 MB, an upload session must be created
val response = if (data.size > maxUploadSize) {
val uploadSessionResponse = httpClient.post("$basePath/files/upload_session/start") {
contentType(ContentType.Application.OctetStream)
}
if (!uploadSessionResponse.status.isSuccess()) {
throw Exception("Dropbox upload session start failed: ${uploadSessionResponse.body<String>()}")
}
val uploadSessionResponse =
httpClient.requestWithRetry("$basePath/files/upload_session/start", HttpMethod.Post) {
contentType(ContentType.Application.OctetStream)
}
val sessionId = uploadSessionResponse.body<UploadSessionResponse>().sessionId
// Split by chunks of 148 MB
// Each call must be multiple of 4194304 bytes (except for last)
Expand All @@ -69,20 +68,17 @@ internal class DropboxExporter(
val chunks = inputStream.buffered().iterator().asSequence().chunked(chunkSize)
val finalOffset = chunks.fold(0) { offset, bytes ->
val chunk = bytes.toByteArray()
val appendResponse = httpClient.post("$basePath/files/upload_session/append_v2") {
httpClient.requestWithRetry("$basePath/files/upload_session/append_v2", HttpMethod.Post) {
contentType(ContentType.Application.OctetStream)
header(
"Dropbox-API-Arg",
"""{"close":false,"cursor":{"offset":$offset,"session_id":"$sessionId"}}""",
)
setBody(chunk)
}
if (!appendResponse.status.isSuccess()) {
throw Exception("Dropbox upload session append failed: ${appendResponse.body<String>()}")
}
offset + chunk.size
}
httpClient.post("$basePath/files/upload_session/finish") {
httpClient.requestWithRetry("$basePath/files/upload_session/finish", HttpMethod.Post) {
contentType(ContentType.Application.OctetStream)
header(
"Dropbox-API-Arg",
Expand All @@ -91,7 +87,7 @@ internal class DropboxExporter(
}
}
} else {
httpClient.post("$basePath/files/upload") {
httpClient.requestWithRetry("$basePath/files/upload", HttpMethod.Post) {
contentType(ContentType.Application.OctetStream)
header("Dropbox-API-Arg", """{"path":"/$prefixPath/$filePath","strict_conflict":${!overrideContent}}""")
setBody(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.github.patxibocos.googlephotosexporter.exporters

import io.github.patxibocos.googlephotosexporter.requestWithRetry
import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.get
import io.ktor.client.request.put
import io.ktor.client.request.setBody
import io.ktor.http.ContentType
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.contentType
import io.ktor.http.isSuccess
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import mu.KotlinLogging
Expand All @@ -32,13 +31,17 @@ internal class GitHubExporter(
private val basePath = "https://api.github.com/repos/$repoOwner/$repoName/contents/$prefixPath"

override suspend fun get(filePath: String): ByteArray? {
val response = httpClient.get("$basePath/$filePath") {
val response = httpClient.requestWithRetry(
"$basePath/$filePath",
HttpMethod.Get,
dontRetryFor = listOf(HttpStatusCode.NotFound),
) {
contentType(ContentType.Application.Json)
}
if (!response.status.isSuccess()) {
if (response.status == HttpStatusCode.NotFound) {
return null
}
val fileResponse = httpClient.get(response.body<ResponseBody>().downloadUrl) {
val fileResponse = httpClient.requestWithRetry(response.body<ResponseBody>().downloadUrl, HttpMethod.Get) {
contentType(ContentType.Application.Json)
}
return fileResponse.body()
Expand All @@ -52,26 +55,32 @@ internal class GitHubExporter(
) {
val commitMessage = "Upload $name"
val sha: String? = if (overrideContent) {
val response = httpClient.get("$basePath/$filePath") {
val response = httpClient.requestWithRetry(
"$basePath/$filePath",
HttpMethod.Get,
dontRetryFor = listOf(HttpStatusCode.NotFound),
) {
contentType(ContentType.Application.Json)
}
if (response.status.isSuccess()) {
response.body<ResponseBody>().sha
} else {
if (response.status == HttpStatusCode.Companion.NotFound) {
null
} else {
response.body<ResponseBody>().sha
}
} else {
null
}
val base64 = String(Base64.getEncoder().encode(data))
val response = httpClient.put("$basePath/${filePath.replace(" ", "%20")}") {
val response = httpClient.requestWithRetry(
"$basePath/${filePath.replace(" ", "%20")}",
HttpMethod.Put,
dontRetryFor = listOf(HttpStatusCode.UnprocessableEntity),
) {
contentType(ContentType.Application.Json)
setBody(RequestBody(commitMessage, base64, sha))
}
if (response.status == HttpStatusCode.UnprocessableEntity) {
logger.warn("File $filePath already exists")
} else if (!response.status.isSuccess()) {
throw Exception("GitHub upload failed: ${response.body<String>()}")
}
}
}
Loading

0 comments on commit 5d6cc01

Please sign in to comment.