Skip to content

Commit

Permalink
feat: implement IMAP COMPRESS
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Sep 22, 2024
1 parent 1954ce4 commit 352be2e
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 2 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }

[features]
default = ["runtime-async-std"]
compress = ["async-compression"]

runtime-async-std = ["async-std"]
runtime-tokio = ["tokio"]
runtime-async-std = ["async-std", "async-compression?/futures-io"]
runtime-tokio = ["tokio", "async-compression?/tokio"]

[dependencies]
async-channel = "2.0.0"
async-compression = { version = "0.4.12", default-features = false, features = ["deflate"], optional = true }
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
base64 = "0.21"
bytes = "1"
Expand All @@ -35,6 +37,7 @@ imap-proto = "0.16.4"
log = "0.4.8"
nom = "7.0"
once_cell = "1.8.0"
pin-project = "1"
pin-utils = "0.1.0-alpha.4"
self_cell = "1.0.1"
stop-token = "0.7"
Expand Down
45 changes: 45 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::authenticator::Authenticator;
use super::error::{Error, ParseError, Result, ValidateError};
use super::parse::*;
use super::types::*;
use crate::deflate::DeflateStream;
use crate::extensions::{self, quota::parse_get_quota};
use crate::imap_stream::ImapStream;

Expand Down Expand Up @@ -1324,6 +1325,50 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
Ok(server_identification)
}

/// Runs `COMPRESS DEFLATE` command.
///
/// Returns `Result` which is `Ok` if the stream is now compressed
/// and `Err` if compression was not enabled.
pub async fn compress(
self,
) -> Result<std::result::Result<Session<DeflateStream<T>>, Session<T>>> {
let Self {
mut conn,
unsolicited_responses_tx,
unsolicited_responses,
} = self;
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
.await?;
let stream = conn.into_inner();

if true {
let deflate_stream = DeflateStream::new(stream);
let stream = ImapStream::new(deflate_stream);
let conn = Connection {
stream,
request_ids: IdGenerator::new(),
};
let session = Session {
conn,
unsolicited_responses_tx,
unsolicited_responses,
};
Ok(Ok(session))
} else {
let stream = ImapStream::new(stream);
let conn = Connection {
stream,
request_ids: IdGenerator::new(),
};
let session = Self {
conn,
unsolicited_responses_tx,
unsolicited_responses,
};
Ok(Err(session))
}
}

// these are only here because they are public interface, the rest is in `Connection`
/// Runs a command and checks if it returns OK.
pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
Expand Down
75 changes: 75 additions & 0 deletions src/deflate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project::pin_project;

#[cfg(feature = "runtime-async-std")]
use async_std::io::{Read, Write};
#[cfg(feature = "runtime-tokio")]
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, ReadBuf};

#[cfg(feature = "runtime-tokio")]
use async_compression::tokio::bufread::DeflateDecoder;
#[cfg(feature = "runtime-tokio")]
use async_compression::tokio::write::DeflateEncoder;

/// IMAP stream
#[derive(Debug)]
#[pin_project]
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
#[cfg(feature = "runtime-tokio")]
#[pin]
decoder: DeflateDecoder<tokio::io::BufReader<tokio::io::ReadHalf<T>>>,

#[cfg(feature = "runtime-tokio")]
#[pin]
encoder: DeflateEncoder<tokio::io::WriteHalf<T>>,
}

#[cfg(feature = "runtime-tokio")]
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
pub(crate) fn new(stream: T) -> Self {
let (read_half, write_half) = tokio::io::split(stream);
let read_half = tokio::io::BufReader::new(read_half);
let decoder = DeflateDecoder::new(read_half);
let encoder = DeflateEncoder::new(write_half);
Self { decoder, encoder }
}
}

#[cfg(feature = "runtime-tokio")]
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
self.project().decoder.poll_read(cx, buf)
}
}

#[cfg(feature = "runtime-tokio")]
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.project().encoder.poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.project().encoder.poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.project().encoder.poll_shutdown(cx)
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ mod imap_stream;
mod parse;
pub mod types;

#[cfg(feature = "compress")]
pub mod deflate;

pub use crate::authenticator::Authenticator;
pub use crate::client::*;

Expand Down

0 comments on commit 352be2e

Please sign in to comment.