Skip to content

Commit

Permalink
Handled know non-fatal errors better when submitting jobs. Fixes #23
Browse files Browse the repository at this point in the history
  • Loading branch information
gsleap committed Jul 16, 2024
1 parent c877c59 commit 774111f
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 55 deletions.
96 changes: 93 additions & 3 deletions src/asvo/asvo_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,27 @@ impl DummyJob {
}
}

/// When defining serde structs remember order matters!
/// Put the most specific matches first, then less
/// specific last!
#[derive(Deserialize, PartialEq, Debug)]
#[serde(untagged)]
pub(super) enum AsvoSubmitJobResponse {
JobID { job_id: AsvoJobID },
ErrorWithCode { error_code: u32, error: String },
GenericError { error: String },
JobIDWithError {
error: String,
error_code: u32,
job_id: AsvoJobID,
},
JobID {
job_id: AsvoJobID,
},
ErrorWithCode {
error_code: u32,
error: String,
},
GenericError {
error: String,
},
}

#[cfg(test)]
Expand Down Expand Up @@ -164,4 +179,79 @@ mod tests {
decoded.unwrap()
);
}

#[test]
fn test_json_job_submit_response_job_already_q_p_c_parse() {
let json = "{\"error\": \"Job already queued, processing or complete\", \"error_code\": 2, \"job_id\": 10001822}";
let decoded = serde_json::from_str::<AsvoSubmitJobResponse>(json);
assert!(decoded.is_ok());
assert_eq!(
AsvoSubmitJobResponse::JobIDWithError {
error_code: 2,
error: "Job already queued, processing or complete".to_string(),
job_id: 10001822
},
decoded.unwrap()
);
}

#[test]
fn test_json_job_submit_response_job_already_q_p_parse() {
let json = "{\"error\": \"Job already queued or processing.\", \"error_code\": 2, \"job_id\": 10001822}";
let decoded = serde_json::from_str::<AsvoSubmitJobResponse>(json);
assert!(decoded.is_ok());
assert_eq!(
AsvoSubmitJobResponse::JobIDWithError {
error_code: 2,
error: "Job already queued or processing.".to_string(),
job_id: 10001822
},
decoded.unwrap()
);
}

#[test]
fn test_json_job_submit_response_full_or_partial_outage1() {
let json = "{\"error\": \"Your job cannot be submitted as there is a full outage in progress.\", \"error_code\": 0}";
let decoded = serde_json::from_str::<AsvoSubmitJobResponse>(json);
assert!(decoded.is_ok());
assert_eq!(
AsvoSubmitJobResponse::ErrorWithCode {
error_code: 0,
error: "Your job cannot be submitted as there is a full outage in progress."
.to_string(),
},
decoded.unwrap()
);
}

#[test]
fn test_json_job_submit_response_full_or_partial_outage2() {
let json = "{\"error\": \"Your job cannot be submitted as there is a partial outage, please use a delivery location other than acacia.\", \"error_code\": 0}";
let decoded = serde_json::from_str::<AsvoSubmitJobResponse>(json);
assert!(decoded.is_ok());
assert_eq!(
AsvoSubmitJobResponse::ErrorWithCode {
error_code: 0,
error: "Your job cannot be submitted as there is a partial outage, please use a delivery location other than acacia."
.to_string(),
},
decoded.unwrap()
);
}

#[test]
fn test_json_job_submit_response_full_or_partial_outage3() {
let json = "{\"error\": \"Your job cannot be submitted as the staging server is down and also acacia is unavailable!\", \"error_code\": 0}";
let decoded = serde_json::from_str::<AsvoSubmitJobResponse>(json);
assert!(decoded.is_ok());
assert_eq!(
AsvoSubmitJobResponse::ErrorWithCode {
error_code: 0,
error: "Your job cannot be submitted as the staging server is down and also acacia is unavailable!"
.to_string(),
},
decoded.unwrap()
);
}
}
97 changes: 57 additions & 40 deletions src/asvo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::path::Path;
use std::time::Instant;

