diff --git a/cameleon/src/u3v/stream_handle.rs b/cameleon/src/u3v/stream_handle.rs index 7fbae5d..e27556d 100644 --- a/cameleon/src/u3v/stream_handle.rs +++ b/cameleon/src/u3v/stream_handle.rs @@ -7,7 +7,7 @@ use std::{ convert::TryInto, sync::mpsc, - sync::{mpsc::TryRecvError, Arc, Mutex}, + sync::{mpsc::TryRecvError, Arc, Mutex, MutexGuard}, time::Duration, }; @@ -42,13 +42,47 @@ macro_rules! unwrap_or_poisoned { } impl StreamHandle { - pub(super) fn new(device: &u3v::Device) -> ControlResult> { - let inner = device.stream_channel()?; - Ok(inner.map(|inner| Self { - inner: Arc::new(Mutex::new(inner)), - params: StreamParams::default(), - cancellation_tx: None, - })) + /// Read leader of a stream packet. + /// + /// Buffer size must be equal or larger than [`StreamParams::leader_size`]. + pub fn read_leader<'a>(&self, buf: &'a mut [u8]) -> StreamResult> { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_leader( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } + } + + /// Read payload of a stream packet. + pub fn read_payload(&self, buf: &mut [u8]) -> StreamResult { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_payload( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } + } + + /// Read trailer of a stream packet. + /// + /// Buffer size must be equal of larger than [`StreamParams::trailer_size`]. + pub fn read_trailer<'a>(&self, buf: &'a mut [u8]) -> StreamResult> { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_trailer( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } } /// Return params. @@ -61,6 +95,15 @@ impl StreamHandle { pub fn params_mut(&mut self) -> &mut StreamParams { &mut self.params } + + pub(super) fn new(device: &u3v::Device) -> ControlResult> { + let inner = device.stream_channel()?; + Ok(inner.map(|inner| Self { + inner: Arc::new(Mutex::new(inner)), + params: StreamParams::default(), + cancellation_tx: None, + })) + } } impl PayloadStream for StreamHandle { @@ -162,9 +205,9 @@ impl StreamingLoop { let mut trailer_buf = vec![0; self.params.trailer_size]; let mut payload_buf_opt = None; let mut leader_buf = vec![0; self.params.leader_size]; - let inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); - 'outer: loop { + loop { // Stop the loop when // 1. `cancellation_tx` sends signal. // 2. `cancellation_tx` is dropped. @@ -189,62 +232,19 @@ impl StreamingLoop { }, }; - let mut async_pool = AsyncPool::new(&inner); - - if let Err(err) = read_leader(&mut async_pool, &self.params, &mut leader_buf) { - // Report and send error if the error is fatal. - if matches!(err, StreamError::Io(..) | StreamError::Disconnected) { - error!(?err); - self.sender.try_send(Err(err)).ok(); - } - payload_buf_opt = Some(payload_buf); - continue; - }; - - if let Err(err) = read_payload(&mut async_pool, &self.params, &mut payload_buf) { - warn!(?err); - // Reuse `payload_buf`. - payload_buf_opt = Some(payload_buf); - self.sender.try_send(Err(err)).ok(); - continue; - }; - - if let Err(err) = read_trailer(&mut async_pool, &self.params, &mut trailer_buf) { - warn!(?err); - // Reuse `payload_buf`. - payload_buf_opt = Some(payload_buf); - self.sender.try_send(Err(err)).ok(); - continue; - }; - - // We've submitted the bulk transfers, now wait for them. - let mut first_buf_len = None; - let mut last_buf_len = None; - let mut payload_len = 0; - - while !async_pool.is_empty() { - let len = match async_pool.poll(self.params.timeout) { - Ok(len) => len, - Err(err) => { - warn!(?err); - // Can't reuse `payload_buf` because we're in a loop. - self.sender.try_send(Err(err.into())).ok(); - continue 'outer; + let leader = match read_leader(&mut inner, &self.params, &mut leader_buf) { + Ok(leader) => leader, + Err(err) => { + // Report and send error if the error is fatal. + if matches!(err, StreamError::Io(..) | StreamError::Disconnected) { + error!(?err); + self.sender.try_send(Err(err)).ok(); } - }; - - if first_buf_len.is_none() { - first_buf_len = Some(len); - } else { - payload_len += len; + payload_buf_opt = Some(payload_buf); + continue; } + }; - last_buf_len = Some(len); - } - - let payload_len = payload_len - last_buf_len.unwrap(); - - // We received the data from the bulk transfers, try to parse stuff now. let leader = match u3v_stream::Leader::parse(&leader_buf) .map_err(|e| StreamError::InvalidPayload(format!("{}", e).into())) { @@ -258,6 +258,28 @@ impl StreamingLoop { } }; + let read_payload_size = match read_payload(&mut inner, &self.params, &mut payload_buf) { + Ok(size) => size, + Err(err) => { + warn!(?err); + // Reuse `payload_buf`. + payload_buf_opt = Some(payload_buf); + self.sender.try_send(Err(err)).ok(); + continue; + } + }; + + let trailer = match read_trailer(&mut inner, &self.params, &mut trailer_buf) { + Ok(trailer) => trailer, + Err(err) => { + warn!(?err); + // Reuse `payload_buf`. + payload_buf_opt = Some(payload_buf); + self.sender.try_send(Err(err)).ok(); + continue; + } + }; + let trailer = match u3v_stream::Trailer::parse(&trailer_buf) .map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into())) { @@ -274,7 +296,7 @@ impl StreamingLoop { let builder_result = PayloadBuilder { leader, payload_buf, - read_payload_size: payload_len, + read_payload_size: read_payload_size, trailer, } .build(); @@ -525,23 +547,24 @@ impl StreamParams { } } -fn read_leader( - async_pool: &mut AsyncPool, +fn read_leader<'a>( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, - buf: &mut [u8], -) -> StreamResult<()> { + buf: &'a mut [u8], +) -> StreamResult> { let leader_size = params.leader_size; - async_pool.submit(&mut buf[..leader_size])?; + recv(inner, params, buf, leader_size)?; - Ok(()) + u3v_stream::Leader::parse(buf).map_err(|e| StreamError::InvalidPayload(format!("{}", e).into())) } fn read_payload( - async_pool: &mut AsyncPool, + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, buf: &mut [u8], -) -> StreamResult<()> { +) -> StreamResult { let payload_size = params.payload_size; + let mut async_pool = AsyncPool::new(inner); let mut cursor = 0; for _ in 0..params.payload_count { async_pool.submit(&mut buf[cursor..cursor + payload_size])?; @@ -552,20 +575,45 @@ fn read_payload( async_pool.submit(&mut buf[cursor..cursor + params.payload_final1_size])?; cursor += params.payload_final1_size; } - if params.payload_final2_size != 0 { - async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?; + //if params.payload_final2_size != 0 { + // async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?; + //} + + let mut read_len = 0; + while !async_pool.is_empty() { + read_len += async_pool.poll(params.timeout)?; } - Ok(()) + Ok(read_len) } -fn read_trailer( - async_pool: &mut AsyncPool, +fn read_trailer<'a>( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, - buf: &mut [u8], -) -> StreamResult<()> { + buf: &'a mut [u8], +) -> StreamResult> { let trailer_size = params.trailer_size; - async_pool.submit(&mut buf[..trailer_size])?; + recv(inner, params, buf, trailer_size)?; + + u3v_stream::Trailer::parse(buf) + .map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into())) +} + +fn recv( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, + params: &StreamParams, + buf: &mut [u8], + len: usize, +) -> StreamResult { + if len == 0 { + return Ok(0); + } + + if buf.len() < len { + return Err(StreamError::BufferTooSmall); + } - Ok(()) + inner + .recv(&mut buf[..len], params.timeout) + .map_err(|e| e.into()) }