Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import kotlin.time.TimeSource.Monotonic.markNow
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
import nmcp.transport.Success
import nmcp.transport.executeWithRetries
import nmcp.transport.nmcpClient
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.MultipartBody
Expand Down Expand Up @@ -60,20 +62,17 @@ internal fun nmcpPublishWithPublisherApi(
val url = baseUrl + "api/v1/publisher/upload?publishingType=$publishingType"

logger.lifecycle("Uploading deployment to '$url'")
val deploymentId = Request.Builder()
val request = Request.Builder()
.post(body)
.addHeader("Authorization", "Bearer $token")
.url(url)
.build()
.let {
nmcpClient.newCall(it).execute()
}.use {
if (!it.isSuccessful) {
error("Cannot deploy to maven central (status='${it.code}'): ${it.body.string()}")
}
val result = executeWithRetries(logger, nmcpClient, request)

it.body.string()
}
if (result !is Success) {
error("Cannot upload deployment to maven central: ($result)}")
}
val deploymentId = result.body.use { it.readUtf8() }

logger.lifecycle("Nmcp: deployment bundle '$deploymentId' uploaded.")

Expand Down Expand Up @@ -118,6 +117,7 @@ private fun waitFor(
}

val status = verifyStatus(
logger = logger,
deploymentId = deploymentId,
baseUrl = baseUrl,
token = token,
Expand All @@ -137,9 +137,6 @@ private fun waitFor(

private sealed interface Status

// A deployment has successfully been uploaded to Maven Central
private data object UNKNOWN_QUERY_LATER : Status

// A deployment is uploaded and waiting for processing by the validation service
private data object PENDING : Status

Expand All @@ -159,47 +156,42 @@ private data object PUBLISHED : Status
private class FAILED(val error: String) : Status

private fun verifyStatus(
logger: GLogger,
deploymentId: String,
baseUrl: String,
token: String,
): Status {
Request.Builder()
val request = Request.Builder()
.post(ByteString.EMPTY.toRequestBody())
.addHeader("Authorization", "Bearer $token")
.url(baseUrl + "api/v1/publisher/status?id=$deploymentId")
.build()
.let {
try {
nmcpClient.newCall(it).execute()
} catch (_: SocketTimeoutException) {
return UNKNOWN_QUERY_LATER
}
}.use {
if (!it.isSuccessful) {
error("Cannot verify deployment $deploymentId status (HTTP status='${it.code}'): ${it.body.string()}")
}
val result = executeWithRetries(logger, nmcpClient, request)
if (result !is Success) {
error("Cannot verify deployment $deploymentId status ($result)")
}

val str = it.body.string()
val element = Json.parseToJsonElement(str)
check(element is JsonObject) {
"Nmcp: unexpected status response for deployment $deploymentId: $str"
}
val str = result.body.use { it.readUtf8() }
val element = Json.parseToJsonElement(str)
check(element is JsonObject) {
"Nmcp: unexpected status response for deployment $deploymentId: $str"
}

val state = element["deploymentState"]
check(state is JsonPrimitive && state.isString) {
"Nmcp: unexpected deploymentState for deployment $deploymentId: $state"
}
val state = element["deploymentState"]
check(state is JsonPrimitive && state.isString) {
"Nmcp: unexpected deploymentState for deployment $deploymentId: $state"
}

return when (state.content) {
"PENDING" -> PENDING
"VALIDATING" -> VALIDATING
"VALIDATED" -> VALIDATED
"PUBLISHING" -> PUBLISHING
"PUBLISHED" -> PUBLISHED
"FAILED" -> {
FAILED(element["errors"].toString())
}
else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state")
}
return when (state.content) {
"PENDING" -> PENDING
"VALIDATING" -> VALIDATING
"VALIDATED" -> VALIDATED
"PUBLISHING" -> PUBLISHING
"PUBLISHED" -> PUBLISHED
"FAILED" -> {
FAILED(element["errors"].toString())
}
else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state")
}

}
147 changes: 111 additions & 36 deletions nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package nmcp.transport

import gratatouille.tasks.GLogger
import java.io.File
import kotlin.math.pow
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.MediaType
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okio.BufferedSink
import okio.BufferedSource
import okio.IOException
import okio.buffer
import okio.sink
import okio.source
Expand Down Expand Up @@ -88,25 +91,21 @@ internal class HttpTransport(

logger.info("Nmcp: get '$url'")

val response = Request.Builder()
val request = Request.Builder()
.get()
.url(url)
.maybeAddAuthorization(getAuthorization)
.build()
.let {
client.newCall(it).execute()
}

if (response.code == 404) {
response.close()
val result = executeWithRetries(logger, client, request)
if (result is HttpError && result.code == 404) {
return null
}
if (!response.isSuccessful) {
response.close()
error("Nmcp: cannot GET '$url' (statusCode=${response.code}):\n${response.body.string()}")
if (result !is Success) {
error("Nmcp: cannot GET '$url' (${result})")
}

return response.body.source()
return result.body
}

override fun put(path: String, body: Content) {
Expand All @@ -116,41 +115,117 @@ internal class HttpTransport(

logger.info("Nmcp: put '$url'")

Request.Builder()
val request = Request.Builder()
.put(body.toRequestBody())
.url(url)
.maybeAddAuthorization(putAuthorization)
.build()
.let {
client.newCall(it).execute()
}.use { response ->
check(response.isSuccessful) {
buildString {
appendLine("Nmcp: cannot PUT '$url' (statusCode=${response.code}).")
appendLine("Response body: '${response.body.string()}'")
when (response.code) {
400 -> {
appendLine("Things to double check:")
appendLine("Your artifacts have proper extensions (.jar, .pom, ...).")
appendLine("If publishing a XML file, the XML version is 1.0.")
appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.")
}
401 -> {
appendLine("Check your credentials")
appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.")
}
403 -> {
appendLine("Check that you are publishing to the correct groupId.")
}
429 -> {
appendLine("Too many requests, try again later")
}
}

val result = executeWithRetries(logger, client, request)
if (result is Success) {
result.body.close()
return
}

val error = buildString {
appendLine("Nmcp: cannot PUT '$url'")
appendLine("$result")
if (result is HttpError) {
when (result.code) {
400 -> {
appendLine("Things to double check:")
appendLine("Your artifacts have proper extensions (.jar, .pom, ...).")
appendLine("If publishing a XML file, the XML version is 1.0.")
appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.")
}
401 -> {
appendLine("Check your credentials")
appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.")
}
403 -> {
appendLine("Check that you are publishing to the correct groupId.")
}
429 -> {
appendLine("Too many requests, try again later")
}
}
}
}
error(error)
}
}

/**
* In some cases, 401 is actually retryable.
* This is the case for:
* - PUT on htps://central.sonatype.com/repository/maven-snapshots/
* - verification of a deployment
*
* This is quite unexpected, and we code defensively here to be robust to those cases.
* We also retry other errors.
*
* Example of transient 401:
* ```
* Execution failed for task ':nmcpPublishAggregationToCentralPortal'.
* > A failure occurred while executing nmcp.internal.task.NmcpPublishWithPublisherApiWorkAction
* > Cannot verify deployment fbed2636-e25d-4538-be7d-7693d475595d status (HTTP status='401'): {"error":{"message":"Invalid token"}}
* ```
*
* TODO:
* - rework this to not block the thread.
* - move the logic to some upper, sonatype-only layer
* - fine tune the retry logic. Do we want to retry everything like we do here? Or are some HTTP errors actually
* not retryable?
*
* @return the result. If the result is a success, the caller MUST close its body.
*/
internal fun executeWithRetries(logger: GLogger, client: OkHttpClient, request: Request): Result {
var attempt = 0
val attemptCount = 3
while(true) {
val result = executeInternal(client, request)
if (result is Success) {
return result
}
if (result is HttpError && result.code == 404) {
// 404 is not retryable
return result
}
if (attempt == attemptCount - 1) {
return result
}

logger.lifecycle("Nmcp: put '${request.url}' failed (${result}), retrying... (attempt ${attempt + 1}/${attemptCount})")
Thread.sleep(2.0.pow(attempt.toDouble()).toLong() * 1_000)
attempt++
}
}

internal fun executeInternal(client: OkHttpClient, request: Request): Result {
return try {
val response = client.newCall(request).execute()
if (response.isSuccessful) {
return Success(response.body.source())
}

HttpError(response.code, response.body.string())
} catch (e: IOException) {
NetworkError(e)
}
}

internal sealed interface Result
internal class NetworkError(val exception: IOException) : Result {
override fun toString(): String {
return "NetworkError: ${exception.message}"
}
}
internal class HttpError(val code: Int, val body: String): Result {
override fun toString(): String {
return "HTTP error $code: '$body'"
}
}
internal class Success(val body: BufferedSource) : Result

fun Content.toRequestBody(): RequestBody {
return object : RequestBody() {
Expand Down