Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read buffer are cleared before frame fully decoded in SerialFramed #77

Open
LHelge opened this issue Jan 15, 2025 · 2 comments
Open

Read buffer are cleared before frame fully decoded in SerialFramed #77

LHelge opened this issue Jan 15, 2025 · 2 comments

Comments

@LHelge
Copy link

LHelge commented Jan 15, 2025

I'm trying to implement a codec for COBS to use together with a SerialFramed for sending and receiving postcard serialized objects to a microcontroller.

My issue is receiving frames that are not complete on each call to decode. The bytes arrive at a low enough pace for each decode call to be decode_eof even though the full frame has not been received. The decode (and also decode_eof) function return Ok(None) unless a full frame has been received and in that case the read buffer is cleared code on:

pin.rd.clear();

Looking at the documentation for the Decoder trait from tokio_util it states:

It is guaranteed that, from one call to decode to another, the provided buffer will contain the exact same data as before, except that if more data has arrived through the IO resource, that data will have been appended to the buffer.

Is this intended behaviour? My code works as expected if I comment the line referenced above.

A very simplifide pseudocode version of my decoder looks like this:

pub struct CobsCodec;

impl Decoder for CobsCodec {
    type Error = CobsError;
    type Item = Vec<u8>;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match decode_frame(&mut src) {
            Success(frame, len) => {
                src.advance(len);
                Ok(Some(frame))
            },
            Incomplete() => Ok(None),
            Corrupt() => Err(CobsError::Corrupted)
        }
    }

    fn decode_eof(
        &mut self,
        buf: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error> {
        // Needs to be overridden since default implementation errors on frames crossing 
        // eof boundaries unsuitable for reading from resumable I/O
        self.decode(buf)
    }
}
@LHelge
Copy link
Author

LHelge commented Jan 16, 2025

Simplest code example I could think of to reproduce the problem:

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use std::time::Duration;
    use tokio::{io::AsyncWriteExt, join, time::sleep};
    use tokio_serial::{frame::SerialFramed, SerialStream};
    use tokio_util::codec::LinesCodec;

    // PASS
    #[tokio::test]
    async fn cobs_frames_async() {
        let (mut master, slave) = SerialStream::pair().unwrap();
        let mut slave_framed = SerialFramed::new(slave, LinesCodec::new());

        let sender = async {
            sleep(Duration::from_millis(100)).await;

            master.write_all(b"Hello, ").await.unwrap();
            master.write_all(b"world!\n").await.unwrap();
        };

        let receiver = async { slave_framed.next().await.unwrap().unwrap() };

        let (_, data) = join!(sender, receiver);

        assert_eq!(data, "Hello, world!");
    }

    // FAIL
    #[tokio::test]
    async fn cobs_frames_async_eof() {
        let (mut master, slave) = SerialStream::pair().unwrap();
        let mut slave_framed = SerialFramed::new(slave, LinesCodec::new());

        let sender = async {
            sleep(Duration::from_millis(100)).await;

            master.write_all(b"Hello, ").await.unwrap();
            sleep(Duration::from_millis(100)).await;
            master.write_all(b"world!\n").await.unwrap();
        };

        let receiver = async { slave_framed.next().await.unwrap().unwrap() };

        let (_, data) = join!(sender, receiver);

        assert_eq!(data, "Hello, world!");
    }
}

@LHelge
Copy link
Author

LHelge commented Jan 16, 2025

And I'm not sure if it makes sense to use decode_eof in a serial port library, since you almost always can expect to resume reading at a later stage: It does not make sense that something like this fails:

    #[tokio::test]
    async fn cobs_frames_length_delimited_async_eof() {
        let (mut master, slave) = SerialStream::pair().unwrap();
        let mut slave_framed = SerialFramed::new(slave, tokio_util::codec::LengthDelimitedCodec::default());

        let data = [0x11, 0x22, 0x33, 0x44];

        let sender = async {
            sleep(Duration::from_millis(100)).await;

            master.write_u32(data.len() as u32).await.unwrap();
            master.write_all(&data[..1]).await.unwrap();
            sleep(Duration::from_millis(100)).await;
            master.write_all(&data[1..]).await.unwrap();
        };

        let receiver = async { slave_framed.next().await.unwrap().unwrap() };

        let (_, received) = join!(sender, receiver);

        assert_eq!(received, BytesMut::from(&data[..]));
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant