Skip to content

Commit

Permalink
Merge pull request #34 from bgpkit/module-refactor
Browse files Browse the repository at this point in the history
create dedicated modules for remote and utils
  • Loading branch information
digizeph authored Dec 18, 2023
2 parents b77a540 + 9e1907f commit ec80e02
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 213 deletions.
2 changes: 1 addition & 1 deletion src/bin/oneio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn main() {
Commands::UploadToS3 { s3_bucket, s3_path } => {
if let Err(e) = oneio::s3_env_check() {
eprintln!("missing s3 credentials");
eprintln!("{}", e.to_string());
eprintln!("{}", e);
exit(1);
}
match oneio::s3_upload(s3_bucket.as_str(), s3_path.as_str(), path) {
Expand Down
12 changes: 2 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,7 @@ mod oneio;
pub use error::OneIoError;

#[cfg(feature = "remote")]
pub use crate::oneio::download;
pub use crate::oneio::get_cache_reader;
pub use crate::oneio::get_reader;
#[cfg(feature = "remote")]
pub use crate::oneio::get_remote_reader;
pub use crate::oneio::get_writer;
#[cfg(feature = "json")]
pub use crate::oneio::read_json_struct;
pub use crate::oneio::read_lines;
pub use crate::oneio::read_to_string;
pub use crate::oneio::remote::*;
#[cfg(feature = "s3")]
pub use crate::oneio::s3::*;
pub use crate::oneio::*;
2 changes: 1 addition & 1 deletion src/oneio/compressions/bzip2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::oneio::OneIOCompression;
use crate::oneio::compressions::OneIOCompression;
use crate::OneIoError;
use bzip2::read::BzDecoder;
use bzip2::write::BzEncoder;
Expand Down
2 changes: 1 addition & 1 deletion src/oneio/compressions/gzip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::oneio::OneIOCompression;
use crate::oneio::compressions::OneIOCompression;
use crate::OneIoError;
use libflate::finish::AutoFinishUnchecked;
use libflate::gzip::Decoder;
Expand Down
2 changes: 1 addition & 1 deletion src/oneio/compressions/lz4.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::oneio::OneIOCompression;
use crate::oneio::compressions::OneIOCompression;
use crate::OneIoError;
use lz4::Decoder;
use std::fs::File;
Expand Down
9 changes: 9 additions & 0 deletions src/oneio/compressions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use crate::OneIoError;
use std::fs::File;
use std::io::{BufWriter, Read, Write};

#[cfg(feature = "bz")]
pub(crate) mod bzip2;
#[cfg(feature = "gz")]
Expand All @@ -6,3 +10,8 @@ pub(crate) mod gzip;
pub(crate) mod lz4;
#[cfg(feature = "xz")]
pub(crate) mod xz;

/// Interface shared by the compression backends in this module.
///
/// Both items are associated functions (no `self`): an implementor is used
/// purely as a namespace for wrapping an existing raw reader or writer.
pub trait OneIOCompression {
/// Takes ownership of `raw_reader` and returns a boxed reader, or a
/// `OneIoError` on failure.
///
/// NOTE(review): presumably the returned reader yields the decompressed
/// bytes of `raw_reader` — confirm against the implementors.
fn get_reader(raw_reader: Box<dyn Read + Send>) -> Result<Box<dyn Read + Send>, OneIoError>;
/// Takes ownership of the buffered file writer `raw_writer` and returns a
/// boxed writer, or a `OneIoError` on failure.
///
/// NOTE(review): presumably bytes written to the returned writer are
/// compressed before reaching the file — confirm against the implementors.
fn get_writer(raw_writer: BufWriter<File>) -> Result<Box<dyn Write>, OneIoError>;
}
2 changes: 1 addition & 1 deletion src/oneio/compressions/xz.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::oneio::OneIOCompression;
use crate::oneio::compressions::OneIOCompression;
use crate::OneIoError;
use std::fs::File;
use std::io::{BufWriter, Read, Write};
Expand Down
240 changes: 59 additions & 181 deletions src/oneio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,191 +1,59 @@
#[cfg(feature = "compressions")]
mod compressions;
#[cfg(feature = "compressions")]
use crate::oneio::compressions::OneIOCompression;
#[cfg(feature = "remote")]
pub mod remote;
#[cfg(feature = "s3")]
pub mod s3;
pub mod utils;

pub use utils::*;

use crate::OneIoError;

#[cfg(feature = "remote")]
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Lines, Read, Write};
use std::io::{BufWriter, Read, Write};
use std::path::Path;

/// Interface shared by the compression backends.
///
/// Both items are associated functions (no `self`): an implementor is used
/// purely as a namespace for wrapping an existing raw reader or writer.
pub trait OneIOCompression {
/// Takes ownership of `raw_reader` and returns a boxed reader, or a
/// `OneIoError` on failure.
///
/// NOTE(review): presumably the returned reader yields the decompressed
/// bytes of `raw_reader` — confirm against the implementors.
fn get_reader(raw_reader: Box<dyn Read + Send>) -> Result<Box<dyn Read + Send>, OneIoError>;
/// Takes ownership of the buffered file writer `raw_writer` and returns a
/// boxed writer, or a `OneIoError` on failure.
///
/// NOTE(review): presumably bytes written to the returned writer are
/// compressed before reaching the file — confirm against the implementors.
fn get_writer(raw_writer: BufWriter<File>) -> Result<Box<dyn Write>, OneIoError>;
}

#[cfg(feature = "remote")]
fn get_protocol(path: &str) -> Option<String> {
let parts = path.split("://").collect::<Vec<&str>>();
if parts.len() < 2 {
return None;
fn get_writer_raw(path: &str) -> Result<BufWriter<File>, OneIoError> {
let path = Path::new(path);
if let Some(prefix) = path.parent() {
std::fs::create_dir_all(prefix)?;
}
Some(parts[0].to_string())
let output_file = BufWriter::new(File::create(path)?);
Ok(output_file)
}

fn get_reader_raw(path: &str) -> Result<Box<dyn Read + Send>, OneIoError> {
#[cfg(feature = "remote")]
let raw_reader: Box<dyn Read + Send> = match get_protocol(path) {
Some(protocol) => match protocol.as_str() {
"http" | "https" => {
let response = get_remote_http_raw(path, HashMap::new())?;
Box::new(response)
}
"ftp" => {
let response = get_remote_ftp_raw(path)?;
Box::new(response)
}
#[cfg(feature = "s3")]
"s3" => {
let (bucket, path) = s3::s3_url_parse(path)?;
Box::new(s3::s3_reader(bucket.as_str(), path.as_str())?)
}
_ => {
return Err(OneIoError::NotSupported(path.to_string()));
}
},
None => Box::new(std::fs::File::open(path)?),
};
let raw_reader: Box<dyn Read + Send> = remote::get_reader_raw_remote(path)?;
#[cfg(not(feature = "remote"))]
let raw_reader: Box<dyn Read + Send> = Box::new(std::fs::File::open(path)?);
Ok(raw_reader)
}

/// Issues a blocking HTTP GET for `path` and returns the raw response.
///
/// The caller-supplied `header` map becomes the client's default headers.
/// On top of it the `User-Agent` is set to `oneio`, and, when the `no-cache`
/// feature is enabled, `Cache-Control: no-cache` is set as well.
///
/// # Panics
///
/// Panics with "invalid headers" if `header` cannot be converted into a
/// `reqwest::header::HeaderMap`.
///
/// # Errors
///
/// Returns an error if the client or the request cannot be built, if
/// executing the request fails, or if `error_for_status` rejects the
/// response.
#[cfg(feature = "remote")]
fn get_remote_http_raw(
    path: &str,
    header: HashMap<String, String>,
) -> Result<reqwest::blocking::Response, OneIoError> {
    // Start from the user's headers, then layer the library's own on top.
    let mut default_headers: reqwest::header::HeaderMap =
        (&header).try_into().expect("invalid headers");
    default_headers.insert(
        reqwest::header::USER_AGENT,
        reqwest::header::HeaderValue::from_static("oneio"),
    );
    #[cfg(feature = "no-cache")]
    default_headers.insert(
        reqwest::header::CACHE_CONTROL,
        reqwest::header::HeaderValue::from_static("no-cache"),
    );

    let http_client = reqwest::blocking::Client::builder()
        .default_headers(default_headers)
        .build()?;

    // Build the request first, then send it and reject error statuses.
    let request = http_client.get(path).build()?;
    let response = http_client.execute(request)?.error_for_status()?;
    Ok(response)
}

/// Opens an anonymous FTP session for `path` and returns a reader over the
/// remote file's bytes.
///
/// `path` must have the form `ftp://host[:port]/remote/file`. When the host
/// part carries no explicit port, port `21` is used. The session logs in as
/// user `anonymous` with password `oneio` and switches to binary transfer
/// mode before the file is retrieved.
///
/// # Errors
///
/// * `OneIoError::NotSupported` if `path` does not start with `ftp://`.
/// * Any error raised while connecting, logging in, switching the transfer
///   type, or starting the retrieval is propagated to the caller.
#[cfg(feature = "remote")]
fn get_remote_ftp_raw(path: &str) -> Result<Box<dyn Read + Send>, OneIoError> {
    let remainder = path
        .strip_prefix("ftp://")
        .ok_or_else(|| OneIoError::NotSupported(path.to_string()))?;

    // Everything up to the first '/' is the host (optionally `host:port`);
    // the rest is the path of the file on the server.
    let (host, remote_path) = remainder.split_once('/').unwrap_or((remainder, ""));
    let socket = if host.contains(':') {
        host.to_string()
    } else {
        format!("{}:21", host)
    };

    let mut ftp_stream = suppaftp::FtpStream::connect(socket)?;
    // A rejected login is a runtime condition, not a bug: report it to the
    // caller instead of panicking.
    ftp_stream.login("anonymous", "oneio")?;
    ftp_stream.transfer_type(suppaftp::types::FileType::Binary)?;
    let reader = Box::new(ftp_stream.retr_as_stream(remote_path)?);
    Ok(reader)
}

#[cfg(feature = "remote")]
/// get a reader for remote content with the capability to specify headers.
/// Gets a reader for the given file path.
///
/// # Arguments
///
/// * `path` - The path of the file to read.
///
/// # Returns
///
/// A `Result` containing a boxed `Read + Send` trait object with the file reader, or `OneIoError` if an error occurs.
///
/// # Examples
///
/// Example usage:
/// ```no_run
/// use std::collections::HashMap;
/// let mut reader = oneio::get_remote_reader(
/// "https://SOME_REMOTE_RESOURCE_PROTECTED_BY_ACCESS_TOKEN",
/// HashMap::from([("X-Custom-Auth-Key".to_string(), "TOKEN".to_string())])
/// ).unwrap();
/// let mut text = "".to_string();
/// reader.read_to_string(&mut text).unwrap();
/// println!("{}", text);
/// use std::io::Read;
/// use oneio::get_reader;
///
/// let mut reader = get_reader("file.txt").unwrap();
/// let mut buffer = Vec::new();
/// reader.read_to_end(&mut buffer).unwrap();
/// println!("{}", String::from_utf8_lossy(&buffer));
/// ```
pub fn get_remote_reader(
    path: &str,
    header: HashMap<String, String>,
) -> Result<Box<dyn Read + Send>, OneIoError> {
    // Fetch the resource over HTTP(S), sending the caller's extra headers.
    let raw_reader: Box<dyn Read + Send> = Box::new(get_remote_http_raw(path, header)?);

    // The text after the last '.' in `path` selects the decompressor. With no
    // '.' present the whole path is used, which falls through to the default
    // arm below.
    let file_type = path.rsplit('.').next().unwrap_or(path);
    match file_type {
        #[cfg(feature = "gz")]
        "gz" | "gzip" => compressions::gzip::OneIOGzip::get_reader(raw_reader),
        #[cfg(feature = "bz")]
        "bz2" | "bz" => compressions::bzip2::OneIOBzip2::get_reader(raw_reader),
        #[cfg(feature = "lz4")]
        "lz4" | "lz" => compressions::lz4::OneIOLz4::get_reader(raw_reader),
        #[cfg(feature = "xz")]
        "xz" | "xz2" | "lzma" => compressions::xz::OneIOXz::get_reader(raw_reader),
        // Unknown suffix: treat the content as uncompressed. `raw_reader` is
        // already a `Box<dyn Read + Send>`, so it is returned as-is rather
        // than being wrapped in a second box.
        _ => Ok(raw_reader),
    }
}

/// Downloads the content at `remote_path` into the local file `local_path`.
///
/// The scheme in front of `://` selects the transport:
///
/// * `http` / `https` — fetched with the optional extra `header` map
///   (an empty map is used when `header` is `None`).
/// * `ftp` — fetched through the FTP helper; `header` is not used.
/// * `s3` — only when the `s3` feature is enabled; the URL is parsed into
///   bucket and key and handed to the S3 download helper.
///
/// For HTTP(S) and FTP the local file is written through a buffered writer
/// that is flushed explicitly before returning, so a failure while writing
/// the final bytes is reported instead of being lost when the writer drops.
///
/// # Errors
///
/// Returns `OneIoError::NotSupported` for any other scheme (including paths
/// without `://`), and propagates errors from creating the local file,
/// fetching the remote content, or writing it out.
#[cfg(feature = "remote")]
pub fn download(
    remote_path: &str,
    local_path: &str,
    header: Option<HashMap<String, String>>,
) -> Result<(), OneIoError> {
    // `split` always yields at least one item, so the fallback is never hit;
    // it only avoids an indexing panic path.
    let scheme = remote_path.split("://").next().unwrap_or(remote_path);
    match scheme {
        "http" | "https" => {
            let mut writer = get_writer_raw(local_path)?;
            let mut response = get_remote_http_raw(remote_path, header.unwrap_or_default())?;
            response.copy_to(&mut writer)?;
            // Dropping a `BufWriter` ignores flush errors; flush explicitly.
            std::io::Write::flush(&mut writer)?;
        }
        "ftp" => {
            let mut writer = get_writer_raw(local_path)?;
            let mut reader = get_remote_ftp_raw(remote_path)?;
            std::io::copy(&mut reader, &mut writer)?;
            // Dropping a `BufWriter` ignores flush errors; flush explicitly.
            std::io::Write::flush(&mut writer)?;
        }
        #[cfg(feature = "s3")]
        "s3" => {
            let (bucket, path) = s3::s3_url_parse(remote_path)?;
            s3::s3_download(bucket.as_str(), path.as_str(), local_path)?;
        }
        _ => {
            return Err(OneIoError::NotSupported(remote_path.to_string()));
        }
    }
    Ok(())
}

/// Reads the whole content at `path` (remote or local) into a `String`.
///
/// The reader comes from `get_reader`; any error from opening the reader or
/// from reading it to the end is returned to the caller.
pub fn read_to_string(path: &str) -> Result<String, OneIoError> {
    let mut text = String::new();
    get_reader(path)?.read_to_string(&mut text)?;
    Ok(text)
}

/// Reads remote or local JSON content at `path` and deserializes it into `T`.
///
/// The reader comes from `get_reader` and is handed directly to
/// `serde_json::from_reader`; errors from either step are returned as
/// `OneIoError`.
#[cfg(feature = "json")]
pub fn read_json_struct<T: serde::de::DeserializeOwned>(path: &str) -> Result<T, OneIoError> {
    let source = get_reader(path)?;
    serde_json::from_reader(source).map_err(OneIoError::from)
}

/// Opens `path` with `get_reader` and returns an iterator over its lines.
///
/// The reader is wrapped in a `BufReader`, and the iterator is the one
/// produced by `BufRead::lines`. Only opening the reader can fail here.
pub fn read_lines(path: &str) -> Result<Lines<BufReader<Box<dyn Read + Send>>>, OneIoError> {
    get_reader(path).map(|source| BufReader::new(source).lines())
}

/// get a generic Box<dyn Read> reader
pub fn get_reader(path: &str) -> Result<Box<dyn Read + Send>, OneIoError> {
// get raw bytes reader
let raw_reader = get_reader_raw(path)?;
Expand Down Expand Up @@ -233,16 +101,14 @@ pub fn get_cache_reader(
}
}

let cache_file_name = match cache_file_name {
None => path
.split('/')
let cache_file_name = cache_file_name.unwrap_or_else(|| {
path.split('/')
.collect::<Vec<&str>>()
.into_iter()
.last()
.unwrap()
.to_string(),
Some(p) => p,
};
.to_string()
});

let cache_file_path = format!("{}/{}", cache_dir, cache_file_name);

Expand All @@ -263,15 +129,27 @@ pub fn get_cache_reader(
get_reader(cache_file_path.as_str())
}

fn get_writer_raw(path: &str) -> Result<BufWriter<File>, OneIoError> {
let path = Path::new(path);
if let Some(prefix) = path.parent() {
std::fs::create_dir_all(prefix)?;
}
let output_file = BufWriter::new(File::create(path)?);
Ok(output_file)
}

/// Returns a writer for the given file path with the corresponding compression.
///
/// # Arguments
///
/// * `path` - A string slice representing the file path.
///
/// # Returns
///
/// * `Result<Box<dyn Write>, OneIoError>` - A result containing a boxed writer trait object or an error.
///
/// # Examples
///
/// ```rust,no_run
/// use std::io::{self, Write};
/// use oneio::get_writer;
///
/// let writer = match get_writer("output.txt") {
/// Ok(writer) => writer,
/// Err(error) => panic!("Failed to create writer: {:?}", error),
/// };
/// ```
pub fn get_writer(path: &str) -> Result<Box<dyn Write>, OneIoError> {
let output_file = BufWriter::new(File::create(path)?);

Expand Down
Loading

0 comments on commit ec80e02

Please sign in to comment.