Implement AsyncRead and AsyncWrite for bulk_in and bulk_out #97
-
Hi, I'm trying to use nusb on pc to talk to my android phone in accessory mode. Right now I've successfully implemented the open device, send AOA control, and find the read and write endpoints. The data sent from android side will be split by frames, protobuf message more specifically. So I would like to use However when I wrap Framed on interface or bulk_in_queue, I'm getting the message that I need to implement |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 4 replies
-
One way USB differs from sockets is that you need to explicitly start transfers before you get data, and the data is batched in transfers instead of arbitrary length read and write calls. A struct wrapping Similarly for AsyncWrite, it would buffer a transfer's worth of data and submit it on flush or when the buffer fills. |
Beta Was this translation helpful? Give feedback.
-
Thanks! I'm new to usb but that makes sense. Here's my code right now. Not sure if what went wrong but I'm getting decode error from the LengthDelimitedCodec. The input from android side is a realtime audio stream so I'm sure it will always fill the buffer. use nusb::transfer::{Queue, RequestBuffer};
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub struct UsbStream {
read_queue: Queue<RequestBuffer>,
write_queue: Queue<Vec<u8>>,
}
const MAX_PACKET_SIZE: usize = 512;
impl UsbStream {
pub fn new(read_queue: Queue<RequestBuffer>, write_queue: Queue<Vec<u8>>) -> Self {
UsbStream {
read_queue,
write_queue,
}
}
}
impl AsyncRead for UsbStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let pin = self.get_mut();
// make sure there's pending request
if pin.read_queue.pending() == 0 {
pin.read_queue.submit(RequestBuffer::new(MAX_PACKET_SIZE));
}
// try to read from the buffer
let mut res = ready!(pin.read_queue.poll_next(cx));
// copy into the buffer
match res.status {
Ok(_) => {
buf.put_slice(res.data.as_mut_slice());
// submit new request
pin.read_queue
.submit(RequestBuffer::reuse(res.data, MAX_PACKET_SIZE));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
}
impl AsyncWrite for UsbStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let pin = self.get_mut();
// send data by chunks
for chunk in buf.chunks(MAX_PACKET_SIZE) {
// create a vector of MAX_PACKET_SIZE size and fill the rest with 0
let mut data = vec![0; MAX_PACKET_SIZE];
data.extend_from_slice(chunk);
pin.write_queue.submit(data);
}
let res = ready!(pin.write_queue.poll_next(cx));
match res.status {
Ok(_) => Poll::Ready(Ok(res.data.actual_length())),
Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let pin = self.get_mut();
pin.write_queue.cancel_all();
pin.read_queue.cancel_all();
Poll::Ready(Ok(()))
}
} |
Beta Was this translation helpful? Give feedback.
One way USB differs from sockets is that you need to explicitly start transfers before you get data, and the data is batched in transfers instead of arbitrary length read and write calls.
A struct wrapping
Queue
and a buffer of received-but-unread bytes could handle this and implement AsyncRead, and nusb should probably include this as a convenience API.poll_read
would copy bytes from the buffer if available, or callpoll_next
on the Queue to get the next buffer, and resubmit the transfer.Similarly for AsyncWrite, it would buffer a transfer's worth of data and submit it on flush or when the buffer fills.