Skip to content

Commit

Permalink
proxy: test various compression scenarios
Browse files Browse the repository at this point in the history
A unit test is written that tests compression mechanisms in the proxy.

Outline of the test:
1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns
   its uncompressed body, and "node" receives the uncompressed frame.
2. "node" responds with an uncompressed RESULT frame, feedback returns
   its uncompressed body, and "driver" receives the uncompressed frame.
3. "driver" sends an uncompressed STARTUP frame, feedback returns its
   uncompressed body, and "node" receives the uncompressed frame.
4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its
   uncompressed body, and "node" receives the compressed frame.
5. "node" responds with a compressed RESULT frame, feedback returns its
   uncompressed body, and "driver" receives the compressed frame.
  • Loading branch information
wprzytula committed Feb 13, 2025
1 parent 816e693 commit f8ebaac
Showing 1 changed file with 199 additions and 0 deletions.
199 changes: 199 additions & 0 deletions scylla-proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,12 @@ mod compression {
pub(crate) fn no_compression() -> CompressionReader {
mock_compression_reader(None)
}

// Compression explicitly turned on.
#[cfg(test)] // Currently only used for tests.
pub(crate) fn with_compression(compression: Compression) -> CompressionReader {
mock_compression_reader(Some(compression))
}
}
pub(crate) use compression::{CompressionReader, CompressionWriter};

