diff --git a/Cargo.lock b/Cargo.lock
index 7a347d8..44335ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1045,9 +1045,9 @@ checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
 
 [[package]]
 name = "gitlab-runner"
-version = "0.0.5"
+version = "0.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "017512a50302bb7d24db480af977734de04a781affceee9716c42b04b5501e85"
+checksum = "bc9c03f6aa98ae8a13e2e860bdf4fc6b4a7cb09e97d2cc9370f689e6690077c0"
 dependencies = [
  "async-trait",
  "bytes",
@@ -1701,7 +1701,7 @@ dependencies = [
 
 [[package]]
 name = "obs-gitlab-runner"
-version = "0.1.3"
+version = "0.1.6"
 dependencies = [
  "async-trait",
  "backoff",
diff --git a/Cargo.toml b/Cargo.toml
index 34b489d..277cc67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,7 +32,7 @@ tracing-error = "0.2"
 tracing-subscriber = { version = "0.3", features = ["default", "json"] }
 url = "2.2"
 
-gitlab-runner = "0.0.5"
+gitlab-runner = "0.0.7"
 # gitlab-runner = { path = "../gitlab-runner-rs/gitlab-runner" }
 open-build-service-api = { git = "https://github.com/collabora/open-build-service-rs" }
 # open-build-service-api = { path = "../open-build-service-rs/open-build-service-api" }
diff --git a/README.md b/README.md
index 682431c..6ff07e5 100644
--- a/README.md
+++ b/README.md
@@ -133,6 +133,7 @@ generate-monitor RUNNER_TAG
     [--build-info BUILD_INFO_FILE=build-info.yml]
     [--pipeline-out PIPELINE_FILE=obs.yml]
     [--job-prefix MONITOR_JOB_PREFIX=obs]
+    [--job-timeout MONITOR_JOB_TIMEOUT]
     [--artifact-expiration ARTIFACT_EXPIRATION='3 days']
     [--build-log-out BUILD_LOG_FILE=build.log]
 ```
@@ -214,6 +215,12 @@ Changes the filename of the child pipeline YAML.
 Changes the prefix that will be prepended to each generated job
 (`MONITOR_JOB_PREFIX-REPOSITORY-ARCH`).
 
+##### `--job-timeout MONITOR_JOB_TIMEOUT`
+
+Changes the timeout for each generated job, using the [job `timeout`
+setting](https://docs.gitlab.com/ee/ci/yaml/#timeout). If not passed, the
+timeout will not be set.
+
 ##### `--artifact-expiration ARTIFACT_EXPIRATION='3 days'`
 
 Changes the expiration of the build results & logs.
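(Illustrative aside, not part of the patch: with `--job-timeout`, the value flows through `GeneratePipelineOptions` into the serialized `JobSpec`, so a job in the generated child pipeline simply carries a `timeout` key. The sketch below assumes a hypothetical repository `default` and architecture `x86_64`, uses the documented defaults for the job prefix, build log, and artifact expiration, and elides the script lines the runner actually emits.)

```yaml
# Rough sketch of one job in the generated child pipeline (obs.yml).
obs-default-x86_64:
  tags:
    - test-tag            # the RUNNER_TAG passed to generate-monitor
  before_script: []
  script:
    # the real monitoring command emitted by the runner is elided here
    - obs monitoring step (assumed placeholder)
  after_script: []
  timeout: 1 day          # from --job-timeout '1 day'
  artifacts:
    paths:
      - build.log
    when: always
    expire_in: 3 days
```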
diff --git a/src/binaries.rs b/src/binaries.rs
index ba22e93..4d5ef55 100644
--- a/src/binaries.rs
+++ b/src/binaries.rs
@@ -7,7 +7,7 @@ use tokio::{fs::File as AsyncFile, io::AsyncSeekExt};
 use tokio_util::compat::FuturesAsyncReadCompatExt;
 use tracing::{info_span, instrument, Instrument};
 
-use crate::retry::{retry_large_request, retry_request};
+use crate::retry::retry_request;
 
 #[instrument(skip(client))]
 pub async fn download_binaries(
@@ -28,7 +28,7 @@
     let mut binaries = HashMap::new();
 
     for binary in binary_list.binaries {
-        let mut dest = retry_large_request(|| {
+        let mut dest = retry_request(|| {
             let binary = binary.clone();
             let client = client.clone();
             async move {
diff --git a/src/handler.rs b/src/handler.rs
index 1ba2126..346c7c8 100644
--- a/src/handler.rs
+++ b/src/handler.rs
@@ -84,6 +84,8 @@ struct GenerateMonitorAction {
     pipeline_out: String,
     #[clap(long, default_value_t = DEFAULT_PIPELINE_JOB_PREFIX.to_owned())]
     job_prefix: String,
+    #[clap(long)]
+    job_timeout: Option<String>,
     #[clap(long, default_value_t = DEFAULT_ARTIFACT_EXPIRATION.to_owned())]
     artifact_expiration: String,
     #[clap(long, default_value_t = DEFAULT_BUILD_LOG.into())]
@@ -410,6 +412,7 @@ impl ObsJobHandler {
                 tags: vec![args.tag],
                 artifact_expiration: args.artifact_expiration,
                 prefix: args.job_prefix,
+                timeout: args.job_timeout,
                 rules: args.rules,
                 download_binaries: if let Some(build_results_dir) = args.build_results_dir {
                     PipelineDownloadBinaries::OnSuccess {
@@ -1266,6 +1269,7 @@ mod tests {
         download_binaries: bool,
     ) {
         const TEST_JOB_RUNNER_TAG: &str = "test-tag";
+        const TEST_MONITOR_TIMEOUT: &str = "1 day";
         const TEST_BUILD_RESULTS_DIR: &str = "results";
         const TEST_BUILD_RESULT: &str = "test-build-result";
         const TEST_BUILD_RESULT_CONTENTS: &[u8] = b"abcdef";
@@ -1307,8 +1311,8 @@
         );
         let mut generate_command = format!(
-            "generate-monitor {} --rules '[{{a: 1}}, {{b: 2}}]'",
-            TEST_JOB_RUNNER_TAG
+            "generate-monitor {} --job-timeout '{}' --rules '[{{a: 1}}, {{b: 2}}]'",
+            TEST_JOB_RUNNER_TAG, TEST_MONITOR_TIMEOUT
         );
         if download_binaries {
             generate_command += &format!(" --download-build-results-to {}", TEST_BUILD_RESULTS_DIR);
         }
@@ -1432,6 +1436,13 @@
         assert_eq!(tags.len(), 1);
         assert_eq!(tags[0].as_str().unwrap(), TEST_JOB_RUNNER_TAG);
 
+        let timeout = monitor_map
+            .get(&"timeout".into())
+            .unwrap()
+            .as_str()
+            .unwrap();
+        assert_eq!(timeout, TEST_MONITOR_TIMEOUT);
+
         let rules: Vec<_> = monitor_map
             .get(&"rules".into())
             .unwrap()
diff --git a/src/monitor.rs b/src/monitor.rs
index 2880504..9b2d861 100644
--- a/src/monitor.rs
+++ b/src/monitor.rs
@@ -11,7 +11,7 @@ use tokio::{
 };
 use tracing::{debug, instrument};
 
-use crate::retry::{retry_large_request, retry_request};
+use crate::retry::retry_request;
 
 #[derive(Debug)]
 pub enum PackageCompletion {
@@ -245,7 +245,7 @@ impl ObsMonitor {
     pub async fn download_build_log(&self) -> Result {
         const LOG_LEN_TO_CHECK_FOR_MD5: u64 = 2500;
 
-        let mut file = retry_large_request(|| async {
+        let mut file = retry_request(|| async {
             let mut file = AsyncFile::from_std(
                 tempfile::tempfile().wrap_err("Failed to create tempfile to build log")?,
             );
diff --git a/src/pipeline.rs b/src/pipeline.rs
index 89aae04..247cede 100644
--- a/src/pipeline.rs
+++ b/src/pipeline.rs
@@ -17,6 +17,7 @@ pub struct GeneratePipelineOptions {
     pub tags: Vec<String>,
     pub artifact_expiration: String,
     pub prefix: String,
+    pub timeout: Option<String>,
    pub rules: Option,
     pub build_log_out: String,
     pub download_binaries: PipelineDownloadBinaries,
@@ -35,6 +36,8 @@ struct JobSpec {
     before_script: Vec<String>,
     script: Vec<String>,
     after_script: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    timeout: Option<String>,
     artifacts: ArtifactsSpec,
     #[serde(skip_serializing_if = "Option::is_none")]
     rules: Option,
@@ -114,6 +117,7 @@ pub fn generate_monitor_pipeline(
             // ensure that they're set to be empty.
             before_script: vec![],
             after_script: vec![],
+            timeout: options.timeout.clone(),
             artifacts: ArtifactsSpec {
                 paths: artifact_paths,
                 when: "always".to_owned(),
diff --git a/src/retry.rs b/src/retry.rs
index 79cff39..8e47322 100644
--- a/src/retry.rs
+++ b/src/retry.rs
@@ -1,13 +1,14 @@
-use std::{sync::Arc, time::Duration};
+use std::time::Duration;
 
 use backoff::ExponentialBackoff;
 use futures_util::Future;
 
 use color_eyre::{eyre::Result, Report};
 use open_build_service_api as obs;
-use tokio::sync::Mutex;
 use tracing::instrument;
 
+const INITIAL_INTERVAL: Duration = Duration::from_millis(300);
+
 fn is_client_error(err: &(dyn std::error::Error + 'static)) -> bool {
     err.downcast_ref::<obs::Error>()
         .and_then(|e| e.status())
@@ -30,62 +31,40 @@ fn is_caused_by_client_error(report: &Report) -> bool {
     })
 }
 
-async fn retry_request_impl<T, E, Fut, Func>(backoff_limit: Duration, func: Func) -> Result<T>
+#[instrument(skip(func))]
+pub async fn retry_request<T, E, Fut, Func>(func: Func) -> Result<T>
 where
     Fut: Future<Output = Result<T, E>>,
-    Func: FnMut() -> Fut,
+    Func: Fn() -> Fut,
     E: Into<Report>,
 {
-    let func = Arc::new(Mutex::new(func));
     backoff::future::retry(
         ExponentialBackoff {
-            max_elapsed_time: Some(backoff_limit),
+            max_elapsed_time: None,
+            initial_interval: INITIAL_INTERVAL,
             ..Default::default()
         },
-        move || {
-            let func = func.clone();
-            async move {
-                let mut func = func.lock().await;
-                func().await.map_err(|err| {
-                    let report = err.into();
-                    if is_caused_by_client_error(&report) {
-                        backoff::Error::permanent(report)
-                    } else {
-                        backoff::Error::transient(report)
-                    }
-                })
-            }
+        || async {
+            func().await.map_err(|err| {
+                let report = err.into();
+                if is_caused_by_client_error(&report) {
+                    backoff::Error::permanent(report)
+                } else {
+                    backoff::Error::transient(report)
+                }
+            })
         },
     )
     .await
 }
 
-#[instrument(skip(func))]
-pub async fn retry_request<T, E, Fut, Func>(func: Func) -> Result<T>
-where
-    Fut: Future<Output = Result<T, E>>,
-    Func: FnMut() -> Fut,
-    E: Into<Report>,
-{
-    const BACKOFF_LIMIT: Duration = Duration::from_secs(10 * 60); // 10 minutes
-    retry_request_impl(BACKOFF_LIMIT, func).await
-}
-
-#[instrument(skip(func))]
-pub async fn retry_large_request<T, E, Fut, Func>(func: Func) -> Result<T>
-where
-    Fut: Future<Output = Result<T, E>>,
-    Func: FnMut() -> Fut,
-    E: Into<Report>,
-{
-    const BACKOFF_LIMIT: Duration = Duration::from_secs(60 * 60); // 1 hour
-    retry_request_impl(BACKOFF_LIMIT, func).await
-}
-
 #[cfg(test)]
 mod tests {
+    use std::sync::atomic::{AtomicI32, Ordering};
+
     use claim::*;
     use open_build_service_api as obs;
+    use rstest::*;
     use wiremock::{
         matchers::{method, path_regex},
         Mock, MockServer, ResponseTemplate,
@@ -95,10 +74,12 @@
     use super::*;
 
-    const LIMIT: Duration = Duration::from_secs(1);
+    fn wrap_in_io_error(err: obs::Error) -> std::io::Error {
+        std::io::Error::new(std::io::ErrorKind::Other, err)
+    }
 
-    #[tokio::test]
-    async fn test_retry_on_non_client_errors() {
+    #[fixture]
+    async fn server() -> MockServer {
         let server = MockServer::start().await;
 
         Mock::given(method("GET"))
@@ -116,62 +97,108 @@ mod tests {
             .mount(&server)
             .await;
 
+        server
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn test_retry_on_non_client_errors(server: impl Future<Output = MockServer>) {
+        let server = server.await;
         let client = obs::Client::new(
             server.uri().parse().unwrap(),
             TEST_USER.to_owned(),
             TEST_PASS.to_owned(),
         );
 
-        let mut attempts = 0;
+        let attempts = AtomicI32::new(0);
         assert_err!(
-            retry_request_impl(LIMIT, || {
-                attempts += 1;
-                async { client.project("500".to_owned()).meta().await }
-            })
+            tokio::time::timeout(
+                Duration::from_millis(2000),
+                retry_request(|| {
+                    attempts.fetch_add(1, Ordering::SeqCst);
+                    async { client.project("500".to_owned()).meta().await }
+                })
+            )
            .await
        );
-        assert_gt!(attempts, 1);
+        assert_gt!(attempts.load(Ordering::SeqCst), 1);
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn test_retry_on_nested_non_client_errors(server: impl Future<Output = MockServer>) {
+        let server = server.await;
+        let client = obs::Client::new(
+            server.uri().parse().unwrap(),
+            TEST_USER.to_owned(),
+            TEST_PASS.to_owned(),
+        );
 
-        let mut attempts = 0;
+        let attempts = AtomicI32::new(0);
         assert_err!(
-            retry_request_impl(LIMIT, || {
-                attempts += 1;
-                async {
-                    client
-                        .project("500".to_owned())
-                        .meta()
-                        .await
-                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
-                }
-            })
+            tokio::time::timeout(
+                Duration::from_millis(2000),
+                retry_request(|| {
+                    attempts.fetch_add(1, Ordering::SeqCst);
+                    async {
+                        client
+                            .project("500".to_owned())
+                            .meta()
+                            .await
+                            .map_err(wrap_in_io_error)
+                    }
+                })
+            )
            .await
        );
-        assert_gt!(attempts, 1);
+        assert_gt!(attempts.load(Ordering::SeqCst), 1);
+    }
 
-        attempts = 0;
+    #[rstest]
+    #[tokio::test]
+    async fn test_no_retry_on_client_errors(server: impl Future<Output = MockServer>) {
+        let server = server.await;
+        let client = obs::Client::new(
+            server.uri().parse().unwrap(),
+            TEST_USER.to_owned(),
+            TEST_PASS.to_owned(),
+        );
+
+        let attempts = AtomicI32::new(0);
         assert_err!(
-            retry_request_impl(LIMIT, || {
-                attempts += 1;
+            retry_request(|| {
+                attempts.fetch_add(1, Ordering::SeqCst);
                 async { client.project("403".to_owned()).meta().await }
             })
             .await
         );
-        assert_eq!(attempts, 1);
+        assert_eq!(attempts.load(Ordering::SeqCst), 1);
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn test_no_retry_on_nested_client_errors(server: impl Future<Output = MockServer>) {
+        let server = server.await;
+        let client = obs::Client::new(
+            server.uri().parse().unwrap(),
+            TEST_USER.to_owned(),
+            TEST_PASS.to_owned(),
+        );
 
-        attempts = 0;
+        let attempts = AtomicI32::new(0);
         assert_err!(
-            retry_request_impl(LIMIT, || {
-                attempts += 1;
+            retry_request(|| {
+                attempts.fetch_add(1, Ordering::SeqCst);
                 async {
                     client
                         .project("403".to_owned())
                         .meta()
                         .await
-                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
+                        .map_err(wrap_in_io_error)
                 }
             })
             .await
         );
-        assert_eq!(attempts, 1);
+        assert_eq!(attempts.load(Ordering::SeqCst), 1);
     }
 }
diff --git a/src/upload.rs b/src/upload.rs
index c8d85cf..79516c2 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -10,11 +10,7 @@ use md5::{Digest, Md5};
 use open_build_service_api as obs;
 use tracing::{debug, info_span, instrument, trace, Instrument};
 
-use crate::{
-    artifacts::ArtifactDirectory,
-    dsc::Dsc,
-    retry::{retry_large_request, retry_request},
-};
+use crate::{artifacts::ArtifactDirectory, dsc::Dsc, retry::retry_request};
 
 type Md5String = String;
 
@@ -234,7 +230,7 @@ impl ObsDscUploader {
         debug!("Uploading file");
 
         let file = artifacts.get_file(root.join(filename).as_str()).await?;
-        retry_large_request(|| {
+        retry_request(|| {
             file.try_clone().then(|file| async {
                 let file = file.wrap_err("Failed to clone file")?;
                 self.client