diff --git a/.gitignore b/.gitignore index 43334eb..6117870 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,6 @@ surrealml/rust_surrealml.cpython-310-darwin.so ./modules/core/model_stash/ ./modules/pipelines/runners/integrated_training_runner/run_env/ ./modules/pipelines/runners/batch_training_runner/run_env/ -./modules/pipelines/data_access/target/ \ No newline at end of file +./modules/pipelines/data_access/target/ +./modules/pipelines/runners/integrated_training_runner/run_env/ +modules/pipelines/runners/integrated_training_runner/run_env/ \ No newline at end of file diff --git a/modules/core/model_stash/sklearn/onnx/linear.onnx b/modules/core/model_stash/sklearn/onnx/linear.onnx new file mode 100644 index 0000000..bc1dbf6 Binary files /dev/null and b/modules/core/model_stash/sklearn/onnx/linear.onnx differ diff --git a/modules/core/model_stash/sklearn/surml/linear.surml b/modules/core/model_stash/sklearn/surml/linear.surml new file mode 100644 index 0000000..03e8041 Binary files /dev/null and b/modules/core/model_stash/sklearn/surml/linear.surml differ diff --git a/modules/core/model_stash/torch/surml/linear.surml b/modules/core/model_stash/torch/surml/linear.surml new file mode 100644 index 0000000..8094df3 Binary files /dev/null and b/modules/core/model_stash/torch/surml/linear.surml differ diff --git a/modules/pipelines/data_access/README.md b/modules/pipelines/data_access/README.md index 248d7d6..dea3247 100644 --- a/modules/pipelines/data_access/README.md +++ b/modules/pipelines/data_access/README.md @@ -88,6 +88,7 @@ can be seen in the file `engines/pytorch_train/tests/test_numpy_quality_control. 
If you just want to use the raw rust binary for ML training, you can directly call the rust binary that loads the images, and pipe this data into the python pytorch engine as seen in the following example: + ```bash ./data_access_rust_bin | python pytorch_engine.py ``` @@ -147,7 +148,7 @@ fn main() -> std::io::Result<()> { ``` -# Local Test Setup +# Local Test Setup for FFmpeg ## SRT listener server in OBS Studio @@ -164,19 +165,12 @@ srt://127.0.0.1:9999?mode=listener&timeout=500000&transtype=live Output resolution is set to 1920 X 1080 with an FPS of 1. The images in the [CAMMA-public/cholect50](https://github.com/CAMMA-public/cholect50) are sourced as an Image Slide Show. -## FFmpeg SRT caller server in Windows Powershell +The listener listens for connection requests from callers. The callers are implemented in `srt_receiver.rs`. -FFmpeg is installed and used in Windows Powershell. We tested our SRT caller server implementation in Powershell, using the command: +## Download and Install FFmpeg -```powershell -ffmpeg -i srt://127.0.0.1:9999?mode=caller -f image2 -vcodec mjpeg -q:v 5 output%03d.jpg -``` +Download FFmpeg [here](https://www.ffmpeg.org/download.html). Versions are available for Windows, Linux, and Mac OS. -- **'127.0.0.1':** Destination IP address the caller calls -- **'9999':** Destination port the caller calls -- **'-f image2':** Set FFmpeg output image format to *image2* -- **'-vcodec mjpeg':** Set FFmpeg output image codec to mjpeg -- **'-q:v 5':** Sets the quality of the jpg images to 5 (2-31) -- **'output%03d.jpg':** Sequentially incrementing file name, e.g. output001.jpg +## FFmpeg Documentation -The above command saves the SRT stream as a sequence of .jpg images. Through this process, we confirm this experimental setup with OBS Studio, FFmpeg, and SRT works in CLI. +Official documentation of FFmpeg is [here](https://www.ffmpeg.org/documentation.html). 
/// Positions of one pixel's red, green and blue samples inside a flat image buffer.
///
/// Values of this type are produced by `calculate_rgb_index`. The struct is a plain value
/// type, so it derives `Clone`/`Copy` and the equality traits in addition to `Debug`; this
/// lets callers and tests compare whole index triples instead of field by field.
///
/// # Fields
/// * `red` - The index of the red component in relation to the (x, y) coordinates of the pixel.
/// * `green` - The index of the green component in relation to the (x, y) coordinates of the pixel.
/// * `blue` - The index of the blue component in relation to the (x, y) coordinates of the pixel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RgbIndexes {
    red: usize,
    green: usize,
    blue: usize,
}
+/// * `total_height` - The total height of the image frame. +/// +/// # Returns +/// A RgbIndexes struct containing the indexes of the red, green and blue components of the pixel. +fn calculate_rgb_index(x: usize, y: usize, total_width: usize, total_height: usize) -> RgbIndexes { + RgbIndexes { + red: 0 * total_height * total_width + y * total_width + x, + green: 1 * total_height * total_width + y * total_width + x, + blue: 2 * total_height * total_width + y * total_width + x, + } +} + + +/// Reads an RGB image from a file and returns the raw data in 1D form that can be mapped as a 3D +/// array by using the `calculate_rgb_index` function. +/// +/// # Arguments +/// * `path` - The path to the image file. +/// * `height` - The total height of the image. +/// * `width` - The total width of the image. +/// +/// # Returns +/// A 1D array containing the raw RGB data of the image (flatterned). +pub fn read_rgb_image(path: String, height: usize, width: usize) -> Vec { + // let height: usize = 480; + // let width: usize = 853; + let depth: usize = 3; + + let img: DynamicImage = ImageReader::open(path).unwrap().decode().unwrap(); + let resized_img: DynamicImage = img.resize_exact(width as u32, height as u32, image::imageops::FilterType::Nearest); + + // Convert to RGB and flatten to array if necessary + let rgb_img: ImageBuffer, Vec> = resized_img.to_rgb8(); + + let mut raw_data = vec![0u8; depth * height * width]; // 3 channels, 480 height, 853 width + + for chunk in rgb_img.enumerate_pixels() { + let x: u32 = chunk.0; + let y: u32 = chunk.1; + let pixel: &Rgb = chunk.2; // [u8, u8, u8] + + let indexes = calculate_rgb_index(x as usize, y as usize, width, height); + + raw_data[indexes.red as usize] = pixel[0]; // store red component + raw_data[indexes.green as usize] = pixel[1]; // store green component + raw_data[indexes.blue as usize] = pixel[2]; // store blue component + } + raw_data +} + + +/// Writes a frame to the standard output. 
+/// +/// # Arguments +/// * `data` - The raw data of the frame. +/// * `tag` - The tag associated with the frame. +pub fn write_frame_to_std_out(data: Vec, tag: SurgeryStep) { + let stdout = io::stdout(); + let mut handle = stdout.lock(); + + // Write the tag as a 2-byte integer + handle.write_all(&(tag.to_u8() as u16).to_le_bytes()).unwrap(); + + // Write the len as a 4-byte integer + handle.write_all(&(data.len() as u32).to_le_bytes()).unwrap(); + + // Write each byte in data as a 2-byte integer + for byte in data { + handle.write_all(&(byte as u16).to_le_bytes()).unwrap(); + } + + handle.flush().unwrap(); +} + + +#[cfg(test)] +mod tests { + + use super::*; + use serde::Deserialize; + + #[derive(Debug, Deserialize)] + struct DummyJson { + data: Vec, + } + + #[test] + fn test_read_image() { + let _data = read_rgb_image("../data_stash/images/169_6300.jpg".to_string(), 480, 853); + } + + #[test] + fn test_calculate_rgb_index() { + + // This will give x y chunks of 50 and an entire rgb image of 150 + let total_height = 5; + let total_width = 10; + + let indexes = calculate_rgb_index(0, 0, total_width, total_height); + assert_eq!(&0, &indexes.red); + assert_eq!(&50, &indexes.green); + assert_eq!(&100, &indexes.blue); + + let indexes = calculate_rgb_index(1, 0, total_width, total_height); + assert_eq!(&1, &indexes.red); + assert_eq!(&51, &indexes.green); + assert_eq!(&101, &indexes.blue); + + let indexes = calculate_rgb_index(2, 0, total_width, total_height); + assert_eq!(&2, &indexes.red); + assert_eq!(&52, &indexes.green); + assert_eq!(&102, &indexes.blue); + + let indexes = calculate_rgb_index(0, 1, total_width, total_height); + assert_eq!(&10, &indexes.red); + assert_eq!(&60, &indexes.green); + assert_eq!(&110, &indexes.blue); + + let indexes = calculate_rgb_index(0, 2, total_width, total_height); + assert_eq!(&20, &indexes.red); + assert_eq!(&70, &indexes.green); + assert_eq!(&120, &indexes.blue); + } + + #[test] + fn 
test_test_calculate_rgb_index_quality_control() { + let raw_data = std::fs::read_to_string("../data_stash/images/dummy_rgb_data.json").unwrap(); + let data: DummyJson = serde_json::from_str(&raw_data).unwrap(); + + // This will give x y chunks of 50 and an entire rgb image of 150 + let total_height = 5; + let total_width = 10; + + let index = calculate_rgb_index(0, 0, total_width, total_height); + assert_eq!(&data.data[index.red], &111); // z = 0 + assert_eq!(&data.data[index.green], &208); // z = 1 + assert_eq!(&data.data[index.blue], &12); // z = 2 + + let index = calculate_rgb_index(5, 3, total_width, total_height); + assert_eq!(&data.data[index.red], &65); + assert_eq!(&data.data[index.green], &7); + assert_eq!(&data.data[index.blue], &193); + + let index = calculate_rgb_index(8, 2, total_width, total_height); + assert_eq!(&data.data[index.red], &253); + assert_eq!(&data.data[index.green], &133); + assert_eq!(&data.data[index.blue], &115); + } +} diff --git a/modules/pipelines/data_access/basic/src/lib.rs b/modules/pipelines/data_access/basic/src/lib.rs new file mode 100644 index 0000000..19dbc2f --- /dev/null +++ b/modules/pipelines/data_access/basic/src/lib.rs @@ -0,0 +1,4 @@ +pub mod tags; +pub mod images; +pub mod bin_pack; +pub mod srt_receiver; diff --git a/modules/pipelines/data_access/basic/src/srt_receiver.rs b/modules/pipelines/data_access/basic/src/srt_receiver.rs new file mode 100644 index 0000000..60d3ccd --- /dev/null +++ b/modules/pipelines/data_access/basic/src/srt_receiver.rs @@ -0,0 +1,148 @@ +//! Live video stream receiver using SRT protocol and +//! Frame extraction from video file using FFmpeg. +use std::process::Command; + +/// Receives a live video stream using the SRT protocol and saves it to a file. +/// +/// # Arguments +/// * `input_url` - The streaming server URL. +/// * `output_file` - The path to save the received video. 
/// Turns the exit status of an `ffmpeg` run into a result, keeping the status in the error text.
fn ffmpeg_result(status: std::process::ExitStatus) -> std::io::Result<()> {
    if status.success() {
        Ok(())
    } else {
        Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("ffmpeg failed: {}", status),
        ))
    }
}

