Skip to content

Commit

Permalink
This reverts the following patch, in order to fix cameleon-rs#196
Browse files Browse the repository at this point in the history
    commit a371076
    Author: Brian Schwind <brianmschwind@gmail.com>
    Date:   Thu May 11 22:49:45 2023 +0900

        Use async transfers for the leader and trailer buffers

Performing leader, payload and trailer bulk transfers in one common async pool loop
may change the order of buffers. So instead of leader-payload-trailer we may get
trailer-leader-payload, which is currently not handled by the buffer parsing code.
This patch could be re-applied later, acompanied by a proper order-independent handling.
  • Loading branch information
dmikushin committed Nov 3, 2024
1 parent 9f601a2 commit 2ec1521
Showing 1 changed file with 129 additions and 81 deletions.
210 changes: 129 additions & 81 deletions cameleon/src/u3v/stream_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::{
convert::TryInto,
sync::mpsc,
sync::{mpsc::TryRecvError, Arc, Mutex},
sync::{mpsc::TryRecvError, Arc, Mutex, MutexGuard},
time::Duration,
};

Expand Down Expand Up @@ -42,13 +42,47 @@ macro_rules! unwrap_or_poisoned {
}

impl StreamHandle {
pub(super) fn new(device: &u3v::Device) -> ControlResult<Option<Self>> {
let inner = device.stream_channel()?;
Ok(inner.map(|inner| Self {
inner: Arc::new(Mutex::new(inner)),
params: StreamParams::default(),
cancellation_tx: None,
}))
/// Read leader of a stream packet.
///
/// Buffer size must be equal or larger than [`StreamParams::leader_size`].
pub fn read_leader<'a>(&self, buf: &'a mut [u8]) -> StreamResult<u3v_stream::Leader<'a>> {
if self.is_loop_running() {
Err(StreamError::InStreaming)
} else {
read_leader(
&mut unwrap_or_poisoned!(self.inner.lock())?,
&self.params,
buf,
)
}
}

/// Read payload of a stream packet.
pub fn read_payload(&self, buf: &mut [u8]) -> StreamResult<usize> {
if self.is_loop_running() {
Err(StreamError::InStreaming)
} else {
read_payload(
&mut unwrap_or_poisoned!(self.inner.lock())?,
&self.params,
buf,
)
}
}

/// Read trailer of a stream packet.
///
/// Buffer size must be equal of larger than [`StreamParams::trailer_size`].
pub fn read_trailer<'a>(&self, buf: &'a mut [u8]) -> StreamResult<u3v_stream::Trailer<'a>> {
if self.is_loop_running() {
Err(StreamError::InStreaming)
} else {
read_trailer(
&mut unwrap_or_poisoned!(self.inner.lock())?,
&self.params,
buf,
)
}
}

/// Return params.
Expand All @@ -61,6 +95,15 @@ impl StreamHandle {
pub fn params_mut(&mut self) -> &mut StreamParams {
&mut self.params
}

pub(super) fn new(device: &u3v::Device) -> ControlResult<Option<Self>> {
let inner = device.stream_channel()?;
Ok(inner.map(|inner| Self {
inner: Arc::new(Mutex::new(inner)),
params: StreamParams::default(),
cancellation_tx: None,
}))
}
}

