From 2ec1521dce8d1bdf634105de112cfc9cb50e50d4 Mon Sep 17 00:00:00 2001 From: Dmitry Mikushin Date: Sun, 3 Nov 2024 18:18:16 +0100 Subject: [PATCH] This reverts the following patch, in order to fix #196 commit a371076bdbdffa25a654d320fe5e1bbf958b2637 Author: Brian Schwind Date: Thu May 11 22:49:45 2023 +0900 Use async transfers for the leader and trailer buffers Performing leader, payload and trailer bulk transfers in one common async pool loop may change the order of buffers. So instead of leader-payload-trailer we may get trailer-leader-payload, which is currently not handled by the buffer parsing code. This patch could be re-applied later, acompanied by a proper order-independent handling. --- cameleon/src/u3v/stream_handle.rs | 210 ++++++++++++++++++------------ 1 file changed, 129 insertions(+), 81 deletions(-) diff --git a/cameleon/src/u3v/stream_handle.rs b/cameleon/src/u3v/stream_handle.rs index 7fbae5d1..e27556d2 100644 --- a/cameleon/src/u3v/stream_handle.rs +++ b/cameleon/src/u3v/stream_handle.rs @@ -7,7 +7,7 @@ use std::{ convert::TryInto, sync::mpsc, - sync::{mpsc::TryRecvError, Arc, Mutex}, + sync::{mpsc::TryRecvError, Arc, Mutex, MutexGuard}, time::Duration, }; @@ -42,13 +42,47 @@ macro_rules! unwrap_or_poisoned { } impl StreamHandle { - pub(super) fn new(device: &u3v::Device) -> ControlResult> { - let inner = device.stream_channel()?; - Ok(inner.map(|inner| Self { - inner: Arc::new(Mutex::new(inner)), - params: StreamParams::default(), - cancellation_tx: None, - })) + /// Read leader of a stream packet. + /// + /// Buffer size must be equal or larger than [`StreamParams::leader_size`]. + pub fn read_leader<'a>(&self, buf: &'a mut [u8]) -> StreamResult> { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_leader( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } + } + + /// Read payload of a stream packet. + pub fn read_payload(&self, buf: &mut [u8]) -> StreamResult { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_payload( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } + } + + /// Read trailer of a stream packet. + /// + /// Buffer size must be equal of larger than [`StreamParams::trailer_size`]. + pub fn read_trailer<'a>(&self, buf: &'a mut [u8]) -> StreamResult> { + if self.is_loop_running() { + Err(StreamError::InStreaming) + } else { + read_trailer( + &mut unwrap_or_poisoned!(self.inner.lock())?, + &self.params, + buf, + ) + } } /// Return params. @@ -61,6 +95,15 @@ impl StreamHandle { pub fn params_mut(&mut self) -> &mut StreamParams { &mut self.params } + + pub(super) fn new(device: &u3v::Device) -> ControlResult> { + let inner = device.stream_channel()?; + Ok(inner.map(|inner| Self { + inner: Arc::new(Mutex::new(inner)), + params: StreamParams::default(), + cancellation_tx: None, + })) + } } impl PayloadStream for StreamHandle { @@ -162,9 +205,9 @@ impl StreamingLoop { let mut trailer_buf = vec![0; self.params.trailer_size]; let mut payload_buf_opt = None; let mut leader_buf = vec![0; self.params.leader_size]; - let inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); - 'outer: loop { + loop { // Stop the loop when // 1. `cancellation_tx` sends signal. // 2. `cancellation_tx` is dropped. @@ -189,62 +232,19 @@ impl StreamingLoop { }, }; - let mut async_pool = AsyncPool::new(&inner); - - if let Err(err) = read_leader(&mut async_pool, &self.params, &mut leader_buf) { - // Report and send error if the error is fatal. - if matches!(err, StreamError::Io(..) | StreamError::Disconnected) { - error!(?err); - self.sender.try_send(Err(err)).ok(); - } - payload_buf_opt = Some(payload_buf); - continue; - }; - - if let Err(err) = read_payload(&mut async_pool, &self.params, &mut payload_buf) { - warn!(?err); - // Reuse `payload_buf`. - payload_buf_opt = Some(payload_buf); - self.sender.try_send(Err(err)).ok(); - continue; - }; - - if let Err(err) = read_trailer(&mut async_pool, &self.params, &mut trailer_buf) { - warn!(?err); - // Reuse `payload_buf`. - payload_buf_opt = Some(payload_buf); - self.sender.try_send(Err(err)).ok(); - continue; - }; - - // We've submitted the bulk transfers, now wait for them. - let mut first_buf_len = None; - let mut last_buf_len = None; - let mut payload_len = 0; - - while !async_pool.is_empty() { - let len = match async_pool.poll(self.params.timeout) { - Ok(len) => len, - Err(err) => { - warn!(?err); - // Can't reuse `payload_buf` because we're in a loop. - self.sender.try_send(Err(err.into())).ok(); - continue 'outer; + let leader = match read_leader(&mut inner, &self.params, &mut leader_buf) { + Ok(leader) => leader, + Err(err) => { + // Report and send error if the error is fatal. + if matches!(err, StreamError::Io(..) | StreamError::Disconnected) { + error!(?err); + self.sender.try_send(Err(err)).ok(); } - }; - - if first_buf_len.is_none() { - first_buf_len = Some(len); - } else { - payload_len += len; + payload_buf_opt = Some(payload_buf); + continue; } + }; - last_buf_len = Some(len); - } - - let payload_len = payload_len - last_buf_len.unwrap(); - - // We received the data from the bulk transfers, try to parse stuff now. let leader = match u3v_stream::Leader::parse(&leader_buf) .map_err(|e| StreamError::InvalidPayload(format!("{}", e).into())) { @@ -258,6 +258,28 @@ impl StreamingLoop { } }; + let read_payload_size = match read_payload(&mut inner, &self.params, &mut payload_buf) { + Ok(size) => size, + Err(err) => { + warn!(?err); + // Reuse `payload_buf`. + payload_buf_opt = Some(payload_buf); + self.sender.try_send(Err(err)).ok(); + continue; + } + }; + + let trailer = match read_trailer(&mut inner, &self.params, &mut trailer_buf) { + Ok(trailer) => trailer, + Err(err) => { + warn!(?err); + // Reuse `payload_buf`. + payload_buf_opt = Some(payload_buf); + self.sender.try_send(Err(err)).ok(); + continue; + } + }; + let trailer = match u3v_stream::Trailer::parse(&trailer_buf) .map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into())) { @@ -274,7 +296,7 @@ impl StreamingLoop { let builder_result = PayloadBuilder { leader, payload_buf, - read_payload_size: payload_len, + read_payload_size: read_payload_size, trailer, } .build(); @@ -525,23 +547,24 @@ impl StreamParams { } } -fn read_leader( - async_pool: &mut AsyncPool, +fn read_leader<'a>( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, - buf: &mut [u8], -) -> StreamResult<()> { + buf: &'a mut [u8], +) -> StreamResult> { let leader_size = params.leader_size; - async_pool.submit(&mut buf[..leader_size])?; + recv(inner, params, buf, leader_size)?; - Ok(()) + u3v_stream::Leader::parse(buf).map_err(|e| StreamError::InvalidPayload(format!("{}", e).into())) } fn read_payload( - async_pool: &mut AsyncPool, + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, buf: &mut [u8], -) -> StreamResult<()> { +) -> StreamResult { let payload_size = params.payload_size; + let mut async_pool = AsyncPool::new(inner); let mut cursor = 0; for _ in 0..params.payload_count { async_pool.submit(&mut buf[cursor..cursor + payload_size])?; @@ -552,20 +575,45 @@ fn read_payload( async_pool.submit(&mut buf[cursor..cursor + params.payload_final1_size])?; cursor += params.payload_final1_size; } - if params.payload_final2_size != 0 { - async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?; + //if params.payload_final2_size != 0 { + // async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?; + //} + + let mut read_len = 0; + while !async_pool.is_empty() { + read_len += async_pool.poll(params.timeout)?; } - Ok(()) + Ok(read_len) } -fn read_trailer( - async_pool: &mut AsyncPool, +fn read_trailer<'a>( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, params: &StreamParams, - buf: &mut [u8], -) -> StreamResult<()> { + buf: &'a mut [u8], +) -> StreamResult> { let trailer_size = params.trailer_size; - async_pool.submit(&mut buf[..trailer_size])?; + recv(inner, params, buf, trailer_size)?; + + u3v_stream::Trailer::parse(buf) + .map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into())) +} + +fn recv( + inner: &mut MutexGuard<'_, u3v::ReceiveChannel>, + params: &StreamParams, + buf: &mut [u8], + len: usize, +) -> StreamResult { + if len == 0 { + return Ok(0); + } + + if buf.len() < len { + return Err(StreamError::BufferTooSmall); + } - Ok(()) + inner + .recv(&mut buf[..len], params.timeout) + .map_err(|e| e.into()) }