/// Receives a live video stream using the SRT protocol and saves it to a file.
///
/// Runs the `ffmpeg` executable found on `PATH` and waits for it to exit.
///
/// # Arguments
/// * `input_url` - The streaming server URL.
/// * `output_file` - The path to save the received video.
///
/// # Errors
/// Returns an error if `ffmpeg` cannot be started, or an `ErrorKind::Other` error if it
/// exits with a non-success status.
pub fn receive_srt_stream(input_url: &str, output_file: &str) -> std::io::Result<()> {
    let status = Command::new("ffmpeg")
        .args([
            "-i", input_url, // Input URL
            "-c", "copy",    // Copy the video and audio codecs, no need to re-encode
            "-f", "mp4",     // Output file format
            output_file,     // Output file
        ])
        .status()?;

    ffmpeg_result(status)
}

/// Converts a video file to frames using FFmpeg.
///
/// One frame per second of video is written to `output_folder` as `out1.png`, `out2.png`, ...
/// An empty `output_folder` resolves to the current directory.
///
/// # Arguments
/// * `video_file` - The path to input video file.
/// * `output_folder` - The directory to save the extracted frames.
///
/// # Errors
/// Returns an error if `ffmpeg` cannot be started, or an `ErrorKind::Other` error if it
/// exits with a non-success status.
pub fn convert_video_to_frames(video_file: &str, output_folder: &str) -> std::io::Result<()> {
    // `%d` is expanded by ffmpeg into the running frame number.
    let frame_pattern = std::path::Path::new(output_folder).join("out%d.png");

    let status = Command::new("ffmpeg")
        .args([
            "-i", video_file, // Input video file
            "-vf", "fps=1",   // Extract one frame per second
        ])
        .arg(&frame_pattern) // Save frames as PNG images
        .status()?;

    ffmpeg_result(status)
}
+ */ + + // Define input and output file paths for testing + let input_url = "srt://127.0.0.1:9999?mode=caller"; + let output_file = "output.mp4"; + + // Call the function being tested + let result = receive_srt_stream(input_url, output_file); + + // Assert that the function returns Ok(()) + assert!(result.is_ok()); + + // Clean up the output file + std::fs::remove_file(output_file).expect("Failed to clean up the output file"); + } + + #[test] + fn test_convert_video_to_frames() { + /* + * This test use a dummy video generated with the stored example images + * The dummy video is converted to frames + * And the frames are compared with the original images + * + * TO-DO: ffmpeg only reads the first two images in the filelist.txt, fix this + */ + // Define input video file and output folder for testing + let output_folder = "../data_stash/images/frames"; + let dummy_video = "../data_stash/images/dummy_vid.mp4"; + let base_path = "../data_stash/images/169_6300.jpg"; + + // Create the output folder for testing + if fs::metadata(output_folder).is_err() { + fs::create_dir(output_folder).expect("Failed to create output folder"); + } + + // Generate a dummy video file for testing + let _output = Command::new("ffmpeg") + .args([ + "-f", "concat", // Use all images in the specified folder + "-safe", "0", // Use all images in the specified folder + "-i", "../data_stash/images/filelist.txt", // Use all images in the specified folder + "-c:v", "libx264", // Use the H.264 codec for video + "-crf", "0", // Use the H.264 codec for video + "-r", "1", // Set the output frame rate to 30 FPS + "-pix_fmt", "yuv420p", // Set the pixel format + dummy_video // Output file + ]) + .status() + .expect("Failed to generate video file"); + // fs::File::create(video_file).expect("Failed to create video file"); + + // Call the function being tested + let result = convert_video_to_frames(dummy_video, output_folder); + + fn load_image(path: &str) -> DynamicImage { + ImageReader::open(path) + 
.expect("Failed to open image.") + .decode() + .expect("Failed to decode image.") + } + + fn compare_images(img1: &DynamicImage, img2: &DynamicImage) -> bool { + if img1.dimensions() != img2.dimensions() { + println!("The images have different dimensions."); + return false; + } + + true + } + + let img1 = load_image(&base_path); + let img2 = load_image(&format!("{}/out1.png", output_folder)); + + if compare_images(&img1, &img2) { + println!("The images are identical in size."); + } else { + println!("The images are not identical in size."); + } + + // Assert that the function returns Ok(()) + assert!(result.is_ok()); + + // Clean up the video file and output folder + fs::remove_file(dummy_video).expect("Failed to clean up video file"); + // Mannually check whether the image and the frame are identical + // fs::remove_dir_all(output_folder).expect("Failed to clean up output folder"); + } + +} diff --git a/modules/pipelines/data_access/basic/src/tags.rs b/modules/pipelines/data_access/basic/src/tags.rs new file mode 100644 index 0000000..26852a0 --- /dev/null +++ b/modules/pipelines/data_access/basic/src/tags.rs @@ -0,0 +1,149 @@ +//! Defines a basic mechanism for reading tags from a file for a given video. +use std::fs::File; +use std::io::prelude::*; + +use serde_json::Value; +use std::collections::HashMap; + + +/// Represents the different tags that can be assigned to a video frame. +/// +/// # Variants +/// * `Preparation` - The first step of the surgery, where the patient is prepared for the surgery. +/// * `CarlotTriangleDissection` - The second step of the surgery, where the carlot triangle is dissected. +/// * `ClippingAndCutting` - Where clipping and cutting is happening in the video. +/// * `GallbladderDissection` - The fourth step of the surgery, where the gallbladder is dissected. +/// * `GallbladderPackaging` - The fifth step of the surgery, where the gallbladder is packaged. 
/// Represents the different tags that can be assigned to a video frame.
///
/// Each variant carries its numeric tag as an explicit discriminant, so the enum itself is
/// the single source of truth for the step <-> number mapping.
///
/// # Variants
/// * `Preparation` - The first step of the surgery, where the patient is prepared for the surgery.
/// * `CarlotTriangleDissection` - The second step of the surgery, where Calot's triangle is dissected.
/// * `ClippingAndCutting` - The third step of the surgery, where clipping and cutting is happening.
/// * `GallbladderDissection` - The fourth step of the surgery, where the gallbladder is dissected.
/// * `GallbladderPackaging` - The fifth step of the surgery, where the gallbladder is packaged.
/// * `CleaningAndCoagulation` - The sixth step of the surgery, where cleaning and coagulation is happening.
/// * `GallbladderExtraction` - The seventh step of the surgery, where the gallbladder is extracted.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[repr(u8)]
pub enum SurgeryStep {
    Preparation = 0,
    CarlotTriangleDissection = 1,
    ClippingAndCutting = 2,
    GallbladderDissection = 3,
    GallbladderPackaging = 4,
    CleaningAndCoagulation = 5,
    GallbladderExtraction = 6,
}