use backoff::{retry, Error, ExponentialBackoff};
use log::{debug, info};
use log::{debug, error, info, warn};
use reqwest::blocking::{Client, ClientBuilder};
use sha1::{Digest, Sha1};
use tar::Archive;
Expand Down Expand Up @@ -356,7 +356,7 @@ impl AsvoClient {
delivery: Delivery,
delivery_format: Option<DeliveryFormat>,
allow_resubmit: bool,
) -> Result<AsvoJobID, AsvoError> {
) -> Result<Option<AsvoJobID>, AsvoError> {
debug!("Submitting a vis job to ASVO");

let obsid_str = format!("{}", obsid);
Expand Down Expand Up @@ -386,7 +386,7 @@ impl AsvoClient {
offset: i32,
duration: i32,
allow_resubmit: bool,
) -> Result<AsvoJobID, AsvoError> {
) -> Result<Option<AsvoJobID>, AsvoError> {
debug!("Submitting a voltage job to ASVO");

let obsid_str = format!("{}", obsid);
Expand All @@ -413,7 +413,7 @@ impl AsvoClient {
delivery_format: Option<DeliveryFormat>,
parameters: &BTreeMap<&str, &str>,
allow_resubmit: bool,
) -> Result<AsvoJobID, AsvoError> {
) -> Result<Option<AsvoJobID>, AsvoError> {
debug!("Submitting a conversion job to ASVO");

let obsid_str = format!("{}", obsid);
Expand Down Expand Up @@ -454,7 +454,7 @@ impl AsvoClient {
delivery: Delivery,
delivery_format: Option<DeliveryFormat>,
allow_resubmit: bool,
) -> Result<AsvoJobID, AsvoError> {
) -> Result<Option<AsvoJobID>, AsvoError> {
debug!("Submitting a metafits job to ASVO");

let obsid_str = format!("{}", obsid);
Expand All @@ -477,11 +477,15 @@ impl AsvoClient {
}

/// This low-level function actually submits jobs to the ASVO.
/// The return can either be:
/// Ok(Some(jobid)) - this is when a new job is submitted
/// Ok(None) - this is when an existing job is resubmitted
/// Err() - this is when we hit an error
fn submit_asvo_job(
&self,
job_type: &AsvoJobType,
form: BTreeMap<&str, &str>,
) -> Result<AsvoJobID, AsvoError> {
) -> Result<Option<AsvoJobID>, AsvoError> {
debug!("Submitting an ASVO job");
let api_path = match job_type {
AsvoJobType::Conversion => "conversion_job",
Expand All @@ -496,47 +500,60 @@ impl AsvoClient {
.post(format!("{}/api/{}", get_asvo_server_address(), api_path))
.form(&form)
.send()?;
if !response.status().is_success() {
return Err(AsvoError::BadStatus {
code: response.status(),
message: response.text()?,
});
}
let response_text = response.text()?;
match serde_json::from_str(&response_text) {
Ok(AsvoSubmitJobResponse::JobID { job_id, .. }) => Ok(job_id),

Ok(AsvoSubmitJobResponse::ErrorWithCode { error_code, error }) => {
Err(AsvoError::BadRequest {
code: error_code,
message: error,
})
let code = response.status().as_u16();
let response_text = &response.text()?;
if code != 200 && code < 400 && code > 499 {
// Show the http code when it's not something we can handle
warn!("http code: {} response: {}", code, &response_text)
};
match serde_json::from_str(response_text) {
Ok(AsvoSubmitJobResponse::JobIDWithError {
error,
error_code,
job_id,
..
}) => {
if error_code == 2 {
// error code 2 == job already exists
warn!("{}. Job Id: {}", error.as_str(), job_id);
Ok(None)
} else {
Err(AsvoError::BadRequest {
code: error_code,
message: error,
})
}
}

Ok(AsvoSubmitJobResponse::GenericError { error }) => match error.as_str() {
// If the server comes back with the error "already queued,
// processing or complete", proceed like it wasn't an error.
"Job already queued, processing or complete." => {
let jobs = self.get_jobs()?;
// This approach is flawed; the first job ID with the
// same obsid as that submitted by this function is
// returned, but it's not necessarily the right job ID.
let j = jobs
.0
.iter()
.find(|j| j.obsid == form["obs_id"].parse().unwrap())
.unwrap();
Ok(j.jobid)
Ok(AsvoSubmitJobResponse::JobID { job_id, .. }) => Ok(Some(job_id)),

Ok(AsvoSubmitJobResponse::ErrorWithCode { error_code, error }) => {
// Crazy code here as MWA ASVO API does not have good error codes (yet!)
// 0 == invalid input (most of the time!)
if error_code == 0
&& (error.as_str()
== "Unable to submit job. Observation has no files to download."
|| (error.as_str().starts_with("Observation ")
&& error.as_str().ends_with(" does not exist")))
{
error!("{}", error.as_str());
Ok(None)
} else {
Err(AsvoError::BadRequest {
code: error_code,
message: error,
})
}
}

_ => Err(AsvoError::BadRequest {
code: 666,
message: error,
}),
},
Ok(AsvoSubmitJobResponse::GenericError { error }) => Err(AsvoError::BadRequest {
code: 999,
message: error,
}),

Err(e) => {
debug!("bad response: {}", response_text);
warn!("bad response: {}", response_text);
Err(AsvoError::BadJson(e))
}
}
Expand Down
65 changes: 53 additions & 12 deletions src/bin/giant-squid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,24 @@ fn main() -> Result<(), anyhow::Error> {
} else {
let client = AsvoClient::new()?;
let mut jobids: Vec<AsvoJobID> = Vec::with_capacity(obsids.len());
let mut submitted_count = 0;

for o in parsed_obsids {
let j = client.submit_vis(o, delivery, delivery_format, allow_resubmit)?;
info!("Submitted {} as ASVO job ID {}", o, j);
jobids.push(j);

if j.is_some() {
let jobid = j.unwrap();
info!("Submitted {} as ASVO job ID {}", o, jobid);
jobids.push(jobid);
submitted_count += 1;
}
// for the none case- the "submit_asvo" function
// will have already provided user some feedback
}
info!("Submitted {} obsids for visibility download.", obsids.len());
info!(
"Submitted {} obsids for visibility download.",
submitted_count
);

if wait {
// Endlessly loop over the newly-supplied job IDs until
Expand Down Expand Up @@ -549,6 +561,8 @@ fn main() -> Result<(), anyhow::Error> {
} else {
let client = AsvoClient::new()?;
let mut jobids: Vec<AsvoJobID> = Vec::with_capacity(obsids.len());
let mut submitted_count = 0;

for o in parsed_obsids {
let j = client.submit_conv(
o,
Expand All @@ -557,10 +571,17 @@ fn main() -> Result<(), anyhow::Error> {
&params,
allow_resubmit,
)?;
info!("Submitted {} as ASVO job ID {}", o, j);
jobids.push(j);

if j.is_some() {
let jobid = j.unwrap();
info!("Submitted {} as ASVO job ID {}", o, jobid);
jobids.push(jobid);
submitted_count += 1;
}
// for the none case- the "submit_asvo" function
// will have already provided user some feedback
}
info!("Submitted {} obsids for conversion.", obsids.len());
info!("Submitted {} obsids for conversion.", submitted_count);

if wait {
// Endlessly loop over the newly-supplied job IDs until
Expand Down Expand Up @@ -607,12 +628,23 @@ fn main() -> Result<(), anyhow::Error> {
} else {
let client = AsvoClient::new()?;
let mut jobids: Vec<AsvoJobID> = Vec::with_capacity(obsids.len());

let mut submitted_count = 0;
for o in parsed_obsids {
let j = client.submit_meta(o, delivery, delivery_format, allow_resubmit)?;
info!("Submitted {} as ASVO job ID {}", o, j);
jobids.push(j);
if j.is_some() {
let jobid = j.unwrap();
info!("Submitted {} as ASVO job ID {}", o, jobid);
jobids.push(jobid);
submitted_count += 1;
}
// for the none case- the "submit_asvo" function
// will have already provided user some feedback
}
info!("Submitted {} obsids for metadata download.", obsids.len());
info!(
"Submitted {} obsids for metadata download.",
submitted_count
);

if wait {
// Endlessly loop over the newly-supplied job IDs until
Expand Down Expand Up @@ -656,12 +688,21 @@ fn main() -> Result<(), anyhow::Error> {
} else {
let client = AsvoClient::new()?;
let mut jobids: Vec<AsvoJobID> = Vec::with_capacity(obsids.len());
let mut submitted_count = 0;

for o in parsed_obsids {
let j = client.submit_volt(o, delivery, offset, duration, allow_resubmit)?;
info!("Submitted {} as ASVO job ID {}", o, j);
jobids.push(j);

if j.is_some() {
let jobid = j.unwrap();
info!("Submitted {} as ASVO job ID {}", o, jobid);
jobids.push(jobid);
submitted_count += 1;
}
// for the none case- the "submit_asvo" function
// will have already provided user some feedback
}
info!("Submitted {} obsids for voltage download.", obsids.len());
info!("Submitted {} obsids for voltage download.", submitted_count);

if wait {
// Endlessly loop over the newly-supplied job IDs until
Expand Down

0 comments on commit 774111f

Please sign in to comment.