Expand Down Expand Up @@ -1472,14 +1478,18 @@ mod tests {
use super::*;
use crate::errors::ReadFrameError;
use crate::frame::{read_frame, read_request_frame, FrameType};
use crate::proxy::compression::with_compression;
use crate::{
setup_tracing, Condition, Reaction as _, RequestReaction, ResponseOpcode, ResponseReaction,
};
use assert_matches::assert_matches;
use bytes::{BufMut, BytesMut};
use futures::future::{join, join3};
use rand::RngCore;
use scylla_cql::frame::request::options::COMPRESSION;
use scylla_cql::frame::request::{SerializableRequest as _, Startup};
use scylla_cql::frame::types::write_string_multimap;
use scylla_cql::frame::{Compression, FLAG_COMPRESSION};
use std::collections::HashMap;
use std::mem;
use std::str::FromStr;
Expand Down Expand Up @@ -2689,4 +2699,193 @@ mod tests {
let _ = request_feedback_rx.try_recv().unwrap_err();
let _ = response_feedback_rx.try_recv().unwrap_err();
}

#[tokio::test]
#[ntest::timeout(1000)]
async fn proxy_compresses_and_decompresses_frames_iff_compression_negociated() {
setup_tracing();
let node1_real_addr = next_local_address_with_port(9876);
let node1_proxy_addr = next_local_address_with_port(9876);

let (request_feedback_tx, mut request_feedback_rx) = mpsc::unbounded_channel();
let (response_feedback_tx, mut response_feedback_rx) = mpsc::unbounded_channel();
let proxy = Proxy::builder()
.with_node(
Node::builder()
.real_address(node1_real_addr)
.proxy_address(node1_proxy_addr)
.shard_awareness(ShardAwareness::Unaware)
.request_rules(vec![RequestRule(
Condition::True,
RequestReaction::noop().with_feedback_when_performed(request_feedback_tx),
)])
.response_rules(vec![ResponseRule(
Condition::True,
ResponseReaction::noop().with_feedback_when_performed(response_feedback_tx),
)])
.build(),
)
.build();
let running_proxy = proxy.run().await.unwrap();

let mock_node_listener = TcpListener::bind(node1_real_addr).await.unwrap();

const PARAMS_REQUEST_NO_COMPRESSION: FrameParams = FrameParams {
flags: 0,
version: 0x04,
stream: 0,
};
const PARAMS_REQUEST_COMPRESSION: FrameParams = FrameParams {
flags: FLAG_COMPRESSION,
..PARAMS_REQUEST_NO_COMPRESSION
};
const PARAMS_RESPONSE_NO_COMPRESSION: FrameParams =
PARAMS_REQUEST_NO_COMPRESSION.for_response();
const PARAMS_RESPONSE_COMPRESSION: FrameParams =
PARAMS_REQUEST_NO_COMPRESSION.for_response();

let make_driver_conn = async { TcpStream::connect(node1_proxy_addr).await.unwrap() };
let make_node_conn = async { mock_node_listener.accept().await.unwrap() };

let (mut driver_conn, (mut node_conn, _)) = join(make_driver_conn, make_node_conn).await;

/* Outline of the test:
* 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
* and "node" receives the uncompressed frame.
* 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
* and "driver" receives the uncompressed frame.
* 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
* and "node" receives the uncompressed frame.
* 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
* and "node" receives the compressed frame.
* 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
* and "driver" receives the compressed frame.
*/

// 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
// and "node" receives the uncompressed frame.
{
let sent_frame = RequestFrame {
params: PARAMS_REQUEST_NO_COMPRESSION,
opcode: RequestOpcode::Query,
body: random_body(),
};

sent_frame
.write(&mut driver_conn, &no_compression())
.await
.unwrap();

let (captured_frame, _) = request_feedback_rx.recv().await.unwrap();
assert_eq!(captured_frame, sent_frame);

let received_frame = read_request_frame(&mut node_conn, &no_compression())
.await
.unwrap();
assert_eq!(received_frame, sent_frame);
}

// 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
// and "driver" receives the uncompressed frame.
{
let sent_frame = ResponseFrame {
params: PARAMS_RESPONSE_NO_COMPRESSION,
opcode: ResponseOpcode::Result,
body: random_body(),
};

sent_frame
.write(&mut node_conn, &no_compression())
.await
.unwrap();

let (captured_frame, _) = response_feedback_rx.recv().await.unwrap();
assert_eq!(captured_frame, sent_frame);

let received_frame = read_response_frame(&mut driver_conn, &no_compression())
.await
.unwrap();
assert_eq!(received_frame, sent_frame);
}

// 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
// and "node" receives the uncompressed frame.
{
let startup_body = Startup {
options: std::iter::once((COMPRESSION.into(), Compression::Lz4.as_str().into()))
.collect(),
}
.to_bytes()
.unwrap();

let sent_frame = RequestFrame {
params: PARAMS_REQUEST_NO_COMPRESSION,
opcode: RequestOpcode::Startup,
body: startup_body,
};

sent_frame
.write(&mut driver_conn, &no_compression())
.await
.unwrap();

let (captured_frame, _) = request_feedback_rx.recv().await.unwrap();
assert_eq!(captured_frame, sent_frame);

let received_frame = read_request_frame(&mut node_conn, &no_compression())
.await
.unwrap();
assert_eq!(received_frame, sent_frame);
}

// 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
// and "node" receives the compressed frame.
{
let sent_frame = RequestFrame {
params: PARAMS_REQUEST_COMPRESSION,
opcode: RequestOpcode::Query,
body: random_body(),
};

sent_frame
.write(&mut driver_conn, &with_compression(Compression::Lz4))
.await
.unwrap();

let (captured_frame, _) = request_feedback_rx.recv().await.unwrap();
assert_eq!(captured_frame, sent_frame);

let received_frame =
read_request_frame(&mut node_conn, &with_compression(Compression::Lz4))
.await
.unwrap();
assert_eq!(received_frame, sent_frame);
}

// 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
// and "driver" receives the compressed frame.
{
let sent_frame = ResponseFrame {
params: PARAMS_RESPONSE_COMPRESSION,
opcode: ResponseOpcode::Result,
body: random_body(),
};

sent_frame
.write(&mut node_conn, &with_compression(Compression::Lz4))
.await
.unwrap();

let (captured_frame, _) = response_feedback_rx.recv().await.unwrap();
assert_eq!(captured_frame, sent_frame);

let received_frame =
read_response_frame(&mut driver_conn, &with_compression(Compression::Lz4))
.await
.unwrap();
assert_eq!(received_frame, sent_frame);
}

running_proxy.finish().await.unwrap();
}
}

0 comments on commit f8ebaac

Please sign in to comment.