From 459ce85b75cb0e3f2207b80c1346d924bbb09f93 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Fri, 10 Apr 2020 22:34:15 +0900 Subject: [PATCH 1/2] Add simulated Read/Write fragmentation test --- tests/common/mod.rs | 49 +++++++++++++++++-- .../response-chunked-echo-throttled.txt | 17 +++++++ tests/server.rs | 27 ++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 tests/fixtures/response-chunked-echo-throttled.txt diff --git a/tests/common/mod.rs b/tests/common/mod.rs index cc2543d..fbd00e6 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -4,7 +4,7 @@ use async_std::path::PathBuf; use async_std::sync::Arc; use async_std::task::{Context, Poll}; use std::pin::Pin; -use std::sync::Mutex; +use std::sync::{atomic::AtomicBool, Mutex}; #[derive(Debug, Copy, Clone)] #[allow(dead_code)] @@ -19,6 +19,12 @@ pub struct TestCase { source_fixture: Arc, expected_fixture: Arc>, result: Arc>, + throttle: Arc, +} + +enum Throttle { + NoThrottle, + YieldPending(AtomicBool, AtomicBool), } impl TestCase { @@ -68,9 +74,15 @@ impl TestCase { source_fixture: Arc::new(source_fixture), expected_fixture: Arc::new(Mutex::new(expected_fixture)), result, + throttle: Arc::new(Throttle::NoThrottle), } } + #[allow(dead_code)] + pub fn throttle(&mut self) { + self.throttle = Arc::new(Throttle::YieldPending(AtomicBool::new(false), AtomicBool::new(false))); + } + pub async fn read_result(&self) -> String { use async_std::prelude::*; let mut result = String::new(); @@ -128,13 +140,44 @@ impl Read for TestCase { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut &*self.source_fixture).poll_read(cx, buf) + match &*self.throttle { + Throttle::NoThrottle => { + Pin::new(&mut &*self.source_fixture).poll_read(cx, buf) + }, + Throttle::YieldPending(read_flag, _) => { + if read_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // read partial + let throttle_len = std::cmp::min(buf.len(), 10); + let buf = &mut buf[..throttle_len]; + let ret = Pin::new(&mut &*self.source_fixture).poll_read(cx, buf); + ret + } + }, + } } } impl Write for TestCase { fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf) + match &*self.throttle { + Throttle::NoThrottle => { + Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf) + }, + Throttle::YieldPending(_, write_flag) => { + if write_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // write partial + let throttle_len = std::cmp::min(buf.len(), 10); + let buf = &buf[..throttle_len]; + Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf) + } + }, + } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/tests/fixtures/response-chunked-echo-throttled.txt b/tests/fixtures/response-chunked-echo-throttled.txt new file mode 100644 index 0000000..a1b3c98 --- /dev/null +++ b/tests/fixtures/response-chunked-echo-throttled.txt @@ -0,0 +1,17 @@ +HTTP/1.1 200 OK +transfer-encoding: chunked +date: {DATE} +content-type: text/plain + +1 +M +6 +ozilla +9 +Developer +5 +Netwo +2 +rk +0 + diff --git a/tests/server.rs b/tests/server.rs index 4fb20f8..15e8a92 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -77,6 +77,33 @@ async fn test_chunked_echo() { case.assert().await; } +#[async_std::test] +async fn test_chunked_echo_throttled() { + let mut case = TestCase::new_server( + "fixtures/request-chunked-echo.txt", + "fixtures/response-chunked-echo-throttled.txt", + ) + .await; + case.throttle(); + let addr = "http://example.com"; + + async_h1::accept(addr, case.clone(), |req| async { + let mut resp = Response::new(StatusCode::Ok); + let ct = req.content_type(); + let body: Body = req.into(); + resp.set_body(body); + if let Some(ct) = ct { + resp.set_content_type(ct); + } + + Ok(resp) + }) + .await + .unwrap(); + + case.assert().await; +} + #[async_std::test] async fn test_unexpected_eof() { // We can't predict unexpected EOF, so the response content-length is still 11 From 9484eb480dd52d3c3e03e70af8cfcc948228db15 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Fri, 10 Apr 2020 22:48:26 +0900 Subject: [PATCH 2/2] Cargo fmt --- tests/common/mod.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index fbd00e6..271f7e7 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -80,7 +80,10 @@ impl TestCase { #[allow(dead_code)] pub fn throttle(&mut self) { - self.throttle = Arc::new(Throttle::YieldPending(AtomicBool::new(false), AtomicBool::new(false))); + self.throttle = Arc::new(Throttle::YieldPending( + AtomicBool::new(false), + AtomicBool::new(false), + )); } pub async fn read_result(&self) -> String { @@ -141,9 +144,7 @@ impl Read for TestCase { buf: &mut [u8], ) -> Poll> { match &*self.throttle { - Throttle::NoThrottle => { - Pin::new(&mut &*self.source_fixture).poll_read(cx, buf) - }, + Throttle::NoThrottle => Pin::new(&mut &*self.source_fixture).poll_read(cx, buf), Throttle::YieldPending(read_flag, _) => { if read_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) { cx.waker().wake_by_ref(); @@ -155,7 +156,7 @@ impl Read for TestCase { let ret = Pin::new(&mut &*self.source_fixture).poll_read(cx, buf); ret } - }, + } } } } @@ -165,7 +166,7 @@ impl Write for TestCase { match &*self.throttle { Throttle::NoThrottle => { Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf) - }, + } Throttle::YieldPending(_, write_flag) => { if write_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) { cx.waker().wake_by_ref(); @@ -176,7 +177,7 @@ impl Write for TestCase { let buf = &buf[..throttle_len]; Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf) } - }, + } } }