impl PayloadStream for StreamHandle {
Expand Down Expand Up @@ -162,9 +205,9 @@ impl StreamingLoop {
let mut trailer_buf = vec![0; self.params.trailer_size];
let mut payload_buf_opt = None;
let mut leader_buf = vec![0; self.params.leader_size];
let inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock().unwrap();

'outer: loop {
loop {
// Stop the loop when
// 1. `cancellation_tx` sends signal.
// 2. `cancellation_tx` is dropped.
Expand All @@ -189,62 +232,19 @@ impl StreamingLoop {
},
};

let mut async_pool = AsyncPool::new(&inner);

if let Err(err) = read_leader(&mut async_pool, &self.params, &mut leader_buf) {
// Report and send error if the error is fatal.
if matches!(err, StreamError::Io(..) | StreamError::Disconnected) {
error!(?err);
self.sender.try_send(Err(err)).ok();
}
payload_buf_opt = Some(payload_buf);
continue;
};

if let Err(err) = read_payload(&mut async_pool, &self.params, &mut payload_buf) {
warn!(?err);
// Reuse `payload_buf`.
payload_buf_opt = Some(payload_buf);
self.sender.try_send(Err(err)).ok();
continue;
};

if let Err(err) = read_trailer(&mut async_pool, &self.params, &mut trailer_buf) {
warn!(?err);
// Reuse `payload_buf`.
payload_buf_opt = Some(payload_buf);
self.sender.try_send(Err(err)).ok();
continue;
};

// We've submitted the bulk transfers, now wait for them.
let mut first_buf_len = None;
let mut last_buf_len = None;
let mut payload_len = 0;

while !async_pool.is_empty() {
let len = match async_pool.poll(self.params.timeout) {
Ok(len) => len,
Err(err) => {
warn!(?err);
// Can't reuse `payload_buf` because we're in a loop.
self.sender.try_send(Err(err.into())).ok();
continue 'outer;
let leader = match read_leader(&mut inner, &self.params, &mut leader_buf) {
Ok(leader) => leader,
Err(err) => {
// Report and send error if the error is fatal.
if matches!(err, StreamError::Io(..) | StreamError::Disconnected) {
error!(?err);
self.sender.try_send(Err(err)).ok();
}
};

if first_buf_len.is_none() {
first_buf_len = Some(len);
} else {
payload_len += len;
payload_buf_opt = Some(payload_buf);
continue;
}
};

last_buf_len = Some(len);
}

let payload_len = payload_len - last_buf_len.unwrap();

// We received the data from the bulk transfers, try to parse stuff now.
let leader = match u3v_stream::Leader::parse(&leader_buf)
.map_err(|e| StreamError::InvalidPayload(format!("{}", e).into()))
{
Expand All @@ -258,6 +258,28 @@ impl StreamingLoop {
}
};

let read_payload_size = match read_payload(&mut inner, &self.params, &mut payload_buf) {
Ok(size) => size,
Err(err) => {
warn!(?err);
// Reuse `payload_buf`.
payload_buf_opt = Some(payload_buf);
self.sender.try_send(Err(err)).ok();
continue;
}
};

let trailer = match read_trailer(&mut inner, &self.params, &mut trailer_buf) {
Ok(trailer) => trailer,
Err(err) => {
warn!(?err);
// Reuse `payload_buf`.
payload_buf_opt = Some(payload_buf);
self.sender.try_send(Err(err)).ok();
continue;
}
};

let trailer = match u3v_stream::Trailer::parse(&trailer_buf)
.map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into()))
{
Expand All @@ -274,7 +296,7 @@ impl StreamingLoop {
let builder_result = PayloadBuilder {
leader,
payload_buf,
read_payload_size: payload_len,
read_payload_size: read_payload_size,
trailer,
}
.build();
Expand Down Expand Up @@ -525,23 +547,24 @@ impl StreamParams {
}
}

fn read_leader(
async_pool: &mut AsyncPool,
fn read_leader<'a>(
inner: &mut MutexGuard<'_, u3v::ReceiveChannel>,
params: &StreamParams,
buf: &mut [u8],
) -> StreamResult<()> {
buf: &'a mut [u8],
) -> StreamResult<u3v_stream::Leader<'a>> {
let leader_size = params.leader_size;
async_pool.submit(&mut buf[..leader_size])?;
recv(inner, params, buf, leader_size)?;

Ok(())
u3v_stream::Leader::parse(buf).map_err(|e| StreamError::InvalidPayload(format!("{}", e).into()))
}

fn read_payload(
async_pool: &mut AsyncPool,
inner: &mut MutexGuard<'_, u3v::ReceiveChannel>,
params: &StreamParams,
buf: &mut [u8],
) -> StreamResult<()> {
) -> StreamResult<usize> {
let payload_size = params.payload_size;
let mut async_pool = AsyncPool::new(inner);
let mut cursor = 0;
for _ in 0..params.payload_count {
async_pool.submit(&mut buf[cursor..cursor + payload_size])?;
Expand All @@ -552,20 +575,45 @@ fn read_payload(
async_pool.submit(&mut buf[cursor..cursor + params.payload_final1_size])?;
cursor += params.payload_final1_size;
}
if params.payload_final2_size != 0 {
async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?;
//if params.payload_final2_size != 0 {
// async_pool.submit(&mut buf[cursor..cursor + params.payload_final2_size])?;
//}

let mut read_len = 0;
while !async_pool.is_empty() {
read_len += async_pool.poll(params.timeout)?;
}

Ok(())
Ok(read_len)
}

fn read_trailer(
async_pool: &mut AsyncPool,
fn read_trailer<'a>(
inner: &mut MutexGuard<'_, u3v::ReceiveChannel>,
params: &StreamParams,
buf: &mut [u8],
) -> StreamResult<()> {
buf: &'a mut [u8],
) -> StreamResult<u3v_stream::Trailer<'a>> {
let trailer_size = params.trailer_size;
async_pool.submit(&mut buf[..trailer_size])?;
recv(inner, params, buf, trailer_size)?;

u3v_stream::Trailer::parse(buf)
.map_err(|e| StreamError::InvalidPayload(format!("invalid trailer: {}", e).into()))
}

fn recv(
inner: &mut MutexGuard<'_, u3v::ReceiveChannel>,
params: &StreamParams,
buf: &mut [u8],
len: usize,
) -> StreamResult<usize> {
if len == 0 {
return Ok(0);
}

if buf.len() < len {
return Err(StreamError::BufferTooSmall);
}

Ok(())
inner
.recv(&mut buf[..len], params.timeout)
.map_err(|e| e.into())
}

0 comments on commit 2ec1521

Please sign in to comment.