/// Converts a u8 to a SurgeryStep.
///
/// # Panics
/// Panics with "Invalid step" if the value is not in `0..=6`. Use
/// `SurgeryStep::try_from_u8` when the value comes from untrusted input.
impl From<u8> for SurgeryStep {
    fn from(step: u8) -> Self {
        Self::try_from_u8(step).expect("Invalid step")
    }
}


impl SurgeryStep {

    /// Converts a u8 to a surgery step without panicking.
    ///
    /// # Returns
    /// `Some(step)` for values in `0..=6`, `None` for anything else.
    pub fn try_from_u8(step: u8) -> Option<Self> {
        let decoded = match step {
            0 => SurgeryStep::Preparation,
            1 => SurgeryStep::CarlotTriangleDissection,
            2 => SurgeryStep::ClippingAndCutting,
            3 => SurgeryStep::GallbladderDissection,
            4 => SurgeryStep::GallbladderPackaging,
            5 => SurgeryStep::CleaningAndCoagulation,
            6 => SurgeryStep::GallbladderExtraction,
            _ => return None,
        };
        Some(decoded)
    }

    /// Converts the surgery step to a u8.
    ///
    /// # Returns
    /// A u8 representing the surgery step.
    pub fn to_u8(&self) -> u8 {
        // The discriminants are declared explicitly on the enum, so the cast is the mapping.
        *self as u8
    }

    /// Converts the surgery step to a binary string of 8 characters (one per bit).
    ///
    /// # Returns
    /// A string containing the zero-padded binary representation of the surgery step,
    /// e.g. `"00000100"` for `GallbladderPackaging`.
    pub fn to_binary_string(&self) -> String {
        format!("{:08b}", self.to_u8())
    }
}
/// Reads the tags from a file and returns them as a string.
///
/// # Note
/// Right now we are loading all of the tags into memory. This is not very efficient
/// if the size of the file grows but this will work for now to test things. We can
/// chunk the file later if we need to for loading and processing. I would recommend
/// that we come up with our own binary format to store the tags in so that we can read
/// chunks more efficiently and reduce the size of the file, as I think these files will
/// grow in size quite a bit.
///
/// # Arguments
/// * `path` - The path to the file containing the tags.
///
/// # Returns
/// A string containing the tags.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened or read, or if its
/// content is not valid UTF-8.
pub fn read_tags(path: &str) -> Result<String, std::io::Error> {
    let mut contents = String::new();
    File::open(path)?.read_to_string(&mut contents)?;
    Ok(contents)
}
+pub fn parse_surgery_steps(data: String) -> HashMap> { + let v: Value = serde_json::from_str(&data).expect("Invalid JSON"); + + let mut map = HashMap::new(); + + for (key, steps) in v.as_object().expect("Expected a JSON object") { + let steps_list = steps.as_array().expect("Expected an array") + .iter() + .map(|step| SurgeryStep::from(step.as_u64().expect("Expected an integer") as u8)) + .collect(); + + map.insert(key.to_string(), steps_list); + } + + map +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_tags() { + let tags = read_tags("../data_stash/cleaned_labels/VID68_processed.json").unwrap(); + let data = parse_surgery_steps(tags); + assert_eq!(SurgeryStep::ClippingAndCutting, data.get("968").unwrap()[0]); + } + + #[test] + fn test_surgery_step_to_binary_string() { + let step = SurgeryStep::GallbladderPackaging; + assert_eq!("00000100", step.to_binary_string()); + } +} diff --git a/modules/pipelines/data_access/data_stash/images/filelist.txt b/modules/pipelines/data_access/data_stash/images/filelist.txt new file mode 100644 index 0000000..350b585 --- /dev/null +++ b/modules/pipelines/data_access/data_stash/images/filelist.txt @@ -0,0 +1,2 @@ +file '169_6300.jpg' +file '169_6325.jpg' diff --git a/modules/pipelines/runners/basic_training_runner/Cargo.toml b/modules/pipelines/runners/basic_training_runner/Cargo.toml index 4d2bce7..b579487 100644 --- a/modules/pipelines/runners/basic_training_runner/Cargo.toml +++ b/modules/pipelines/runners/basic_training_runner/Cargo.toml @@ -7,4 +7,3 @@ edition = "2021" [dependencies] data-access-layer = { path = "../../data_access" } - diff --git a/modules/pipelines/runners/basic_training_runner/run_env/basic_training_runner b/modules/pipelines/runners/basic_training_runner/run_env/basic_training_runner deleted file mode 100755 index af3fbd7..0000000 Binary files a/modules/pipelines/runners/basic_training_runner/run_env/basic_training_runner and /dev/null differ diff --git 
a/modules/pipelines/runners/basic_training_runner/src/main.rs b/modules/pipelines/runners/basic_training_runner/src/main.rs index 8edc92b..7c6f4eb 100644 --- a/modules/pipelines/runners/basic_training_runner/src/main.rs +++ b/modules/pipelines/runners/basic_training_runner/src/main.rs @@ -1,3 +1,8 @@ +use basic::images::{ + read_rgb_image, + write_frame_to_std_out, +}; +use basic::tags::SurgeryStep; use data_access_layer::images::{ read_rgb_image, write_frame_to_std_out,