Skip to content

Commit

Permalink
Rename read and write index for tail and head
Browse files Browse the repository at this point in the history
  • Loading branch information
Lonami committed Jun 29, 2024
1 parent b466db5 commit df28423
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions lib/grammers-mtsender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ pub struct Sender<T: Transport, M: Mtp> {

// Transport-level buffers and positions
read_buffer: Vec<u8>,
read_index: usize,
read_tail: usize,
write_buffer: DequeBuffer<u8>,
write_index: usize,
write_head: usize,
}

struct Request {
Expand Down Expand Up @@ -210,9 +210,9 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
reconnection_policy,

read_buffer: vec![0; MAXIMUM_DATA],
read_index: 0,
read_tail: 0,
write_buffer: DequeBuffer::with_capacity(MAXIMUM_DATA, LEADING_BUFFER_SPACE),
write_index: 0,
write_head: 0,
},
Enqueuer(tx),
))
Expand Down Expand Up @@ -243,9 +243,9 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
reconnection_policy,

read_buffer: vec![0; MAXIMUM_DATA],
read_index: 0,
read_tail: 0,
write_buffer: DequeBuffer::with_capacity(MAXIMUM_DATA, LEADING_BUFFER_SPACE),
write_index: 0,
write_head: 0,
},
Enqueuer(tx),
))
Expand Down Expand Up @@ -310,7 +310,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
}

self.try_fill_write();
let write_len = self.write_buffer.len() - self.write_index;
let write_len = self.write_buffer.len() - self.write_head;
trace!(
"reading bytes and sending up to {} bytes via network",
write_len
Expand All @@ -321,12 +321,12 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
let sleep = pin!(async { sleep_until(self.next_ping).await });
let recv_req = pin!(async { self.request_rx.recv().await });
let recv_data =
pin!(async { reader.read(&mut self.read_buffer[self.read_index..]).await });
pin!(async { reader.read(&mut self.read_buffer[self.read_tail..]).await });
let send_data = pin!(async {
if self.write_buffer.is_empty() {
pending().await
} else {
writer.write(&self.write_buffer[self.write_index..]).await
writer.write(&self.write_buffer[self.write_head..]).await
}
});

Expand Down Expand Up @@ -465,18 +465,18 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
)));
}

self.read_index += n;
self.read_tail += n;
trace!("read {} bytes from the network", n);
trace!("trying to unpack buffer of {} bytes...", self.read_index);
trace!("trying to unpack buffer of {} bytes...", self.read_tail);

// TODO the buffer might have multiple transport packets, what should happen with the
// updates successfully read if subsequent packets fail to be deserialized properly?
let mut updates = Vec::new();
let mut next_offset = 0;
while next_offset != self.read_index {
while next_offset != self.read_tail {
match self
.transport
.unpack(&self.read_buffer[next_offset..][..self.read_index])
.unpack(&self.read_buffer[next_offset..][..self.read_tail])
{
Ok(offset) => {
debug!("deserializing valid transport packet...");
Expand All @@ -492,29 +492,28 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
}
}

self.read_buffer
.copy_within(next_offset..self.read_index, 0);
self.read_index -= next_offset;
self.read_buffer.copy_within(next_offset..self.read_tail, 0);
self.read_tail -= next_offset;

Ok(updates)
}

/// Handle `n` more written bytes being ready to process by the transport.
fn on_net_write(&mut self, n: usize) {
self.write_index += n;
self.write_head += n;
trace!(
"written {} bytes to the network ({}/{})",
n,
self.write_index,
self.write_head,
self.write_buffer.len()
);
assert!(self.write_index <= self.write_buffer.len());
if self.write_index != self.write_buffer.len() {
assert!(self.write_head <= self.write_buffer.len());
if self.write_head != self.write_buffer.len() {
return;
}

self.write_buffer.clear();
self.write_index = 0;
self.write_head = 0;
for req in self.requests.iter_mut() {
match req.state {
RequestState::NotSerialized | RequestState::Sent(_) => {}
Expand Down Expand Up @@ -549,14 +548,14 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
self.mtp.reset();
log::info!(
"resetting sender state from read_buffer {}/{}, write_buffer {}/{}",
self.read_index,
self.read_tail,
self.read_buffer.len(),
self.write_index,
self.write_head,
self.write_buffer.len(),
);
self.read_index = 0;
self.read_tail = 0;
self.read_buffer.fill(0);
self.write_index = 0;
self.write_head = 0;
self.write_buffer.clear();

let error = match error {
Expand Down Expand Up @@ -895,9 +894,9 @@ pub async fn generate_auth_key<T: Transport>(
request_rx: sender.request_rx,
next_ping: Instant::now() + PING_DELAY,
read_buffer: sender.read_buffer,
read_index: sender.read_index,
read_tail: sender.read_tail,
write_buffer: sender.write_buffer,
write_index: sender.write_index,
write_head: sender.write_head,
addr: sender.addr,
#[cfg(feature = "proxy")]
proxy_url: sender.proxy_url,
Expand Down

0 comments on commit df28423

Please sign in to comment.