diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 7e7d8ab..a1a657c 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -6,7 +6,8 @@
         "vscode": {
             "extensions": [
                 "llvm-vs-code-extensions.vscode-clangd",
-                "ms-vscode.cpptools"
+                "ms-vscode.cpptools",
+                "rust-lang.rust-analyzer"
             ]
         }
     },
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 0000000..5c3003f
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,55 @@
+name: open-rdma-driver CI test
+on:
+  pull_request:
+    branches: [refactor]
+  push:
+    branches: [refactor]
+
+jobs:
+  ci-check:
+    name: rust-driver test
+    runs-on: self-hosted
+    container:
+      image: ghcr.io/myrfy001/hard_devcontainer:main
+      options: --privileged
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Checkout tools repo
+        uses: actions/checkout@v4
+        with:
+          repository: datenlord/blue-rdma
+          ref: next
+          fetch-depth: 0
+          submodules: 'true'
+          path: blue-rdma
+      - uses: actions/cache@v4
+        id: cache-building-emulator
+        with:
+          path: |
+            ~/.bash_profile
+            blue-rdma/bsc-2023.01-ubuntu-22.04
+            blue-rdma/test
+            ~/.cache/pip
+          key: ${{ runner.os }}-blue-rdma-next
+      - name: build emulator
+        if: steps.cache-building-emulator.outputs.cache-hit != 'true'
+        run: |
+          apt update && apt install -y python3-pip
+          pip3 install scapy
+          cd ./blue-rdma
+          ./setup.sh
+          ./run_one.sh || true
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+      - name: run cargo build
+        working-directory: rust-driver
+        run: cargo build --verbose
+      - name: pre run emulator
+        run: |
+          cd blue-rdma
+          ./run_system_test.sh
+      - name: run cargo test
+        working-directory: rust-driver
+        run: cargo test --verbose
\ No newline at end of file
diff --git a/rust-driver/.gitignore b/rust-driver/.gitignore
new file mode 100644
index 0000000..43577d2
--- /dev/null
+++ b/rust-driver/.gitignore
@@ -0,0 +1,4 @@
+/target
+/Cargo.lock
+
+.DS_Store
\ No newline at end of file
diff --git a/rust-driver/Cargo.toml b/rust-driver/Cargo.toml
new file mode 100644
index 0000000..8c0963a
--- /dev/null
+++ b/rust-driver/Cargo.toml
@@ -0,0 +1,40 @@
+[package]
+name = "open-rdma-driver"
+version = "0.1.0"
+edition = "2021"
+repository = "https://github.com/datenlord/open-rdma-driver"
+description = "A user space RDMA driver for BlueRDMA and its software protocol stack"
+readme = "README.md"
+license = "GPL-2.0-only"
+keywords = ["rdma", "driver"]
+categories = ["command-line-utilities", "hardware-support"]
+
+[dependencies]
+thiserror = "1.0.56"
+num_enum = "0.7.2"
+socket2 = { version = "0.5.6", features = ["all"] }
+crc32fast = "1.4.0"
+bitflags = "2.4.2"
+libc = "0.2"
+rand = { version = "0.8.5", features = ["std", "std_rng"], default-features = false }
+serde_json = "1.0.114"
+serde = { version = "1.0.197", features = ["derive"] }
+bitfield = "0.14.0"
+eui48 = "1.1.0"
+log = { version = "0.4", features = ["std"] }
+serial_test = "3.0.0"
+derive_builder = "0.20.0"
+smoltcp = { version = "0.11.0", features = ["verbose"] }
+parking_lot = "0.12.2"
+flume = "0.11.0"
+atomic_enum = "0.3.0"
+core_affinity = "0.8.1"
+
+
+[dev-dependencies]
+shared_memory = "0.12.4"
+ctor = "0.2.7"
+buddy_system_allocator = "0.9.1"
+libc = "0.2.153"
+lazy_static = "1.4.0"
+serial_test = "3.0.0"
\ No newline at end of file
diff --git a/rust-driver/rust-toolchain.toml b/rust-driver/rust-toolchain.toml
new file mode 100644
index 0000000..fdf68e4
--- /dev/null
+++ b/rust-driver/rust-toolchain.toml
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "1.75"
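+# The components pinned below keep clippy and rustfmt in lockstep with the 1.75 channel,
+# so lint and format results stay reproducible across local builds and CI.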
+components = ["clippy", "rustfmt"]
\ No newline at end of file
diff --git a/rust-driver/src/device/constant.rs b/rust-driver/src/device/constant.rs
new file mode 100644
index 0000000..cba30da
--- /dev/null
+++ b/rust-driver/src/device/constant.rs
@@ -0,0 +1,47 @@
+enum CsrIndex {
+    BaseAddrLow = 0x0,
+    BaseAddrHigh = 0x1,
+    Head = 0x2,
+    Tail = 0x3,
+}
+
+const fn generate_csr_addr(is_h2c: bool, queue_index: usize, reg_index: CsrIndex) -> usize {
+    let mut a = if is_h2c { 1 } else { 0 };
+    a <<= 3_i32;
+    a |= queue_index & 0b111;
+    a <<= 10_i32;
+    a |= reg_index as usize & 0x3FF;
+    a <<= 2_i32;
+    a
+}
+
+pub(super) const CSR_ADDR_CMD_REQ_QUEUE_ADDR_LOW: usize =
+    generate_csr_addr(true, 0, CsrIndex::BaseAddrLow);
+pub(super) const CSR_ADDR_CMD_REQ_QUEUE_ADDR_HIGH: usize =
+    generate_csr_addr(true, 0, CsrIndex::BaseAddrHigh);
+pub(super) const CSR_ADDR_CMD_REQ_QUEUE_HEAD: usize = generate_csr_addr(true, 0, CsrIndex::Head);
+pub(super) const CSR_ADDR_CMD_REQ_QUEUE_TAIL: usize = generate_csr_addr(true, 0, CsrIndex::Tail);
+pub(super) const CSR_ADDR_CMD_RESP_QUEUE_HEAD: usize = generate_csr_addr(false, 0, CsrIndex::Head);
+pub(super) const CSR_ADDR_CMD_RESP_QUEUE_TAIL: usize = generate_csr_addr(false, 0, CsrIndex::Tail);
+pub(super) const CSR_ADDR_CMD_RESP_QUEUE_ADDR_LOW: usize =
+    generate_csr_addr(false, 0, CsrIndex::BaseAddrLow);
+pub(super) const CSR_ADDR_CMD_RESP_QUEUE_ADDR_HIGH: usize =
+    generate_csr_addr(false, 0, CsrIndex::BaseAddrHigh);
+pub(super) const CSR_ADDR_SEND_QUEUE_HEAD: usize = generate_csr_addr(true, 1, CsrIndex::Head);
+pub(super) const CSR_ADDR_SEND_QUEUE_TAIL: usize = generate_csr_addr(true, 1, CsrIndex::Tail);
+pub(super) const CSR_ADDR_SEND_QUEUE_ADDR_LOW: usize =
+    generate_csr_addr(true, 1, CsrIndex::BaseAddrLow);
+pub(super) const CSR_ADDR_SEND_QUEUE_ADDR_HIGH: usize =
+    generate_csr_addr(true, 1, CsrIndex::BaseAddrHigh);
+pub(super) const CSR_ADDR_META_REPORT_QUEUE_HEAD: usize =
+    generate_csr_addr(false, 1, CsrIndex::Head);
+pub(super) const CSR_ADDR_META_REPORT_QUEUE_TAIL: usize =
+    generate_csr_addr(false, 1, CsrIndex::Tail);
+pub(super) const CSR_ADDR_META_REPORT_QUEUE_ADDR_LOW: usize =
+    generate_csr_addr(false, 1, CsrIndex::BaseAddrLow);
+pub(super) const CSR_ADDR_META_REPORT_QUEUE_ADDR_HIGH: usize =
+    generate_csr_addr(false, 1, CsrIndex::BaseAddrHigh);
+
+pub(super) const RINGBUF_DEPTH: usize = 128;
+pub(super) const RINGBUF_ELEM_SIZE: usize = 32;
+pub(super) const RINGBUF_PAGE_SIZE: usize = 4096;
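+
+// A minimal sanity-check sketch (added for illustration, not in the original
+// patch): it restates the address layout that `generate_csr_addr` encodes,
+// namely [is_h2c:1][queue_index:3][reg_index:10], shifted left by 2 for
+// 4-byte register alignment.
+#[cfg(test)]
+mod csr_addr_sketch {
+    use super::*;
+
+    #[test]
+    fn layout_matches_manual_encoding() {
+        // H2C queue 1, Tail register (0x3), assembled by hand.
+        let expected = ((((1usize << 3) | 1) << 10) | 0x3) << 2;
+        assert_eq!(generate_csr_addr(true, 1, CsrIndex::Tail), expected);
+    }
+}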
diff --git a/rust-driver/src/device/descriptor/ctrl.rs b/rust-driver/src/device/descriptor/ctrl.rs
new file mode 100644
index 0000000..e3dfcd4
--- /dev/null
+++ b/rust-driver/src/device/descriptor/ctrl.rs
@@ -0,0 +1,396 @@
+use crate::{
+    device::{error::DeviceResult, DeviceError},
+    types::{Key, MemAccessTypeFlag, Pmtu, Psn, QpType, Qpn},
+    utils::u8_slice_to_u64,
+};
+use eui48::MacAddress;
+use num_enum::TryFromPrimitive;
+use std::net::Ipv4Addr;
+
+use super::layout::{
+    CmdQueueDescCommonHead, CmdQueueReqDescQpManagementSeg0, CmdQueueReqDescSetNetworkParam,
+    CmdQueueReqDescSetRawPacketReceiveMeta, CmdQueueReqDescUpdateErrRecoverPoint,
+    CmdQueueReqDescUpdateMrTable, CmdQueueReqDescUpdatePGT,
+};
+
+#[derive(Debug)]
+/// Host to card cmdq descriptor
+pub(crate) enum ToCardCtrlRbDesc {
+    /// Update memory region table
+    UpdateMrTable(ToCardCtrlRbDescUpdateMrTable),
+
+    /// Update page table
+    UpdatePageTable(ToCardCtrlRbDescUpdatePageTable),
+
+    /// QP management
+    QpManagement(ToCardCtrlRbDescQpManagement),
+
+    /// Set network param
+    SetNetworkParam(ToCardCtrlRbDescSetNetworkParam),
+
+    /// Set raw packet receive meta
+    SetRawPacketReceiveMeta(ToCardCtrlRbDescSetRawPacketReceiveMeta),
+
+    /// Update error psn recover point
+    UpdateErrorPsnRecoverPoint(ToCardCtrlRbDescUpdateErrPsnRecoverPoint),
+}
+
+impl ToCardCtrlRbDesc {
+    pub(crate) fn set_id(&mut self, id: u32) {
+        match self {
+            ToCardCtrlRbDesc::UpdateMrTable(desc) => desc.common.op_id = id,
+            ToCardCtrlRbDesc::UpdatePageTable(desc) => desc.common.op_id = id,
+            ToCardCtrlRbDesc::QpManagement(desc) => desc.common.op_id = id,
+            ToCardCtrlRbDesc::SetNetworkParam(desc) => desc.common.op_id = id,
+            ToCardCtrlRbDesc::SetRawPacketReceiveMeta(desc) => desc.common.op_id = id,
+            ToCardCtrlRbDesc::UpdateErrorPsnRecoverPoint(desc) => desc.common.op_id = id,
+        }
+    }
+}
+
+/// cmdq response descriptor
+#[derive(Debug)]
+pub(crate) struct ToHostCtrlRbDesc {
+    pub(crate) common: ToHostCtrlRbDescCommon,
+}
+
+/// cmdq response descriptor common header
+#[derive(Debug)]
+pub(crate) struct ToHostCtrlRbDescCommon {
+    /// The operation id
+    pub(crate) op_id: u32,
+
+    /// The opcode of the descriptor
+    pub(crate) opcode: CtrlRbDescOpcode,
+
+    /// The result of the operation
+    pub(crate) is_success: bool,
+}
+
+/// common header for host to card cmdq descriptor
+#[derive(Debug, Default)]
+pub(crate) struct ToCardCtrlRbDescCommon {
+    pub(crate) op_id: u32, // user_data
+}
+
+/// cmdq update memory region table descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescUpdateMrTable {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// The base virtual address of the memory region
+    pub(crate) addr: u64,
+
+    /// The length of the memory region
+    pub(crate) len: u32,
+
+    /// The lkey of the memory region
+    pub(crate) key: Key,
+
+    /// The pd handler of the memory region
+    pub(crate) pd_hdl: u32,
+
+    /// The access flags of the memory region
+    pub(crate) acc_flags: MemAccessTypeFlag,
+
+    /// The offset in the page table
+    pub(crate) pgt_offset: u32,
+}
+
+/// cmdq update page table descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescUpdatePageTable {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// The start address of the page table
+    pub(crate) start_addr: u64,
+
+    /// The index of the page table
+    pub(crate) pgt_idx: u32, // offset
+
+    /// The count of page table entries
+    pub(crate) pgte_cnt: u32, // number of entries, 8 bytes each
+}
+
+/// cmdq qp management descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescQpManagement {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// is this qp valid
+    pub(crate) is_valid: bool,
+
+    /// The QP number
+    pub(crate) qpn: Qpn,
+
+    /// The PD handle
+    pub(crate) pd_hdl: u32,
+
+    /// The type of the QP
+    pub(crate) qp_type: QpType,
+
+    /// The access flags of the receive queue
+    pub(crate) rq_acc_flags: MemAccessTypeFlag,
+
+    /// The pmtu of the QP
+    pub(crate) pmtu: Pmtu,
+
+    /// The peer QP number
+    pub(crate) peer_qpn: Qpn,
+}
+
+/// cmdq set network param descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescSetNetworkParam {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// The gateway of the network
+    pub(crate) gateway: Ipv4Addr,
+
+    /// The netmask of the network
+    pub(crate) netmask: Ipv4Addr,
+
+    /// The ip address of the network
+    pub(crate) ipaddr: Ipv4Addr,
+
+    /// The mac address of the network
+    pub(crate) macaddr: MacAddress,
+}
+
+/// cmdq set raw packet receive meta descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescSetRawPacketReceiveMeta {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// The base write address of the raw packet receive meta
+    pub(crate) base_write_addr: u64,
+
+    /// The key of the memory region
+    pub(crate) key: Key,
+}
+
+/// cmdq update error psn recover point descriptor
+#[derive(Debug)]
+pub(crate) struct ToCardCtrlRbDescUpdateErrPsnRecoverPoint {
+    /// common header
+    pub(crate) common: ToCardCtrlRbDescCommon,
+
+    /// The QP number
+    pub(crate) qpn: Qpn,
+
+    /// The PSN to recover this qp
+    pub(crate) recover_psn: Psn,
+}
+
+
+
+#[derive(Debug, TryFromPrimitive)]
+#[repr(u8)]
+pub(crate) enum CtrlRbDescOpcode {
+    UpdateMrTable = 0x00,
+    UpdatePageTable = 0x01,
+    QpManagement = 0x02,
+    SetNetworkParam = 0x03,
+    SetRawPacketReceiveMeta = 0x04,
+    UpdateErrorPsnRecoverPoint = 0x05,
+}
+
+impl ToCardCtrlRbDesc {
+    pub(super) fn write(&self, dst: &mut [u8]) {
+        fn write_common_header(dst: &mut [u8], opcode: CtrlRbDescOpcode, op_id: u32) {
+            // typedef struct {
+            //     Bit#(32)                userData;
+            //     ReservedZero#(20)       reserved1;
+            //     Bit#(4)                 extraSegmentCnt;
+            //     Bit#(6)                 opCode;
+            //     Bool                    isSuccessOrNeedSignalCplt;
+            //     Bool                    valid;
+            // } CmdQueueDescCommonHead deriving(Bits, FShow);
+
+            let mut common = CmdQueueDescCommonHead(dst);
+            common.set_valid(true);
+            common.set_reserved(0);
+            common.set_is_success_or_need_signal_cplt(false);
+            common.set_op_code(opcode as u32);
+            common.set_extra_segment_cnt(0);
+            common.set_user_data(op_id);
+        }
+
+        fn write_update_mr_table(dst: &mut [u8], desc: &ToCardCtrlRbDescUpdateMrTable) {
+            // typedef struct {
+            //     ReservedZero#(7)            reserved1;
+            //     Bit#(17)                    pgtOffset;
+            //     Bit#(8)                     accFlags;
+            //     Bit#(32)                    pdHandler;
+            //     Bit#(32)                    mrKey;
+            //     Bit#(32)                    mrLength;
+            //     Bit#(64)                    mrBaseVA;
+            //     CmdQueueDescCommonHead      commonHeader;
+            // } CmdQueueReqDescUpdateMrTable deriving(Bits, FShow);
+
+            // bytes 0-7 are header bytes, ignore them
+
+            let mut update_mr_table = CmdQueueReqDescUpdateMrTable(dst);
+            update_mr_table.set_mr_base_va(desc.addr);
+            update_mr_table.set_mr_length(desc.len.into());
+            update_mr_table.set_mr_key(desc.key.get().into());
+            update_mr_table.set_pd_handler(desc.pd_hdl.into());
+            update_mr_table.set_acc_flags(desc.acc_flags.bits().into());
+            update_mr_table.set_pgt_offset(desc.pgt_offset.into());
+        }
+
+        fn write_update_page_table(dst: &mut [u8], desc: &ToCardCtrlRbDescUpdatePageTable) {
+            // typedef struct {
+            //     ReservedZero#(64)               reserved1;
+            //     Bit#(32)                        dmaReadLength;
+            //     Bit#(32)                        startIndex;
+            //     Bit#(64)                        dmaAddr;
+            //     CmdQueueDescCommonHead          commonHeader;
+            // } CmdQueueReqDescUpdatePGT deriving(Bits, FShow);
+
+            // bytes 0-7 are header bytes, ignore them
+            let mut update_pgt = CmdQueueReqDescUpdatePGT(dst);
+            update_pgt.set_dma_addr(desc.start_addr);
+            update_pgt.set_start_index(desc.pgt_idx.into());
+            #[allow(clippy::arithmetic_side_effects)]
+            // this is less than MR_PGT_SIZE * 8, so it will not overflow
+            update_pgt.set_dma_read_length((desc.pgte_cnt * 8).into());
+        }
+
+        fn write_qp_management(dst: &mut [u8], desc: &ToCardCtrlRbDescQpManagement) {
+            // typedef struct {
+            //     ReservedZero#(80)              reserved1;       // 80  bits
+            //     QPN                            qpn;             // 24  bits
+            //     ReservedZero#(5)               reserved2;       // 5   bits
+            //     PMTU                           pmtu;            // 3   bits
+            //     FlagsType#(MemAccessTypeFlag)  rqAccessFlags;   // 8   bits
+            //     ReservedZero#(4)               reserved3;       // 4   bits
+            //     TypeQP                         qpType;          // 4   bits
+            //     HandlerPD                      pdHandler;       // 32  bits
+            //     QPN                            qpn;             // 24  bits
+            //     ReservedZero#(6)               reserved4;       // 6   bits
+            //     Bool                           isError;         // 1   bit
+            //     Bool                           isValid;         // 1   bit
+            //     CmdQueueDescCommonHead         commonHeader;    // 64  bits
+            // } CmdQueueReqDescQpManagementSeg0 deriving(Bits, FShow);
+
+            let mut seg0 = CmdQueueReqDescQpManagementSeg0(dst);
+            seg0.set_is_valid(desc.is_valid);
+            seg0.set_is_error(false);
+            seg0.set_qpn(desc.qpn.get().into());
+            seg0.set_pd_handler(desc.pd_hdl.into());
+            seg0.set_qp_type(desc.qp_type as u64);
+            seg0.set_rq_access_flags(desc.rq_acc_flags.bits().into());
+            seg0.set_pmtu(desc.pmtu as u64);
+            seg0.set_peer_qpn(desc.peer_qpn.get().into());
+        }
+
+        fn write_set_network_param(dst: &mut [u8], desc: &ToCardCtrlRbDescSetNetworkParam) {
+            let mut network_params = CmdQueueReqDescSetNetworkParam(dst);
+            network_params.set_eth_mac_addr(u8_slice_to_u64(desc.macaddr.as_bytes()));
+            network_params.set_ip_addr(u8_slice_to_u64(&desc.ipaddr.octets()));
+            network_params.set_ip_gateway(u8_slice_to_u64(&desc.gateway.octets()));
+            network_params.set_ip_netmask(u8_slice_to_u64(&desc.netmask.octets()));
+        }
+
+        fn write_set_raw_packet_receive_meta(
+            dst: &mut [u8],
+            desc: &ToCardCtrlRbDescSetRawPacketReceiveMeta,
+        ) {
+            let mut raw_packet_recv_meta = CmdQueueReqDescSetRawPacketReceiveMeta(dst);
+            raw_packet_recv_meta.set_write_base_addr(desc.base_write_addr);
+            raw_packet_recv_meta.set_write_mr_key(u64::from(desc.key.get()));
+        }
+
+        fn write_update_err_psn_recover_point(
+            dst: &mut [u8],
+            desc: &ToCardCtrlRbDescUpdateErrPsnRecoverPoint,
+        ) {
+            let mut err_recover_point = CmdQueueReqDescUpdateErrRecoverPoint(dst);
+            err_recover_point.set_qpn(desc.qpn.get());
+            err_recover_point.set_psn(desc.recover_psn.get());
+        }
+
+        match self {
+            ToCardCtrlRbDesc::UpdateMrTable(desc) => {
+                write_common_header(dst, CtrlRbDescOpcode::UpdateMrTable, desc.common.op_id);
+                write_update_mr_table(dst, desc);
+            }
+            ToCardCtrlRbDesc::UpdatePageTable(desc) => {
+                write_common_header(dst, CtrlRbDescOpcode::UpdatePageTable, desc.common.op_id);
+                write_update_page_table(dst, desc);
+            }
+            ToCardCtrlRbDesc::QpManagement(desc) => {
+                write_common_header(dst, CtrlRbDescOpcode::QpManagement, desc.common.op_id);
+                write_qp_management(dst, desc);
+            }
+            ToCardCtrlRbDesc::SetNetworkParam(desc) => {
+                write_common_header(dst, CtrlRbDescOpcode::SetNetworkParam, desc.common.op_id);
+                write_set_network_param(dst, desc);
+            }
+            ToCardCtrlRbDesc::SetRawPacketReceiveMeta(desc) => {
+                write_common_header(
+                    dst,
+                    CtrlRbDescOpcode::SetRawPacketReceiveMeta,
+                    desc.common.op_id,
+                );
+                write_set_raw_packet_receive_meta(dst, desc);
+            }
+            ToCardCtrlRbDesc::UpdateErrorPsnRecoverPoint(desc) => {
+                write_common_header(
+                    dst,
+                    CtrlRbDescOpcode::UpdateErrorPsnRecoverPoint,
+                    desc.common.op_id,
+                );
+                write_update_err_psn_recover_point(dst, desc);
+            }
+        }
+    }
+}
+
+impl ToHostCtrlRbDesc {
+    pub(super) fn read(src: &[u8]) -> DeviceResult<ToHostCtrlRbDesc> {
+        // typedef struct {
+        //     Bit#(32)                userData;
+        //     ReservedZero#(20)       reserved1;
+        //     Bit#(4)                 extraSegmentCnt;
+        //     Bit#(6)                 opCode;
+        //     Bool                    isSuccessOrNeedSignalCplt;
+        //     Bool                    valid;
+        // } CmdQueueDescCommonHead deriving(Bits, FShow);
+        let head = CmdQueueDescCommonHead(src);
+
+        let valid = head.get_valid();
+        assert!(valid, "Invalid CmdQueueDescCommonHead");
+
+        let extra_segment_cnt = head.get_extra_segment_cnt();
+        assert!(
+            extra_segment_cnt == 0,
+            "extra_segment_cnt: {extra_segment_cnt}"
+        );
+
+        let is_success = head.get_is_success_or_need_signal_cplt();
+        // The `bitfield` macro restricts this field to at most 8 bits,
+        // so we can safely cast it to u8.
+        #[allow(clippy::cast_possible_truncation)]
+        let opcode_raw = head.get_op_code() as u8;
+
+        let opcode = CtrlRbDescOpcode::try_from(opcode_raw).map_err(|_| {
+            DeviceError::ParseDesc(format!("CtrlRbDescOpcode = {opcode_raw} can not be parsed"))
+        })?;
+        let op_id = head.get_user_data().to_le();
+
+        let common = ToHostCtrlRbDescCommon {
+            op_id,
+            opcode,
+            is_success,
+        };
+
+        let desc = ToHostCtrlRbDesc { common };
+        Ok(desc)
+    }
+}
diff --git a/rust-driver/src/device/descriptor/layout.rs b/rust-driver/src/device/descriptor/layout.rs
new file mode 100644
index 0000000..aa38df5
--- /dev/null
+++ b/rust-driver/src/device/descriptor/layout.rs
@@ -0,0 +1,290 @@
+#![allow(clippy::indexing_slicing)]
+// Using the `#!` to suppress the warning of `clippy::indexing_slicing` in the generated code.
+use bitfield::bitfield;
+
+const OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH: usize = 32;
+
+pub(super) const SIZE_OF_BTH_IN_BYTES: usize = 8;
+
+pub(super) const OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES: usize =
+    OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH / 8;
+
+const OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH: usize = 96;
+
+pub(super) const OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES: usize =
+    OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH / 8;
+
+pub(super) const OFFSET_OF_AETH_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT_IN_BYTES: usize =
+    OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES;
+
+pub(super) const OFFSET_OF_IMM_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT: usize = 28;
+
+pub(super) const SIZE_OF_IMM_IN_BYTES: usize = 4;
+
+bitfield! {
+    pub struct CmdQueueDescCommonHead([u8]);
+    u32;
+    pub get_valid , set_valid: 0;
+    pub get_is_success_or_need_signal_cplt, set_is_success_or_need_signal_cplt: 1;
+    pub get_op_code, set_op_code: 7, 2;
+    pub get_extra_segment_cnt, set_extra_segment_cnt: 11, 8;
+    pub _reserved, set_reserved: 31, 12;
+    pub get_user_data, set_user_data: 63, 32;
+}
+
+bitfield! {
+    pub struct CmdQueueReqDescUpdateMrTable([u8]);
+    u64;
+    _cmd_queue_desc_common_head,_: 63, 0;     // 64bits
+    pub get_mr_base_va, set_mr_base_va: 127, 64;  // 64bits
+    pub get_mr_length, set_mr_length: 159, 128;   // 32bits
+    pub get_mr_key, set_mr_key: 191, 160;         // 32bits
+    pub get_pd_handler, set_pd_handler: 223, 192; // 32bits
+    pub get_acc_flags, set_acc_flags: 231, 224;   // 8bits
+    pub get_pgt_offset, set_pgt_offset: 248, 232; // 17bits
+    _reserved0, _: 255, 249;                      // 7bits
+}
+
+bitfield! {
+    pub struct CmdQueueReqDescUpdatePGT([u8]);
+    u64;
+    __cmd_queue_desc_common_head,_ : 63, 0;               // 64bits
+    pub get_dma_addr, set_dma_addr: 127, 64;              // 64bits
+    pub get_start_index, set_start_index: 159, 128;       // 32bits
+    pub get_dma_read_length, set_dma_read_length: 191, 160; // 32bits
+    _reserved0, _: 255, 192;                              // 64bits
+}
+
+bitfield! {
+    pub struct CmdQueueReqDescQpManagementSeg0([u8]);
+    u64;
+    _cmd_queue_desc_common_head,_: 63, 0;                  // 64bits
+    pub get_is_valid, set_is_valid: 64;                    // 1bit
+    pub get_is_error, set_is_error: 65;                    // 1bit
+    _reserved4, _: 71, 66;                                 // 6bits
+    pub get_qpn, set_qpn: 95, 72;                          // 24bits
+    pub get_pd_handler, set_pd_handler: 127, 96;           // 32bits
+    pub get_qp_type, set_qp_type: 131, 128;                // 4bits
+    _reserved3, _: 135, 132;                               // 4bits
+    pub get_rq_access_flags, set_rq_access_flags: 143, 136; // 8bits
+    pub get_pmtu, set_pmtu: 146, 144;                      // 3bits
+    _reserved2, _: 151, 147;                               // 5bits
+    pub get_peer_qpn, set_peer_qpn: 175, 152;              // 24bits
+    _reserved1, _: 255, 176;                               // 80bits
+}
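+
+// A round-trip sanity sketch (added for illustration, not part of the original
+// patch): it exercises the generated accessors of `CmdQueueDescCommonHead`
+// over an 8-byte buffer; the field values are arbitrary.
+#[cfg(test)]
+mod common_head_sketch {
+    use super::CmdQueueDescCommonHead;
+
+    #[test]
+    fn accessors_round_trip() {
+        let mut buf = [0u8; 8];
+        let mut head = CmdQueueDescCommonHead(&mut buf[..]);
+        head.set_valid(true);
+        head.set_op_code(0x05);
+        head.set_user_data(0xDEAD_BEEF);
+        assert!(head.get_valid());
+        assert_eq!(head.get_op_code(), 0x05);
+        assert_eq!(head.get_user_data(), 0xDEAD_BEEF);
+    }
+}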
+bitfield! {
+    pub struct CmdQueueReqDescSetNetworkParam([u8]);
+    u64;
+    _cmd_queue_desc_common_head,_: 63 , 0;           // 64bits
+    pub get_ip_gateway, set_ip_gateway: 95 , 64;     // 32bits
+    pub get_ip_netmask, set_ip_netmask: 127, 96;     // 32bits
+    pub get_ip_addr, set_ip_addr: 159, 128;          // 32bits
+    _reserved1, _: 191, 160;                         // 32bits
+    pub get_eth_mac_addr, set_eth_mac_addr: 239, 192; // 48bits
+    _reserved2, _: 255, 240;                         // 16bits
+}
+
+bitfield! {
+    pub struct CmdQueueReqDescSetRawPacketReceiveMeta([u8]);
+    u64;
+    _cmd_queue_desc_common_head,_: 63 , 0;              // 64bits
+    pub get_write_base_addr, set_write_base_addr: 127, 64; // 64bits
+    pub get_write_mr_key, set_write_mr_key: 159, 128;   // 32bits
+    _reserved1, _: 191, 160;                            // 32bits
+    _reserved2, _: 255, 192;                            // 64bits
+}
+
+// typedef struct {
+//     ReservedZero#(136)              reserved1;      // 136 bits
+//     QPN                             qpn;            // 24  bits
+//     ReservedZero#(8)                reserved2;      // 8   bits
+//     PSN                             recoverPoint;   // 24  bits
+//     CmdQueueDescCommonHead          commonHeader;   // 64  bits
+// } CmdQueueReqDescUpdateErrorPsnRecoverPoint deriving(Bits, FShow);
+bitfield! {
+    pub struct CmdQueueReqDescUpdateErrRecoverPoint([u8]);
+    u32;
+    _cmd_queue_desc_common_head,_: 63 , 0; // 64bits
+    pub get_psn, set_psn: 87 , 64;         // 24bits
+    _reserved1, _: 95 , 88;                // 8bits
+    pub get_qpn, set_qpn: 119, 96;         // 24bits
+    _reserved2, _: 255, 120;               // 136bits
+}
+
+bitfield! {
+    pub struct SendQueueDescCommonHead([u8]);
+    u32;
+    pub get_valid , set_valid: 0;                                           // 1bit
+    pub get_is_success_or_need_signal_cplt, set_is_success_or_need_signal_cplt: 1; // 1bit
+    pub get_is_first, set_is_first: 2;                                      // 1bit
+    pub get_is_last, set_is_last: 3;                                        // 1bit
+    pub get_op_code, set_op_code: 7, 4;                                     // 4bits
+    pub get_extra_segment_cnt, set_extra_segment_cnt: 11, 8;                // 4bits
+    _reserved, _: 31, 12;                                                   // 20bits
+    pub get_total_len, set_total_len: 63, 32;                               // 32bits
+}
+
+bitfield! {
+    pub struct SendQueueReqDescSeg0([u8]);
+    u64;
+    _common_header, _: 63, 0;             // 64bits
+    pub get_raddr, set_raddr: 127, 64;    // 64bits
+    pub get_rkey, set_rkey: 159, 128;     // 32bits
+    pub get_dqp_ip, set_dqp_ip: 191, 160; // 32bits
+    pub get_pkey, set_pkey: 207, 192;     // 16bits
+    _reserved, _: 255, 208;               // 48bits
+}
+
+bitfield! {
+    pub struct SendQueueReqDescSeg1([u8]);
+    u64;
+    pub get_pmtu, set_pmtu: 2, 0;             // 3bits
+    _reserved8 , _: 7, 3;                     // 5bits
+    pub get_flags, set_flags: 12, 8;          // 5bits
+    _reserved7 , _: 15, 13;                   // 3bits
+    pub get_qp_type, set_qp_type: 19, 16;     // 4bits
+    _reserved6 , _: 23, 20;                   // 4bits
+    pub get_seg_cnt, set_seg_cnt: 26, 24;     // 3bits
+    _reserved5 , _: 31, 27;                   // 5bits
+    pub get_psn, set_psn: 55, 32;             // 24bits
+    _reserved4 , _: 63, 56;                   // 8bits
+    pub get_mac_addr, set_mac_addr: 111, 64;  // 48bits
+    _reserved3 , _: 127, 112;                 // 16bits
+    pub get_dqpn, set_dqpn: 151, 128;         // 24bits
+    _reserved2 , _: 159, 152;                 // 8bits
+    pub get_imm, set_imm: 191, 160;           // 32bits
+    _reserved1 , _: 255, 192;                 // 64bits
+}
+
+bitfield! {
+    pub struct SendQueueReqDescFragSGE([u8]);
+    u64;
+    pub get_lkey, set_lkey: 31, 0;      // 32bits
+    pub get_len, set_len: 63, 32;       // 32bits
+    pub get_laddr, set_laddr: 127, 64;  // 64bits
+}
+
+bitfield! {
+    pub struct MetaReportQueueDescFragRETH([u8]);
+    u64;
+    pub get_va, set_va: 63, 0;          // 64bits
+    pub get_rkey, set_rkey: 95, 64;     // 32bits
+    pub get_dlen, set_dlen: 127, 96;    // 32bits
+}
+
+bitfield! {
+    pub struct MetaReportQueueDescFragImmDT([u8]);
+    u32;
+    pub get_imm, set_imm: 31, 0;        // 32bits
+}
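+
+// Another round-trip sketch (added for illustration, not in the original
+// patch): the 16-byte SGE fragment packs lkey, length, and local address
+// back to back; all values below are arbitrary.
+#[cfg(test)]
+mod frag_sge_sketch {
+    use super::SendQueueReqDescFragSGE;
+
+    #[test]
+    fn sge_round_trip() {
+        let mut buf = [0u8; 16];
+        let mut sge = SendQueueReqDescFragSGE(&mut buf[..]);
+        sge.set_lkey(0x1234_5678);
+        sge.set_len(4096);
+        sge.set_laddr(0xDEAD_BEEF_0000);
+        assert_eq!(sge.get_lkey(), 0x1234_5678);
+        assert_eq!(sge.get_len(), 4096);
+        assert_eq!(sge.get_laddr(), 0xDEAD_BEEF_0000);
+    }
+}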
+bitfield! {
+    pub struct MetaReportQueueDescFragAETH([u8]);
+    u32;
+    pub get_psn, set_psn: 23, 0;                 // 24bits
+    pub get_msn, set_msn: 47, 24;                // 24bits
+    pub get_aeth_value, set_aeth_value: 52, 48;  // 5bits
+    pub get_aeth_code, set_aeth_code: 55, 53;    // 3bits
+}
+
+bitfield! {
+    pub struct MetaReportQueueDescBthReth([u8]);
+    impl Debug;
+    u64;
+    pub get_expected_psn, _: 23, 0;  // 24bits
+    pub get_req_status, _: 31, 24;   // 8bits
+    pub get_bth, _: 95, OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH;   // 64bits
+    pub get_reth, _: 223, OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH; // 128bits
+    pub get_msn, _: 247, 224;        // 24bits
+    reserved1,_ : 254, 248;          // 7bits
+    pub get_can_auto_ack, _: 255;    // 1bit
+}
+
+bitfield! {
+    pub struct MetaReportQueueDescFragBTH([u8]);
+    u32;
+    pub get_trans_type,set_trans_type: 2, 0;  // 3bits
+    pub get_opcode,set_opcode: 7, 3;          // 5bits
+    pub get_qpn,set_qpn: 31, 8;               // 24bits
+    pub get_psn,set_psn: 55, 32;              // 24bits
+    pub get_solicited,set_solicited: 56;      // 1bit
+    pub get_ack_req,set_ack_req: 57;          // 1bit
+    pub get_pad_cnt,set_pad_cnt: 63, 58;      // 6bits
+}
+
+bitfield! {
+    pub struct MetaReportQueueDescFragSecondaryRETH([u8]);
+    u64;
+    pub get_secondary_va,set_secondary_va: 63, 0;       // 64bits
+    pub get_secondary_rkey,set_secondary_rkey: 95, 64;  // 32bits
+}
+
+bitfield! {
+    /// IPv4 layout
+    pub struct Ipv4([u8]);
+    u32;
+    pub get_version_and_len,set_version_and_len: 7, 0;        // 8bits
+    pub get_dscp_ecn,set_dscp_ecn: 15, 8;                     // 8bits
+    pub get_total_length,set_total_length: 31, 16;            // 16bits
+    pub get_identification,set_identification: 47, 32;        // 16bits
+    pub get_fragment_offset,set_fragment_offset: 63, 48;      // 16bits
+    pub get_ttl,set_ttl: 71, 64;                              // 8bits
+    pub get_protocol,set_protocol: 79, 72;                    // 8bits
+    pub get_checksum,set_checksum: 95, 80;                    // 16bits
+    pub get_source,set_source: 127, 96;                       // 32bits
+    pub get_destination,set_destination: 159, 128;            // 32bits
+}
+
+bitfield! {
+    /// UDP layout
+    pub struct Udp([u8]);
+    u16;
+    pub get_src_port,set_src_port: 15, 0;   // 16bits
+    pub get_dst_port,set_dst_port: 31, 16;  // 16bits
+    pub get_length,set_length: 47, 32;      // 16bits
+    pub get_checksum,set_checksum: 63, 48;  // 16bits
+}
+
+bitfield! {
+    /// BTH layout
+    pub struct Bth([u8]);
+    u32;
+    pub get_opcode,set_opcode: 7, 0;          // 8bits
+    _padding_0,_ : 9, 8;                      // 2bits
+    pub get_pad_count,set_pad_count: 11, 10;  // 2bits
+    _padding_1,_ : 15, 12;                    // 4bits
+    pub get_pkey,set_pkey: 31, 16;            // 16bits
+    pub _,set_ecn_and_resv6: 39, 32;          // 8bits
+    pub get_dqpn,set_dqpn: 63, 40;            // 24bits
+    _padding_2,_ : 71, 64;                    // 8bits
+    pub get_psn,set_psn: 95, 72;              // 24bits
+}
+
+bitfield! {
+    /// Aeth layout
+    pub struct Aeth([u8]);
+    u32;
+    _padding_0,_ : 0;                            // 1bit
+    pub get_aeth_code,set_aeth_code: 2, 1;       // 2bits
+    pub get_aeth_value,set_aeth_value: 7, 3;     // 5bits
+    _padding_1,_ : 15, 8;                        // 8bits
+    pub get_msn,set_msn: 31, 16;                 // 16bits
+}
+
+bitfield! {
+    /// Nak Retry Eth layout
+    pub struct NReth([u8]);
+    u32;
+    pub get_last_retry_psn,set_last_retry_psn: 23, 0;  // 24bits
+    _padding_0,_: 31, 24;                              // 8bits
+}
+bitfield! {
+    /// Mac layout
+    pub struct Mac([u8]);
+    u64;
+    pub get_dst_mac_addr,set_dst_mac_addr: 47, 0;    // 48bits
+    pub get_src_mac_addr,set_src_mac_addr: 95, 48;   // 48bits
+    pub get_network_layer_type,set_network_layer_type: 111, 96; // 16bits
+}
diff --git a/rust-driver/src/device/descriptor/mod.rs b/rust-driver/src/device/descriptor/mod.rs
new file mode 100644
index 0000000..e07baaf
--- /dev/null
+++ b/rust-driver/src/device/descriptor/mod.rs
@@ -0,0 +1,12 @@
+/// ctrl descriptor
+pub(crate) mod ctrl;
+
+/// work descriptor
+pub(crate) mod work;
+
+/// layout of a descriptor
+mod layout;
\ No newline at end of file
diff --git a/rust-driver/src/device/descriptor/work.rs b/rust-driver/src/device/descriptor/work.rs
new file mode 100644
index 0000000..3054112
--- /dev/null
+++ b/rust-driver/src/device/descriptor/work.rs
@@ -0,0 +1,1010 @@
+use std::{io, net::Ipv4Addr};
+
+use super::layout::{
+    MetaReportQueueDescBthReth, MetaReportQueueDescFragAETH, MetaReportQueueDescFragBTH,
+    MetaReportQueueDescFragImmDT, MetaReportQueueDescFragRETH,
+    MetaReportQueueDescFragSecondaryRETH, SendQueueDescCommonHead, SendQueueReqDescFragSGE,
+    SendQueueReqDescSeg0, SendQueueReqDescSeg1,
+    OFFSET_OF_AETH_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT_IN_BYTES,
+    OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES,
+    OFFSET_OF_IMM_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT,
+    OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES, SIZE_OF_BTH_IN_BYTES,
+    SIZE_OF_IMM_IN_BYTES,
+};
+use crate::{
+    device::{error::DeviceResult, DeviceError},
+    types::{Imm, Key, Msn, Pmtu, Psn, QpType, Qpn, Sge, WorkReqSendFlag},
+    utils::u8_slice_to_u64,
+};
+use eui48::MacAddress;
+use num_enum::TryFromPrimitive;
+
+/// Host to card work ring buffer descriptor
+#[derive(Clone, Debug)]
+pub(crate) enum ToCardWorkRbDesc {
+    Read(ToCardWorkRbDescRead),
+    Write(ToCardWorkRbDescWrite),
+    WriteWithImm(ToCardWorkRbDescWriteWithImm),
+    ReadResp(ToCardWorkRbDescWrite),
+}
+
+/// Card to host work ring buffer descriptor
+#[derive(Debug)]
+pub(crate) enum ToHostWorkRbDesc {
+    Read(ToHostWorkRbDescRead),
+    WriteOrReadResp(ToHostWorkRbDescWriteOrReadResp),
+    WriteWithImm(ToHostWorkRbDescWriteWithImm),
+    Ack(ToHostWorkRbDescAck),
+    Raw(ToHostWorkRbDescRaw),
+}
+
+impl ToHostWorkRbDesc {
+    pub(crate) fn status(&self) -> &ToHostWorkRbDescStatus {
+        match self {
+            ToHostWorkRbDesc::Read(desc) => &desc.common.status,
+            ToHostWorkRbDesc::WriteOrReadResp(desc) => &desc.common.status,
+            ToHostWorkRbDesc::WriteWithImm(desc) => &desc.common.status,
+            ToHostWorkRbDesc::Ack(desc) => &desc.common.status,
+            ToHostWorkRbDesc::Raw(desc) => &desc.common.status,
+        }
+    }
+}
+
+/// Common fields for host to card work ring buffer descriptor
+#[derive(Clone, Debug)]
+pub(crate) struct ToCardWorkRbDescCommon {
+    /// Total length of the payload
+    pub(crate) total_len: u32,
+
+    /// Remote virtual address
+    pub(crate) raddr: u64,
+
+    /// `rkey` for raddr
+    pub(crate) rkey: Key,
+
+    /// destination qp ip
+    pub(crate) dqp_ip: Ipv4Addr,
+
+    /// destination qp number
+    pub(crate) dqpn: Qpn,
+
+    /// destination mac address
+    pub(crate) mac_addr: MacAddress,
+
+    /// PMTU
+    pub(crate) pmtu: Pmtu,
+
+    /// Send flags
+    pub(crate) flags: WorkReqSendFlag,
+
+    /// QP type
+    pub(crate) qp_type: QpType,
+
+    /// PSN
+    pub(crate) psn: Psn,
+
+    /// MSN
+    pub(crate) msn: Msn,
+}
+impl Default for ToCardWorkRbDescCommon {
+    fn default() -> Self {
+        Self {
+            total_len: 0,
+            raddr: 0,
+            rkey: Key::default(),
+            dqp_ip: Ipv4Addr::new(0, 0, 0, 0),
+            dqpn: Qpn::default(),
+            mac_addr: MacAddress::default(),
+            pmtu: Pmtu::Mtu256,
+            flags: WorkReqSendFlag::empty(),
+            qp_type: QpType::Rc,
+            psn: Psn::default(),
+            msn: Msn::default(),
+        }
+    }
+}
+
+/// Host to card workq read request descriptor
+#[derive(Default, Clone, Debug)]
+pub(crate) struct ToCardWorkRbDescRead {
+    /// Common fields
+    pub(crate) common: ToCardWorkRbDescCommon,
+
+    /// Scatter gather element
+    pub(crate) sge: DescSge,
+}
+
+/// Host to card workq write descriptor
+#[derive(Default, Clone, Debug)]
+pub(crate) struct ToCardWorkRbDescWrite {
+    /// Common fields
+    pub(crate) common: ToCardWorkRbDescCommon,
+
+    /// Is last descriptor of this transaction
+    pub(crate) is_last: bool,
+
+    /// Is the first descriptor of this transaction
+    pub(crate) is_first: bool,
+
+    /// Scatter gather element
+    pub(crate) sge0: DescSge,
+}
+
+/// Host to card workq write with immediate descriptor
+#[derive(Clone, Debug, Default)]
+pub(crate) struct ToCardWorkRbDescWriteWithImm {
+    /// Common fields
+    pub(crate) common: ToCardWorkRbDescCommon,
+
+    /// Is last descriptor of this transaction
+    pub(crate) is_last: bool,
+
+    /// Is the first descriptor of this transaction
+    pub(crate) is_first: bool,
+
+    /// Immediate data
+    pub(crate) imm: u32,
+
+    /// Scatter gather element
+    pub(crate) sge0: DescSge,
+}
+
+/// Card to host workq response common fields
+#[derive(Debug, Default, Clone)]
+pub(crate) struct ToHostWorkRbDescCommon {
+    /// status of the descriptor
+    pub(crate) status: ToHostWorkRbDescStatus,
+
+    /// transport type
+    pub(crate) trans: ToHostWorkRbDescTransType,
+
+    /// destination qp number
+    pub(crate) dqpn: Qpn,
+
+    /// MSN
+    pub(crate) msn: Msn,
+
+    /// expected PSN
+    pub(crate) expected_psn: Psn,
+}
+
+/// Card to host workq read request descriptor
+#[derive(Debug, Default, Clone)]
+pub(crate) struct ToHostWorkRbDescRead {
+    /// Common fields
+    pub(crate) common: ToHostWorkRbDescCommon,
+
+    /// length of the payload
+    pub(crate) len: u32,
+
+    /// local address
+    pub(crate) laddr: u64,
+
+    /// local key
+    pub(crate) lkey: Key,
+
+    /// remote address
+    pub(crate) raddr: u64,
+
+    /// remote key
+    pub(crate) rkey: Key,
+}
+
+/// Card to host workq write or read response descriptor
+#[derive(Debug, Clone)]
+pub(crate) struct ToHostWorkRbDescWriteOrReadResp {
+    /// Common fields
+    pub(crate) common: ToHostWorkRbDescCommon,
+
+    /// Is this a read response?
+    pub(crate) is_read_resp: bool,
+
+    /// Write type
+    pub(crate) write_type: ToHostWorkRbDescWriteType,
+
+    /// PSN
+    pub(crate) psn: Psn,
+
+    /// Write address
+    pub(crate) addr: u64,
+
+    /// Write length
+    pub(crate) len: u32,
+
+    /// Can hardware auto ack
+    pub(crate) can_auto_ack: bool,
+}
+
+impl Default for ToHostWorkRbDescWriteOrReadResp {
+    fn default() -> Self {
+        Self {
+            common: ToHostWorkRbDescCommon::default(),
+            is_read_resp: false,
+            write_type: ToHostWorkRbDescWriteType::Only,
+            psn: Psn::default(),
+            addr: 0,
+            len: 0,
+            can_auto_ack: false,
+        }
+    }
+}
+
+/// Card to host workq write with immediate descriptor
+#[allow(unused)] // Currently we don't have write imm descriptor
+#[derive(Debug)]
+pub(crate) struct ToHostWorkRbDescWriteWithImm {
+    /// Common fields
+    pub(crate) common: ToHostWorkRbDescCommon,
+
+    /// Write type
+    pub(crate) write_type: ToHostWorkRbDescWriteType,
+
+    /// PSN
+    pub(crate) psn: Psn,
+
+    /// Immediate data
+    pub(crate) imm: u32,
+
+    /// Write address
+    pub(crate) addr: u64,
+
+    /// Write length
+    pub(crate) len: u32,
+
+    /// The key of the memory region
+    pub(crate) key: Key,
+}
+
+/// Card to host workq acknowledge descriptor
+#[derive(Debug, Default, Clone)]
+pub(crate) struct ToHostWorkRbDescAck {
+    /// Common fields
+    pub(crate) common: ToHostWorkRbDescCommon,
+
+    /// MSN
+    pub(crate) msn: Msn,
+
+    /// PSN
+    pub(crate) psn: Psn,
+
+    /// AETH code
+    pub(crate) code: ToHostWorkRbDescAethCode,
+    #[allow(unused)] // used in nack checking
+    pub(crate) value: u8,
+}
+
+/// Card to host workq raw descriptor
+#[derive(Debug, Default)]
+pub(crate) struct ToHostWorkRbDescRaw {
+    /// Common fields
+    pub(crate) common: ToHostWorkRbDescCommon,
+
+    /// Raw data address
+    pub(crate) addr: u64,
+
+    /// Raw data length
+    pub(crate) len: u32,
+
+    /// Raw data lkey
+    pub(crate) key: Key,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub(crate) struct DescSge {
+    addr: u64,
+    len: u32,
+    key: Key,
+}
+
+impl From<Sge> for DescSge {
+    fn from(sge: Sge) -> Self {
+        Self {
+            addr: sge.phy_addr,
+            len: sge.len,
+            key: sge.key,
+        }
+    }
+}
+
+#[derive(TryFromPrimitive, Debug, Clone)]
+#[repr(u8)]
+pub(crate) enum ToHostWorkRbDescStatus {
+    /// Normal status
+    Normal = 1,
+
+    /// Invalid access flag
+    InvAccFlag = 2,
+
+    /// Invalid opcode
+    InvOpcode = 3,
+
+    /// Invalid MR key
+    InvMrKey = 4,
+
+    /// Invalid MR region
+    InvMrRegion = 5,
+
+    /// Unknown error
+    Unknown = 6,
+}
+
+impl Default for ToHostWorkRbDescStatus {
+    fn default() -> Self {
+        Self::Normal
+    }
+}
+
+impl ToHostWorkRbDescStatus {
+    pub(crate) fn is_ok(&self) -> bool {
+        matches!(self, ToHostWorkRbDescStatus::Normal)
+    }
+}
+
+#[derive(TryFromPrimitive, Debug, Clone, Copy)]
+#[repr(u8)]
+pub(crate) enum ToHostWorkRbDescTransType {
+    Rc = 0x00,
+    Uc = 0x01,
+    Rd = 0x02,
+    Ud = 0x03,
+    Cnp = 0x04,
+    Xrc = 0x05,
+    DtldExtended = 0x06, // Customized transport type, used for raw packets.
+}
+
+impl Default for ToHostWorkRbDescTransType {
+    fn default() -> Self {
+        Self::Rc
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) enum ToHostWorkRbDescWriteType {
+    First,
+    Middle,
+    Last,
+    Only,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) enum ToCardWorkRbDescOpcode {
+    // IBV_WR_RDMA_WRITE           =  0,
+    // IBV_WR_RDMA_WRITE_WITH_IMM  =  1,
+    // IBV_WR_SEND                 =  2,
+    // IBV_WR_SEND_WITH_IMM        =  3,
+    // IBV_WR_RDMA_READ            =  4,
+    // IBV_WR_ATOMIC_CMP_AND_SWP   =  5,
+    // IBV_WR_ATOMIC_FETCH_AND_ADD =  6,
+    // IBV_WR_LOCAL_INV            =  7,
+    // IBV_WR_BIND_MW              =  8,
+    // IBV_WR_SEND_WITH_INV        =  9,
+    // IBV_WR_TSO                  = 10,
+    // IBV_WR_DRIVER1              = 11,
+    // IBV_WR_RDMA_READ_RESP       = 12, // Not defined in rdma-core
+    // IBV_WR_FLUSH                = 14,
+    // IBV_WR_ATOMIC_WRITE         = 15
+    Write = 0,
+    WriteWithImm = 1,
+    Read = 4,
+    ReadResp = 12, // Not defined in rdma-core
+}
+
+/// Incomplete `ToHostWorkRbDesc`
+///
+/// Most descriptors take only one slot, but some descriptors, like the read
+/// request, need more than one slot to store the whole descriptor.
+pub(crate) struct IncompleteToHostWorkRbDesc {
+    parsed: ToHostWorkRbDesc,
+}
+
+/// Card to host work ring buffer descriptor error
+pub(crate) enum ToHostWorkRbDescError {
+    Incomplete(IncompleteToHostWorkRbDesc),
+    DeviceError(DeviceError),
+}
+
+/// To host opcode
+#[derive(TryFromPrimitive, PartialEq, Eq, Debug, Clone)]
+#[repr(u8)]
+pub(crate) enum ToHostWorkRbDescOpcode {
+    // SendFirst                       = 0x00,
+    // SendMiddle                      = 0x01,
+    // SendLast                        = 0x02,
+    // SendLastWithImmediate           = 0x03,
+    // SendOnly                        = 0x04,
+    // SendOnlyWithImmediate           = 0x05,
+    // RdmaWriteFirst                  = 0x06,
+    // RdmaWriteMiddle                 = 0x07,
+    // RdmaWriteLast                   = 0x08,
+    // RdmaWriteLastWithImmediate      = 0x09,
+    // RdmaWriteOnly                   = 0x0a,
+    // RdmaWriteOnlyWithImmediate      = 0x0b,
+    // RdmaReadRequest                 = 0x0c,
+    // Acknowledge                     = 0x11,
+    // AtomicAcknowledge               = 0x12,
+    // CompareSwap                     = 0x13,
+    // FetchAdd                        = 0x14,
+    // Resync                          = 0x15,
+    // SendLastWithInvalidate          = 0x16,
+    // SendOnlyWithInvalidate          = 0x17,
+    RdmaWriteFirst = 0x06,
+    RdmaWriteMiddle = 0x07,
+    RdmaWriteLast = 0x08,
+    RdmaWriteLastWithImmediate = 0x09,
+    RdmaWriteOnly = 0x0a,
+    RdmaWriteOnlyWithImmediate = 0x0b,
+    RdmaReadResponseFirst = 0x0d,
+    RdmaReadResponseMiddle = 0x0e,
+    RdmaReadResponseLast = 0x0f,
+    RdmaReadResponseOnly = 0x10,
+    RdmaReadRequest = 0x0c,
+    Acknowledge = 0x11,
+}
+
+impl ToHostWorkRbDescOpcode {
+    pub(crate) fn is_first(&self) -> bool {
+        match self {
+            ToHostWorkRbDescOpcode::RdmaWriteFirst
+            | ToHostWorkRbDescOpcode::RdmaReadResponseFirst => true,
+            ToHostWorkRbDescOpcode::RdmaWriteMiddle
+            | ToHostWorkRbDescOpcode::RdmaWriteLast
+            | ToHostWorkRbDescOpcode::RdmaWriteLastWithImmediate
+            | ToHostWorkRbDescOpcode::RdmaWriteOnly
+            | ToHostWorkRbDescOpcode::RdmaWriteOnlyWithImmediate
+            | ToHostWorkRbDescOpcode::RdmaReadResponseMiddle
+            | ToHostWorkRbDescOpcode::RdmaReadResponseLast
+            | ToHostWorkRbDescOpcode::RdmaReadResponseOnly
+            | ToHostWorkRbDescOpcode::RdmaReadRequest
+            | ToHostWorkRbDescOpcode::Acknowledge => false,
+        }
+    }
+
+    pub(crate) fn is_resp(&self) -> bool {
+        matches!(
+            self,
+            ToHostWorkRbDescOpcode::RdmaReadResponseFirst
+                | ToHostWorkRbDescOpcode::RdmaReadResponseMiddle
+                | ToHostWorkRbDescOpcode::RdmaReadResponseLast
+                | ToHostWorkRbDescOpcode::RdmaReadResponseOnly
+        )
+    }
+
+    pub(crate) fn is_read_resp(&self) -> bool {
+        matches!(
+            self,
+            ToHostWorkRbDescOpcode::RdmaReadResponseFirst
+                | ToHostWorkRbDescOpcode::RdmaReadResponseMiddle
+                | ToHostWorkRbDescOpcode::RdmaReadResponseLast
+                | ToHostWorkRbDescOpcode::RdmaReadResponseOnly
+        )
+    }
+    pub(crate) fn write_type(&self) -> Option<ToHostWorkRbDescWriteType> {
+        match self {
+            ToHostWorkRbDescOpcode::RdmaWriteFirst
+            | ToHostWorkRbDescOpcode::RdmaReadResponseFirst => {
+                Some(ToHostWorkRbDescWriteType::First)
+            }
+            ToHostWorkRbDescOpcode::RdmaWriteMiddle
+            | ToHostWorkRbDescOpcode::RdmaReadResponseMiddle => {
+                Some(ToHostWorkRbDescWriteType::Middle)
+            }
+            ToHostWorkRbDescOpcode::RdmaWriteLast
+            | ToHostWorkRbDescOpcode::RdmaWriteLastWithImmediate
+            | ToHostWorkRbDescOpcode::RdmaReadResponseLast => Some(ToHostWorkRbDescWriteType::Last),
+            ToHostWorkRbDescOpcode::RdmaWriteOnlyWithImmediate
+            | ToHostWorkRbDescOpcode::RdmaWriteOnly
+            | ToHostWorkRbDescOpcode::RdmaReadResponseOnly => Some(ToHostWorkRbDescWriteType::Only),
+            ToHostWorkRbDescOpcode::RdmaReadRequest | ToHostWorkRbDescOpcode::Acknowledge => None,
+        }
+    }
+}
+
+/// Aeth code
+#[derive(TryFromPrimitive, Clone, PartialEq, Eq, Debug)]
+#[repr(u8)]
+pub(crate) enum ToHostWorkRbDescAethCode {
+    // AETH_CODE_ACK  = 2'b00,
+    // AETH_CODE_RNR  = 2'b01,
+    // AETH_CODE_RSVD = 2'b10,
+    // AETH_CODE_NAK  = 2'b11
+    Ack = 0b00,
+    Rnr = 0b01,
+    Rsvd = 0b10,
+    Nak = 0b11,
+}
+
+impl Default for ToHostWorkRbDescAethCode {
+    fn default() -> Self {
+        Self::Rsvd
+    }
+}
+
+impl ToCardWorkRbDesc {
+    pub(super) fn write_0(&self, dst: &mut [u8]) {
+        let (common, opcode, is_first, is_last) = match self {
+            ToCardWorkRbDesc::Read(desc) => {
+                (&desc.common, ToCardWorkRbDescOpcode::Read, true, true)
+            }
+            ToCardWorkRbDesc::Write(desc) => (
+                &desc.common,
+                ToCardWorkRbDescOpcode::Write,
+                desc.is_first,
+                desc.is_last,
+            ),
+            ToCardWorkRbDesc::WriteWithImm(desc) => (
+                &desc.common,
+                ToCardWorkRbDescOpcode::WriteWithImm,
+                desc.is_first,
+                desc.is_last,
+            ),
+            ToCardWorkRbDesc::ReadResp(desc) => (
+                &desc.common,
+                ToCardWorkRbDescOpcode::ReadResp,
+                desc.is_first,
+                desc.is_last,
+            ),
+        };
+
+        let mut head = SendQueueDescCommonHead(dst);
+        head.set_valid(true);
+        head.set_is_success_or_need_signal_cplt(false);
+        head.set_is_first(is_first);
+        head.set_is_last(is_last);
+        head.set_op_code(opcode as u32);
+
+        #[allow(clippy::arithmetic_side_effects)]
+        // self.serialized_desc_cnt() always greater than 1
+        let extra_segment_cnt = self.serialized_desc_cnt() - 1;
+        head.set_extra_segment_cnt(extra_segment_cnt);
+        head.set_total_len(common.total_len);
+
+        // typedef struct {
+        //     ReservedZero#(64)           reserved1;        // 64 bits
+        //     AddrIPv4                    dqpIP;            // 32 bits
+        //     RKEY                        rkey;             // 32 bits
+        //     ADDR                        raddr;            // 64 bits
+        //     SendQueueDescCommonHead     commonHeader;     // 64 bits
+        // } SendQueueReqDescSeg0 deriving(Bits, FShow);
+        // let mut seg0 = SendQueueReqDescSeg0(&mut dst[8..]);
+        let mut head = SendQueueReqDescSeg0(&mut head.0);
+        head.set_raddr(common.raddr);
+        head.set_rkey(common.rkey.get().into());
+        head.set_dqp_ip(u8_slice_to_u64(&common.dqp_ip.octets()));
+        // We use the pkey field to store the `MSN`.
+        head.set_pkey(common.msn.get().into());
+    }
+
+    pub(super) fn write_1(&self, dst: &mut [u8]) {
+        // typedef struct {
+        //     ReservedZero#(64)       reserved1;          // 64 bits
+
+        //     IMM                     imm;                // 32 bits
+
+        //     ReservedZero#(8)        reserved2;          // 8  bits
+        //     QPN                     dqpn;               // 24 bits
+
+        //     ReservedZero#(16)       reserved3;          // 16 bits
+        //     MAC                     macAddr;            // 48 bits
+
+        //     ReservedZero#(8)        reserved4;          // 8  bits
+        //     PSN                     psn;                // 24 bits
+
+        //     ReservedZero#(5)        reserved5;          // 5  bits
+        //     NumSGE                  sgeCnt;             // 3  bits
+
+        //     ReservedZero#(4)        reserved6;          // 4  bits
+        //     TypeQP                  qpType;             // 4  bits
+
+        //     ReservedZero#(3)        reserved7;          // 3  bits
+        //     WorkReqSendFlag         flags;              // 5  bits
+
+        //     ReservedZero#(5)        reserved8;          // 5  bits
+        //     PMTU                    pmtu;               // 3  bits
+        // } SendQueueReqDescSeg1 deriving(Bits, FShow);
+
+        let (common, sge_cnt) = match self {
+            ToCardWorkRbDesc::Read(desc) => (&desc.common, 1u64),
+            ToCardWorkRbDesc::Write(desc) | ToCardWorkRbDesc::ReadResp(desc) => {
+                (&desc.common, 1u64)
+            }
+            ToCardWorkRbDesc::WriteWithImm(desc) => (&desc.common, 1u64),
+        };
+        let mut desc_common = SendQueueReqDescSeg1(dst);
+        desc_common.set_pmtu(common.pmtu as u64);
+        desc_common.set_flags(u64::from(common.flags.bits()));
+        desc_common.set_qp_type(common.qp_type as u64);
+        desc_common.set_seg_cnt(sge_cnt);
+        desc_common.set_psn(common.psn.get().into());
+        desc_common.set_mac_addr(u8_slice_to_u64(common.mac_addr.as_bytes()));
+
+        desc_common.set_dqpn(common.dqpn.get().into());
+
+        if let ToCardWorkRbDesc::WriteWithImm(desc) = self {
+            desc_common.set_imm(u64::from(desc.imm));
+        } else {
+            desc_common.set_imm(0);
+        }
+    }
+
+    #[allow(clippy::indexing_slicing)]
+    pub(super) fn write_2(&self, dst: &mut [u8]) {
+        // typedef struct {
+        //     ADDR   laddr;         // 64 bits
+        //     Length len;           // 32 bits
+        //     LKEY   lkey;          // 32 bits
+        // } SendQueueReqDescFragSGE deriving(Bits, FShow);
+
+        // typedef struct {
+        //     SendQueueReqDescFragSGE     sge1;       // 128 bits
+        //     SendQueueReqDescFragSGE     sge2;       // 128 bits
+        // } SendQueueReqDescVariableLenSGE deriving(Bits, FShow);
+
+        let sge0 = match self {
+            ToCardWorkRbDesc::Read(desc) => &desc.sge,
+            ToCardWorkRbDesc::Write(desc) | ToCardWorkRbDesc::ReadResp(desc) => &desc.sge0,
+            ToCardWorkRbDesc::WriteWithImm(desc) => &desc.sge0,
+        };
+        // Note that the order of the sges is reversed in the struct
+        let mut frag_sge = SendQueueReqDescFragSGE(&mut dst[16..32]);
+        frag_sge.set_laddr(sge0.addr);
+        frag_sge.set_len(sge0.len.into());
+        frag_sge.set_lkey(sge0.key.get().into());
+    }
+
+    #[allow(clippy::unused_self)] // we might need more than 3 descriptors later
+    pub(super) fn serialized_desc_cnt(&self) -> u32 {
+        3
+    }
+}
+
+impl ToHostWorkRbDesc {
+    /// (addr, key, len)
+    fn read_reth(src: &[u8]) -> (u64, Key, u32) {
+        // typedef struct {
+        //     Length                  dlen;         // 32
+        //     RKEY                    rkey;         // 32
+        //     ADDR                    va;           // 64
+        // } MetaReportQueueDescFragRETH deriving(Bits, FShow);
+
+        // first 12 bytes are desc type, status and bth
+        #[allow(clippy::indexing_slicing)]
+        let frag_reth = MetaReportQueueDescFragRETH(
+            &src[OFFSET_OF_RETH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES..],
+        );
+        let addr = frag_reth.get_va();
+        // The `bitfield` macro restricts this field to at most 32 bits.
+        #[allow(clippy::cast_possible_truncation)]
+        let key = Key::new_unchecked(frag_reth.get_rkey() as u32);
+        #[allow(clippy::cast_possible_truncation)]
+        let len = frag_reth.get_dlen() as u32;
+
+        (addr, key, len)
+    }
+
+    fn read_imm(src: &[u8]) -> u32 {
+        // typedef struct {
+        //     IMM                             data;           // 32
+        // } MetaReportQueueDescFragImmDT deriving(Bits, FShow);
+
+        // first 28 bytes are desc type, status, bth and reth
+        #[allow(clippy::indexing_slicing)]
+        let imm = MetaReportQueueDescFragImmDT(
+            &src[OFFSET_OF_IMM_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT
+                ..OFFSET_OF_IMM_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT + SIZE_OF_IMM_IN_BYTES],
+        );
+        // call the `to_be` to convert order
+        imm.get_imm()
+    }
+
+    // (last_psn, msn, value, code)
+    #[allow(clippy::cast_possible_truncation)]
+    fn read_aeth(src: &[u8]) -> DeviceResult<(Psn, Msn, u8, ToHostWorkRbDescAethCode)> {
+        // typedef struct {
+        //     AethCode                code;         // 3
+        //     AethValue               value;        // 5
+        //     MSN                     msn;          // 24
+        //     PSN                     lastRetryPSN; // 24
+        // } MetaReportQueueDescFragAETH deriving(Bits, FShow);
+
+        // first 12 bytes are desc type, status and bth
+        #[allow(clippy::indexing_slicing)]
+        let frag_aeth = MetaReportQueueDescFragAETH(
+            &src[OFFSET_OF_AETH_IN_META_REPORT_QUEUE_DESC_FRAG_IMM_DT_IN_BYTES..],
+        );
+        let psn = Psn::new(frag_aeth.get_psn());
+        let msg_seq_number = Msn::new(frag_aeth.get_msn() as u16);
+        let value = frag_aeth.get_aeth_value() as u8;
+        let code = frag_aeth.get_aeth_code() as u8;
+        let code = ToHostWorkRbDescAethCode::try_from(code).map_err(|_| {
+            DeviceError::ParseDesc(format!(
+                "ToHostWorkRbDescAethCode = {code} can not be parsed"
+            ))
+        })?;
+
+        Ok((psn, msg_seq_number, value, code))
+    }
+
+    #[allow(
+        clippy::cast_possible_truncation,
+        clippy::indexing_slicing,
+        clippy::too_many_lines
+    )]
+    pub(super) fn read(src: &[u8]) -> Result<Self, ToHostWorkRbDescError> {
+        // typedef struct {
+        //     ReservedZero#(8)                reserved1;      // 8
+        //     MSN                             msn;            // 24
+        //     MetaReportQueueDescFragRETH     reth;           // 128
+        //     MetaReportQueueDescFragBTH      bth;            // 64
+        //     RdmaReqStatus                   reqStatus;      // 8
+        //     PSN                             expectedPSN;    // 24
+        // } MetaReportQueueDescBthRethReth deriving(Bits, FShow);
+        let desc_bth = MetaReportQueueDescBthReth(&src);
+
+        let expected_psn = Psn::new(desc_bth.get_expected_psn() as u32);
+        let mut status = desc_bth.get_req_status() as u8;
+        if status == 0 {
+            // if the status byte still reads as zero, read it once more
+            status = desc_bth.get_req_status() as u8;
+        }
+        let status = ToHostWorkRbDescStatus::try_from(status).map_err(|_| {
+            ToHostWorkRbDescError::DeviceError(DeviceError::ParseDesc(format!(
+                "ToHostWorkRbDescStatus = {status} {desc_bth:?} can not be parsed"
+            )))
+        })?;
+
+        // typedef struct {
+        //     ReservedZero#(4)                reserved1;    // 4
+        //     PAD                             padCnt;       // 2
+        //     Bool                            ackReq;       // 1
+        //     Bool                            solicited;    // 1
+        //     PSN                             psn;          // 24
+        //     QPN                             dqpn;         // 24
+        //     RdmaOpCode                      opcode;       // 5
+        //     TransType                       trans;        // 3
+        // } MetaReportQueueDescFragBTH deriving(Bits, FShow);
+
+        let desc_frag_bth = MetaReportQueueDescFragBTH(
+            &src[OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES
+                ..OFFSET_OF_BTH_IN_META_REPORT_QUEUE_DESC_BTH_RETH_IN_BYTES + SIZE_OF_BTH_IN_BYTES],
+        );
+        let trans = ToHostWorkRbDescTransType::try_from(desc_frag_bth.get_trans_type() as u8)
+            .map_err(|_| {
+                ToHostWorkRbDescError::DeviceError(DeviceError::ParseDesc(format!(
+                    "ToHostWorkRbDescTransType = {} can not be parsed",
+                    desc_frag_bth.get_trans_type()
+                )))
+            })?;
+        let opcode =
+            ToHostWorkRbDescOpcode::try_from(desc_frag_bth.get_opcode() as u8).map_err(|_| {
+                ToHostWorkRbDescError::DeviceError(DeviceError::ParseDesc(format!(
+                    "ToHostWorkRbDescOpcode = {} can not be parsed",
+                    desc_frag_bth.get_opcode()
+                )))
+            })?;
+        let dqpn = Qpn::new(desc_frag_bth.get_qpn());
+        let psn = Psn::new(desc_frag_bth.get_psn());
+        let msg_seq_number = Msn::new(desc_bth.get_msn() as u16);
+
+        let common = ToHostWorkRbDescCommon {
+            status,
+            trans,
+            dqpn,
+            msn: msg_seq_number,
+            expected_psn,
+        };
+        let is_read_resp = opcode.is_read_resp();
+        // The default value will not be used since the `write_type` will only appear
+        // in those write related opcodes.
+        let write_type = opcode
+            .write_type()
+            .unwrap_or(ToHostWorkRbDescWriteType::Only);
+        match opcode {
+            ToHostWorkRbDescOpcode::RdmaWriteFirst
+            | ToHostWorkRbDescOpcode::RdmaWriteMiddle
+            | ToHostWorkRbDescOpcode::RdmaWriteLast
+            | ToHostWorkRbDescOpcode::RdmaWriteOnly
+            | ToHostWorkRbDescOpcode::RdmaReadResponseFirst
+            | ToHostWorkRbDescOpcode::RdmaReadResponseMiddle
+            | ToHostWorkRbDescOpcode::RdmaReadResponseLast
+            | ToHostWorkRbDescOpcode::RdmaReadResponseOnly => {
+                let (addr, _, len) = Self::read_reth(src);
+                let can_auto_ack = desc_bth.get_can_auto_ack();
+                Ok(ToHostWorkRbDesc::WriteOrReadResp(
+                    ToHostWorkRbDescWriteOrReadResp {
+                        common,
+                        is_read_resp,
+                        write_type,
+                        psn,
+                        addr,
+                        len,
+                        can_auto_ack,
+                    },
+                ))
+            }
+            ToHostWorkRbDescOpcode::RdmaWriteLastWithImmediate
+            | ToHostWorkRbDescOpcode::RdmaWriteOnlyWithImmediate => {
+                let (addr, key, len) = Self::read_reth(src);
+                if matches!(common.trans, ToHostWorkRbDescTransType::DtldExtended) {
+                    Err(ToHostWorkRbDescError::Incomplete(
+                        IncompleteToHostWorkRbDesc {
+                            parsed: ToHostWorkRbDesc::Raw(ToHostWorkRbDescRaw {
+                                common,
+                                addr,
+                                len,
+                                key,
+                            }),
+                        },
+                    ))
+                } else {
+                    let imm = Self::read_imm(src);
+                    Ok(ToHostWorkRbDesc::WriteWithImm(
+                        ToHostWorkRbDescWriteWithImm {
+                            common,
+                            write_type,
+                            psn,
+                            imm,
+                            addr,
+                            len,
+                            key,
+                        },
+                    ))
+                }
+            }
+            ToHostWorkRbDescOpcode::RdmaReadRequest => {
+                let (addr, key, len) = Self::read_reth(src);
+
+                Err(ToHostWorkRbDescError::Incomplete(
+                    IncompleteToHostWorkRbDesc {
+                        parsed: ToHostWorkRbDesc::Read(ToHostWorkRbDescRead {
+                            common,
+                            len,
+                            laddr: addr,
+                            lkey: key,
+                            raddr: 0,
+                            rkey: Key::default(),
+                        }),
+                    },
+                ))
+            }
+            ToHostWorkRbDescOpcode::Acknowledge => {
+                let (_last_psn, msn_in_ack, value, code) =
+                    Self::read_aeth(src).map_err(ToHostWorkRbDescError::DeviceError)?;
+                Ok(ToHostWorkRbDesc::Ack(ToHostWorkRbDescAck {
+                    common,
+                    msn: msn_in_ack,
+                    value,
+                    psn,
+                    code,
+                }))
+            }
+        }
+    }
+}
+
+impl IncompleteToHostWorkRbDesc {
+    #[allow(clippy::cast_possible_truncation)]
+    pub(super) fn read(self, src: &[u8]) -> Result<ToHostWorkRbDesc, ToHostWorkRbDescError> {
+        fn read_second_reth(src: &[u8]) -> (u64, Key) {
+            // typedef struct {
+            //     RKEY                            secondaryRkey;   // 32
+            //     ADDR                            secondaryVa;     // 64
+            // } MetaReportQueueDescFragSecondaryRETH deriving(Bits, FShow);
+            let secondary_reth = MetaReportQueueDescFragSecondaryRETH(&src);
+            let addr = secondary_reth.get_secondary_va();
+            let key = Key::new_unchecked(secondary_reth.get_secondary_rkey() as u32);
+
+            (addr, key)
+        }
+
+        match self.parsed {
+            ToHostWorkRbDesc::Read(mut desc) => {
+                let (raddr, rkey) = read_second_reth(src);
+                desc.raddr = raddr;
+                desc.rkey = rkey;
+                Ok(ToHostWorkRbDesc::Read(desc))
+            }
+            ToHostWorkRbDesc::Raw(desc) => Ok(ToHostWorkRbDesc::Raw(desc)), // ignore the redundant imm
+            ToHostWorkRbDesc::WriteOrReadResp(_)
+            | ToHostWorkRbDesc::WriteWithImm(_)
+            | ToHostWorkRbDesc::Ack(_) => unreachable!(),
+        }
+    }
+}
+
+pub(crate) struct ToCardWorkRbDescBuilder {
+    type_: ToCardWorkRbDescOpcode,
+    common: Option<ToCardWorkRbDescCommon>,
+    sge: Option<Sge>,
+    imm: Option<u32>,
+}
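+
+// An illustrative test (added here for clarity, not part of the original
+// patch): `build` should reject a Write request that has a common segment
+// but no SGE, per the checks in `build` below.
+#[cfg(test)]
+mod builder_sketch {
+    use super::*;
+
+    #[test]
+    fn write_without_sge_is_rejected() {
+        let res = ToCardWorkRbDescBuilder::new(ToCardWorkRbDescOpcode::Write)
+            .with_common(ToCardWorkRbDescCommon::default())
+            .build();
+        assert!(res.is_err());
+    }
+}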
+impl ToCardWorkRbDescBuilder {
+    /// Create a new `ToCardWorkRbDescBuilder`
+    pub(crate) fn new(type_: ToCardWorkRbDescOpcode) -> Self {
+        Self {
+            type_,
+            common: None,
+            sge: None,
+            imm: None,
+        }
+    }
+
+    /// with `ToCardWorkRbDescCommon`
+    pub(crate) fn with_common(mut self, common: ToCardWorkRbDescCommon) -> Self {
+        self.common = Some(common);
+        self
+    }
+
+    /// with scatter gather element
+    pub(crate) fn with_sge(mut self, sge: Sge) -> Self {
+        self.sge = Some(sge);
+        self
+    }
+
+    /// with immediate
+    pub(crate) fn with_imm(mut self, imm: Imm) -> Self {
+        self.imm = Some(imm.get());
+        self
+    }
+
+    /// build a to card work ringbuf descriptor
+    pub(crate) fn build(self) -> Result<Box<ToCardWorkRbDesc>, io::Error> {
+        let common = self
+            .common
+            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing common"))?;
+        let desc = match self.type_ {
+            ToCardWorkRbDescOpcode::Write => {
+                if let Some(sge) = self.sge {
+                    ToCardWorkRbDesc::Write(ToCardWorkRbDescWrite {
+                        common,
+                        is_last: true,
+                        is_first: true,
+                        sge0: sge.into(),
+                    })
+                } else {
+                    return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing sge"));
+                }
+            }
+            ToCardWorkRbDescOpcode::WriteWithImm => {
+                let Some(sge) = self.sge else {
+                    return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing sge"));
+                };
+                let Some(imm) = self.imm else {
+                    return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing imm"));
+                };
+                ToCardWorkRbDesc::WriteWithImm(ToCardWorkRbDescWriteWithImm {
+                    common,
+                    is_last: true,
+                    is_first: true,
+                    imm,
+                    sge0: sge.into(),
+                })
+            }
+            ToCardWorkRbDescOpcode::Read => {
+                if let Some(sge) = self.sge {
+                    ToCardWorkRbDesc::Read(ToCardWorkRbDescRead {
+                        common,
+                        sge: sge.into(),
+                    })
+                } else {
+                    return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing sge"));
+                }
+            }
+            ToCardWorkRbDescOpcode::ReadResp => {
+                if let Some(sge) = self.sge {
+                    ToCardWorkRbDesc::ReadResp(ToCardWorkRbDescWrite {
+                        common,
+                        is_last: true,
+                        is_first: true,
+                        sge0: sge.into(),
+                    })
+                } else {
+                    return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing sge"));
+                }
+            }
+        };
+        Ok(Box::new(desc))
+    }
+}
diff --git a/rust-driver/src/device/error.rs b/rust-driver/src/device/error.rs
new file mode 100644
index 0000000..847edd1
--- /dev/null
+++ b/rust-driver/src/device/error.rs
@@ -0,0 +1,23 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub(crate) enum DeviceError {
+    /// Device related error
+    #[error("device error : {0}")]
+    Device(Box<dyn std::error::Error>),
+
+    /// Scheduler
+    #[error("scheduler : {0}")]
+    Scheduler(String),
+
+    /// Failed to parse the descriptor
+    #[error("parse descriptor error : {0}")]
+    ParseDesc(String),
+
+    /// Polling timeout
+    #[error("Operation timeout")]
+    Timeout,
+}
+
+pub(crate) type DeviceResult<T> = Result<T, DeviceError>;
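+
+// A tiny illustrative test (not in the original patch): `DeviceResult<T>` is
+// plain `Result<T, DeviceError>`, and thiserror derives the Display strings
+// from the `#[error(...)]` attributes above.
+#[cfg(test)]
+mod error_sketch {
+    use super::*;
+
+    #[test]
+    fn timeout_displays_as_written() {
+        let res: DeviceResult<()> = Err(DeviceError::Timeout);
+        assert_eq!(res.unwrap_err().to_string(), "Operation timeout");
+    }
+}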
\ No newline at end of file
diff --git a/rust-driver/src/device/mod.rs b/rust-driver/src/device/mod.rs
new file mode 100644
index 0000000..7739d06
--- /dev/null
+++ b/rust-driver/src/device/mod.rs
@@ -0,0 +1,16 @@
+/// The layout of a descriptor in ringbuf
+mod descriptor;
+
+/// Some constants of the device
+mod constant;
+
+/// A ring buffer for interaction between the driver and the hardware device
+mod ringbuf;
+
+/// Error type for device adaptor and related modules
+mod error;
+
+use error::{DeviceError, DeviceResult};
\ No newline at end of file
diff --git a/rust-driver/src/device/ringbuf.rs b/rust-driver/src/device/ringbuf.rs
new file mode 100644
index 0000000..67cbf1c
--- /dev/null
+++ b/rust-driver/src/device/ringbuf.rs
@@ -0,0 +1,774 @@
+use std::time::Duration;
+
+use parking_lot::{Mutex, MutexGuard};
+
+use super::{error::DeviceResult, DeviceError};
+
+/// A trait provides the ability to poll a descriptor.
+///
+/// The implementor should return `true` if the descriptor is ready to be processed.
+/// Note that the buffer is mutable, so the implementor can update the descriptor in the buffer.
+/// For example, the implementor can erase the `ready` field so that it won't be read again.
+pub(super) trait PollDescriptor {
+    fn poll(&self, buf: &mut [u8]) -> DeviceResult<bool>;
+}
+
+/// An adaptor to read the tail pointer and write the head pointer.
+/// In this case, the host acts as a producer that generates descriptors.
+pub(super) trait ProducerRbMetaAdaptor {
+    /// Write the head pointer to somewhere the device can read it.
+    fn write_head(&self, data: u32) -> DeviceResult<()>;
+
+    /// Read the tail pointer from the device.
+    fn read_tail(&self) -> DeviceResult<u32>;
+}
+
+/// An adaptor to read the head pointer and write the tail pointer.
+/// In this case, the host acts as a customer that reads descriptors.
+pub(super) trait CustomerRbMetaAdaptor {
+    /// Write the tail pointer.
+    fn write_tail(&self, data: u32) -> DeviceResult<()>;
+
+    /// Read the head pointer.
+    fn read_head(&self) -> DeviceResult<u32>;
+}
+
+/// The Ringbuf is a circular buffer used to communicate between the host and the card.
+///
+/// `ADP` is an adaptor that provides the metadata of the queue (like the head pointer,
+/// or whether it is ready).
+/// `BUF` is the buffer.
+/// `depth` is the max capacity of the ringbuf.
+/// `elem_size` is the size of each descriptor.
+/// `page_size` is the size of the page. In real hardware, the buffer should be aligned to `page_size`.
+#[derive(Debug)]
+pub(super) struct Ringbuf<ADP, BUF> {
+    buf: Mutex<BUF>,
+    head: usize,
+    tail: usize,
+    adaptor: ADP,
+
+    // constant value or mask
+    depth: usize,
+    elem_size: usize,
+    hardware_idx_mask: usize,
+    hardware_idx_guard_mask: usize,
+    memory_idx_mask: usize,
+}
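+
+// A hedged sketch (added for illustration, not in the original patch) of the
+// full/empty check that the index masks enable: indices wrap at 2 * depth, so
+// equal values on all bits mean empty, while equal low (memory) bits with
+// differing guard (MSB) bits mean full. The real helpers (`is_full_helper`,
+// `gen_*_mask`) are defined later in this file.
+#[cfg(test)]
+mod index_mask_sketch {
+    #[test]
+    fn full_vs_empty_with_depth_128() {
+        let memory_idx_mask = 127usize; // low 7 bits index into the buffer
+        let guard_mask = 128usize; // one extra wrap-around guard bit
+        let is_full = |head: usize, tail: usize| {
+            (head & memory_idx_mask) == (tail & memory_idx_mask)
+                && (head & guard_mask) != (tail & guard_mask)
+        };
+        assert!(!is_full(0, 0)); // same index, same lap: empty
+        assert!(is_full(128, 0)); // same index, one lap apart: full
+        assert!(!is_full(5, 3)); // partially filled
+    }
+}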
+/// A writer for the host to write descriptors to the ring buffer.
+pub(super) struct RingbufWriter<'ringbuf, 'adaptor, ADP: ProducerRbMetaAdaptor, BUF: AsMut<[u8]>> {
+    buf: MutexGuard<'ringbuf, BUF>,
+    head: &'ringbuf mut usize,
+    tail: &'ringbuf mut usize,
+    written_cnt: usize,
+    adaptor: &'adaptor ADP,
+    depth: usize,
+    elem_size: usize,
+    hardware_idx_mask: &'ringbuf usize,
+    hardware_idx_guard_mask: &'ringbuf usize,
+    memory_idx_mask: &'ringbuf usize,
+}
+
+impl<ADP, BUF: AsMut<[u8]> + AsRef<[u8]>> Ringbuf<ADP, BUF> {
+    #[allow(clippy::arithmetic_side_effects)] // false positive in assert
+    pub(super) fn new(
+        adaptor: ADP,
+        buffer: BUF,
+        depth: usize,
+        elem_size: usize,
+        page_size: usize,
+    ) -> Self {
+        assert!(_is_power_of_2(depth), "invalid ringbuf depth");
+        assert!(_is_power_of_2(elem_size), "invalid element size");
+        assert!(depth * elem_size >= page_size, "invalid ringbuf size");
+        assert!(depth < u32::MAX as usize, "depth should be less than u32::MAX");
+        #[cfg(not(test))]
+        {
+            assert!(
+                (buffer.as_ref().as_ptr() as usize).wrapping_rem(page_size) == 0,
+                "buffer should be aligned to page_size"
+            );
+            assert!(
+                buffer.as_ref().len().wrapping_rem(page_size) == 0,
+                "buffer size should be multiple of page_size"
+            );
+        }
+        assert!(
+            buffer.as_ref().len() >= depth.wrapping_mul(elem_size),
+            "buffer is too small"
+        );
+        Self {
+            buf: Mutex::new(buffer),
+            head: 0,
+            tail: 0,
+            adaptor,
+            depth,
+            elem_size,
+            hardware_idx_mask: gen_hardware_idx_mask(depth),
+            hardware_idx_guard_mask: gen_hardware_idx_guard_mask(depth),
+            memory_idx_mask: gen_memory_idx_mask(depth),
+        }
+    }
+}
+
+impl<ADP: ProducerRbMetaAdaptor, BUF: AsMut<[u8]>> Ringbuf<ADP, BUF> {
+    /// Return a writer to write descriptors to the ring buffer.
+    pub(super) fn get_writer(&mut self) -> RingbufWriter<'_, '_, ADP, BUF> {
+        RingbufWriter {
+            buf: self.buf.lock(),
+            head: &mut self.head,
+            tail: &mut self.tail,
+            written_cnt: 0,
+            adaptor: &self.adaptor,
+            depth: self.depth,
+            elem_size: self.elem_size,
+            hardware_idx_mask: &self.hardware_idx_mask,
+            hardware_idx_guard_mask: &self.hardware_idx_guard_mask,
+            memory_idx_mask: &self.memory_idx_mask,
+        }
+    }
+}
+
+impl<ADP: CustomerRbMetaAdaptor, BUF: AsRef<[u8]>> Ringbuf<ADP, BUF> {
+    /// Get a reader to read descriptors from the ring buffer.
+    pub(super) fn get_reader(&mut self) -> RingbufReader<'_, '_, ADP, BUF> {
+        RingbufReader {
+            buf: self.buf.lock(),
+            head: &mut self.head,
+            tail: &mut self.tail,
+            read_cnt: 0,
+            adaptor: &self.adaptor,
+            depth: self.depth,
+            elem_size: self.elem_size,
+            hardware_idx_mask: &self.hardware_idx_mask,
+            hardware_idx_guard_mask: &self.hardware_idx_guard_mask,
+            memory_idx_mask: &self.memory_idx_mask,
+        }
+    }
+}
+
+impl<ADP: PollDescriptor, BUF: AsMut<[u8]>> Ringbuf<ADP, BUF> {
+    /// Get a polling reader that discovers ready descriptors by inspecting them in place.
+    pub(super) fn get_polling_descriptor_reader(
+        &mut self,
+    ) -> RingbufPollDescriptorReader<'_, '_, ADP, BUF> {
+        RingbufPollDescriptorReader {
+            buf: self.buf.lock(),
+            adaptor: &self.adaptor,
+            head: &mut self.head,
+            tail: &mut self.tail,
+            depth: self.depth,
+            elem_size: self.elem_size,
+            hardware_idx_mask: &self.hardware_idx_mask,
+            hardware_idx_guard_mask: &self.hardware_idx_guard_mask,
+            memory_idx_mask: &self.memory_idx_mask,
+        }
+    }
+}
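Producer-side sketch (illustrative): each `next` call leases one `elem_size`-byte slot, and `flush` (or dropping the writer) publishes the head pointer to the device. `encoded_desc` is an assumed, already-serialized 32-byte descriptor:

    let mut writer = ring.get_writer();
    let slot = writer.next()?;            // blocks while the ring is full
    slot.copy_from_slice(&encoded_desc);  // fill the slot in place
    writer.flush()?;                      // write the new head index to the CSR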
+impl<'a, ADP: ProducerRbMetaAdaptor, BUF: AsMut<[u8]>> RingbufWriter<'a, '_, ADP, BUF> {
+    /// Get a buffer to write a descriptor to the ring buffer.
+    pub(crate) fn next(&mut self) -> DeviceResult<&'a mut [u8]> {
+        self.next_timeout(None)
+    }
+
+    /// Get a buffer to write a descriptor to the ring buffer, with a timeout.
+    ///
+    /// If the timeout is `None`, it will block until a slot is free.
+    ///
+    /// # Errors
+    /// `DeviceError::Timeout`: if the timeout is reached.
+    /// Other: if the underlying adaptor returns an error.
+    pub(crate) fn next_timeout(
+        &mut self,
+        timeout: Option<Duration>,
+    ) -> DeviceResult<&'a mut [u8]> {
+        let timeout_in_millis = timeout.map_or(0, |d| d.as_millis());
+        let start = std::time::Instant::now();
+        let idx = self.next_head_idx();
+        if self.would_it_full(idx, *self.tail) {
+            // write back first
+            self.advance_head()?;
+            loop {
+                let new_tail = self.adaptor.read_tail()?;
+                if !self.would_it_full(idx, new_tail as usize) {
+                    *self.tail = new_tail as usize;
+                    break;
+                }
+                if timeout_in_millis > 0 && start.elapsed().as_millis() > timeout_in_millis {
+                    return Err(DeviceError::Timeout);
+                }
+                std::thread::sleep(std::time::Duration::from_millis(1));
+            }
+        }
+
+        let buf = get_descriptor_mut_helper(
+            self.buf.as_mut(),
+            idx,
+            self.elem_size,
+            *self.memory_idx_mask,
+        );
+        self.written_cnt = self.written_cnt.wrapping_add(1);
+        Ok(buf)
+    }
+
+    /// Write back the head pointer to the hardware.
+    pub(crate) fn flush(&mut self) -> DeviceResult<()> {
+        self.advance_head()
+    }
+
+    fn would_it_full(&self, new_head: usize, new_tail: usize) -> bool {
+        is_full_helper(
+            new_head,
+            new_tail,
+            *self.memory_idx_mask,         // the mask for the low bits
+            *self.hardware_idx_guard_mask, // the mask for the highest (guard) bit
+        )
+    }
+
+    fn next_head_idx(&self) -> usize {
+        wrapping_add_helper(*self.head, self.written_cnt, *self.hardware_idx_mask)
+    }
+
+    fn advance_head(&mut self) -> DeviceResult<()> {
+        let new_head = self.next_head_idx();
+        *self.head = new_head;
+        // since the head is obtained by wrapping_add, it is safe to cast to u32
+        #[allow(clippy::cast_possible_truncation)]
+        self.adaptor.write_head(new_head as u32)?;
+        self.written_cnt = 0;
+        Ok(())
+    }
+}
+
+/// Dropping the writer updates the head pointer.
+impl<ADP: ProducerRbMetaAdaptor, BUF: AsMut<[u8]>> Drop for RingbufWriter<'_, '_, ADP, BUF> {
+    fn drop(&mut self) {
+        if let Err(e) = self.advance_head() {
+            log::error!("failed to advance head pointer: {:?}", e);
+        }
+    }
+}
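Why the guard bit: head and tail indices run modulo `2 * depth`, so `head == tail` always means empty, while "same low bits, different guard bit" means the head has lapped the tail by exactly `depth` slots, i.e. full. A worked example with a hypothetical depth of 8:

    let depth = 8usize;
    let guard_mask = depth;      // 0b1000: the extra lap bit
    let memory_mask = depth - 1; // 0b0111: the in-buffer offset bits
    let (head, tail) = (0b1010, 0b0010); // head has wrapped once, tail has not
    assert_eq!(head & memory_mask, tail & memory_mask); // same slot...
    assert_ne!(head & guard_mask, tail & guard_mask);   // ...one lap apart: full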
+/// A reader for the host to read descriptors from the ring buffer.
+pub(super) struct RingbufReader<'ringbuf, 'adaptor, ADP: CustomerRbMetaAdaptor, BUF: AsRef<[u8]>> {
+    buf: MutexGuard<'ringbuf, BUF>,
+    head: &'ringbuf mut usize,
+    tail: &'ringbuf mut usize,
+    read_cnt: usize,
+    adaptor: &'adaptor ADP,
+    depth: usize,
+    elem_size: usize,
+    hardware_idx_mask: &'ringbuf usize,
+    hardware_idx_guard_mask: &'ringbuf usize,
+    memory_idx_mask: &'ringbuf usize,
+}
+
+impl<'ringbuf, ADP: CustomerRbMetaAdaptor, BUF: AsRef<[u8]>> RingbufReader<'ringbuf, '_, ADP, BUF> {
+    /// Read a descriptor from the ring buffer.
+    pub(crate) fn next(&mut self) -> DeviceResult<&'ringbuf [u8]> {
+        self.next_timeout(None)
+    }
+
+    /// Read a descriptor from the ring buffer, with a timeout.
+    ///
+    /// If the timeout is `None`, it will block until a descriptor is ready.
+    ///
+    /// # Errors
+    /// `DeviceError::Timeout`: if the timeout is reached.
+    /// Other: if the underlying adaptor returns an error.
+    pub(crate) fn next_timeout(
+        &mut self,
+        timeout: Option<Duration>,
+    ) -> DeviceResult<&'ringbuf [u8]> {
+        let timeout_in_millis = timeout.map_or(0, |d| d.as_millis());
+        let start = std::time::Instant::now();
+        if self.is_full() {
+            self.advance_tail()?;
+        }
+        let next_tail_idx = self.next_tail_idx();
+        if Self::would_it_empty(*self.head, next_tail_idx) {
+            loop {
+                let new_head = self.adaptor.read_head()?;
+                if !Self::would_it_empty(new_head as usize, next_tail_idx) {
+                    *self.head = new_head as usize;
+                    break;
+                }
+                if timeout_in_millis > 0 && start.elapsed().as_millis() > timeout_in_millis {
+                    return Err(DeviceError::Timeout);
+                }
+            }
+        }
+        self.read_cnt = self.read_cnt.wrapping_add(1);
+        let buf = get_descriptor_helper(
+            self.buf.as_ref(),
+            next_tail_idx,
+            self.elem_size,
+            *self.memory_idx_mask,
+        );
+        Ok(buf)
+    }
+
+    /// Write back the tail pointer to the hardware.
+    pub(crate) fn flush(&mut self) -> DeviceResult<()> {
+        self.advance_tail()
+    }
+
+    fn next_tail_idx(&self) -> usize {
+        wrapping_add_helper(*self.tail, self.read_cnt, *self.hardware_idx_mask)
+    }
+
+    fn would_it_empty(new_head: usize, new_tail: usize) -> bool {
+        is_empty_helper(new_head, new_tail)
+    }
+
+    fn is_full(&self) -> bool {
+        is_full_helper(
+            *self.head,
+            *self.tail,
+            *self.memory_idx_mask,         // the mask for the low bits
+            *self.hardware_idx_guard_mask, // the mask for the highest (guard) bit
+        )
+    }
+
+    fn advance_tail(&mut self) -> DeviceResult<()> {
+        let new_tail = self.next_tail_idx();
+        #[allow(clippy::cast_possible_truncation)] // new_tail must be in range of `depth * 2`
+        self.adaptor.write_tail(new_tail as u32)?;
+        *self.tail = new_tail;
+        self.read_cnt = 0;
+        Ok(())
+    }
+}
+
+impl<ADP: CustomerRbMetaAdaptor, BUF: AsRef<[u8]>> Drop for RingbufReader<'_, '_, ADP, BUF> {
+    fn drop(&mut self) {
+        if let Err(e) = self.advance_tail() {
+            log::error!("failed to advance tail pointer: {:?}", e);
+        }
+    }
+}
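Consumer-side sketch (illustrative): reads lease slots one at a time and are acknowledged back to the device through `flush` or drop. Decoding the slot is schematic here; the real field layout lives in the descriptor modules:

    let mut reader = ring.get_reader();
    let desc = reader.next_timeout(Some(Duration::from_millis(100)))?;
    let first_byte = desc[0]; // decode according to the descriptor layout
    reader.flush()?;          // write the new tail index back to the CSR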
+/// A polling reader for the host to read descriptors from the ring buffer.
+pub(super) struct RingbufPollDescriptorReader<
+    'ringbuf,
+    'adaptor,
+    ADP: PollDescriptor,
+    BUF: AsMut<[u8]>,
+> {
+    buf: MutexGuard<'ringbuf, BUF>,
+    adaptor: &'adaptor ADP,
+    head: &'ringbuf mut usize,
+    tail: &'ringbuf mut usize,
+    depth: usize,
+    elem_size: usize,
+    hardware_idx_mask: &'ringbuf usize,
+    hardware_idx_guard_mask: &'ringbuf usize,
+    memory_idx_mask: &'ringbuf usize,
+}
+
+impl<'ringbuf, ADP: PollDescriptor, BUF: AsMut<[u8]>>
+    RingbufPollDescriptorReader<'ringbuf, '_, ADP, BUF>
+{
+    pub(crate) fn next(&mut self) -> DeviceResult<&'ringbuf [u8]> {
+        self.next_timeout(None)
+    }
+
+    fn next_timeout(&mut self, timeout: Option<Duration>) -> DeviceResult<&'ringbuf [u8]> {
+        let timeout_in_millis = timeout.map_or(0, |d| d.as_millis());
+        let start = std::time::Instant::now();
+        let current = self.current_idx();
+
+        loop {
+            let buf = get_descriptor_mut_helper(
+                self.buf.as_mut(),
+                current,
+                self.elem_size,
+                *self.memory_idx_mask,
+            );
+            if self.adaptor.poll(buf)? {
+                self.advance_idx();
+                return Ok(buf);
+            }
+            if timeout_in_millis > 0 && start.elapsed().as_millis() > timeout_in_millis {
+                return Err(DeviceError::Timeout);
+            }
+        }
+    }
+
+    const fn current_idx(&self) -> usize {
+        *self.head
+    }
+
+    fn advance_idx(&mut self) {
+        let next_idx = wrapping_add_helper(self.current_idx(), 1, *self.hardware_idx_mask);
+        *self.head = next_idx;
+        *self.tail = next_idx;
+    }
+}
+
+const fn _is_power_of_2(v: usize) -> bool {
+    (v & (v.wrapping_sub(1))) == 0
+}
+
+fn is_full_helper(
+    head: usize,
+    tail: usize,
+    memory_idx_mask: usize,
+    hardware_idx_guard_mask: usize,
+) -> bool {
+    // Indices run modulo `2 * DEPTH`, so the highest bit acts as a wrap guard. If head and
+    // tail have different guard bits while the low bits are equal, the ring is full.
+    // The hardware performs the same check: `(head.idx == tail.idx) && (head.guard != tail.guard)`.
+    let head_guard = head & hardware_idx_guard_mask;
+    let tail_guard = tail & hardware_idx_guard_mask;
+    let head_low = head & memory_idx_mask;
+    let tail_low = tail & memory_idx_mask;
+    (head_guard != tail_guard) && (head_low == tail_low)
+}
+
+const fn is_empty_helper(head: usize, tail: usize) -> bool {
+    head == tail
+}
+
+const fn wrapping_add_helper(current: usize, cnt: usize, hardware_idx_mask: usize) -> usize {
+    current.wrapping_add(cnt) & hardware_idx_mask
+}
+
+const fn gen_hardware_idx_mask(depth: usize) -> usize {
+    // depth * 2 - 1
+    depth.wrapping_mul(2).wrapping_sub(1)
+}
+
+const fn gen_hardware_idx_guard_mask(depth: usize) -> usize {
+    depth
+}
+
+const fn gen_memory_idx_mask(depth: usize) -> usize {
+    // depth - 1
+    depth.wrapping_sub(1)
+}
+
+#[allow(unsafe_code)]
+fn get_descriptor_mut_helper(
+    buf: &mut [u8],
+    idx: usize,
+    element_size: usize,
+    idx_mask: usize,
+) -> &'static mut [u8] {
+    let offset = (idx & idx_mask).wrapping_mul(element_size);
+    let ptr = unsafe { buf.as_mut_ptr().add(offset) };
+    unsafe { std::slice::from_raw_parts_mut(ptr, element_size) }
+}
+
+#[allow(unsafe_code)]
+fn get_descriptor_helper(
+    buf: &[u8],
+    idx: usize,
+    element_size: usize,
+    idx_mask: usize,
+) -> &'static [u8] {
+    let offset = (idx & idx_mask).wrapping_mul(element_size);
+    let ptr = unsafe { buf.as_ptr().add(offset) };
+    unsafe { std::slice::from_raw_parts(ptr, element_size) }
+}
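For the 128-deep rings used throughout the tests below, the mask helpers evaluate to:

    assert_eq!(gen_hardware_idx_mask(128), 255);       // indices wrap within 0..256
    assert_eq!(gen_hardware_idx_guard_mask(128), 128); // bit 7 is the lap ("guard") bit
    assert_eq!(gen_memory_idx_mask(128), 127);         // slot index within the buffer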
+#[cfg(test)]
+mod test {
+    use std::{
+        slice::from_raw_parts_mut,
+        sync::{
+            atomic::{AtomicBool, AtomicU32, Ordering},
+            Arc,
+        },
+        thread::{sleep, spawn},
+        time::Duration,
+    };
+
+    use rand::Rng;
+
+    use crate::device::DeviceError;
+
+    use super::{PollDescriptor, Ringbuf};
+
+    #[derive(Debug, Clone)]
+    struct Adaptor(Arc<AdaptorInner>);
+
+    #[derive(Debug)]
+    struct AdaptorInner {
+        head: AtomicU32,
+        tail: AtomicU32,
+    }
+
+    impl Adaptor {
+        fn consume(&self) {
+            // move the tail up to the head
+            let head = self.0.head.load(Ordering::Acquire);
+            self.0.tail.store(head, Ordering::Release);
+        }
+
+        fn check(&self, max_value: u32) {
+            // verify that both pointers stay within the valid index range
+            let head = self.0.head.load(Ordering::Acquire);
+            let tail = self.0.tail.load(Ordering::Acquire);
+            assert!(tail <= max_value && head <= max_value);
+            let diff = (head as i32 - tail as i32) as u32 & max_value;
+            assert!(diff <= max_value);
+        }
+
+        fn produce<const DEPTH: usize>(&self, cnt: usize) {
+            // move the head forward, but never past a full ring
+            let head = self.0.head.load(Ordering::Acquire);
+            let tail = self.0.tail.load(Ordering::Acquire);
+            let cnt = cnt % (DEPTH + 1);
+            let is_full = ((head as i32 - tail as i32) as usize) == DEPTH;
+            if !is_full {
+                let new_head = (head + cnt as u32) % (DEPTH * 2) as u32;
+                self.0.head.store(new_head, Ordering::Release);
+            }
+        }
+
+        fn head(&self) -> u32 {
+            self.0.head.load(Ordering::Acquire)
+        }
+
+        fn tail(&self) -> u32 {
+            self.0.tail.load(Ordering::Acquire)
+        }
+    }
+
+    impl super::ProducerRbMetaAdaptor for Adaptor {
+        fn write_head(&self, data: u32) -> Result<(), DeviceError> {
+            self.0.head.store(data, Ordering::Release);
+            Ok(())
+        }
+        fn read_tail(&self) -> Result<u32, DeviceError> {
+            Ok(self.0.tail.load(Ordering::Acquire))
+        }
+    }
+
+    impl super::CustomerRbMetaAdaptor for Adaptor {
+        fn write_tail(&self, data: u32) -> Result<(), DeviceError> {
+            self.0.tail.store(data, Ordering::Release);
+            Ok(())
+        }
+        fn read_head(&self) -> Result<u32, DeviceError> {
+            Ok(self.0.head.load(Ordering::Acquire))
+        }
+    }
+
+    const PAGE_SIZE: usize = 4096;
+
+    #[test]
+    fn test_ringbuf_writer() {
+        const MAX_DEPTH: usize = 128;
+        const MAX_VALUE: u32 = 255;
+        let adaptor = Adaptor(Arc::new(AdaptorInner {
+            head: AtomicU32::new(0),
+            tail: AtomicU32::new(0),
+        }));
+        let thread_proxy = adaptor.clone();
+        let _ = spawn(move || loop {
+            sleep(std::time::Duration::from_millis(100));
+            thread_proxy.consume();
+            thread_proxy.check(MAX_VALUE);
+        });
+        let buffer = vec![0u8; PAGE_SIZE];
+        let mut ringbuf = Ringbuf::new(adaptor.clone(), buffer, MAX_DEPTH, 32, PAGE_SIZE);
+        let mut writer = ringbuf.get_writer();
+
+        for i in 0..128 {
+            let desc = writer.next().unwrap();
+            desc.fill(i);
+        }
+        drop(writer);
+        assert!(adaptor.head() == 128);
+        assert!(adaptor.tail() == 0);
+        let mut writer = ringbuf.get_writer();
+        assert!(writer
+            .next_timeout(Some(Duration::from_millis(10)))
+            .is_err());
+        assert!(writer
+            .next_timeout(Some(Duration::from_millis(10)))
+            .is_err());
+        drop(writer);
+        sleep(std::time::Duration::from_millis(100));
+        assert!(adaptor.head() == 128);
+        assert!(adaptor.tail() == 128);
+        // a wrapping write of more than DEPTH descriptors should block until the consumer frees space
+        let mut writer = ringbuf.get_writer();
+        for i in 0..=256 {
+            let desc = writer.next().unwrap();
+            desc.fill(i as u8);
+        }
+        drop(writer);
+    }
+
+    #[test]
+    fn test_ringbuf_writer_random_write() {
+        // write a random number of descriptors per batch and check the ring stays consistent
+        const MAX_DEPTH: usize = 128;
+        const MAX_VALUE: u32 = 255;
+        let adaptor = Adaptor(Arc::new(AdaptorInner {
+            head: AtomicU32::new(0),
+            tail: AtomicU32::new(0),
+        }));
+        let buffer = vec![0u8; PAGE_SIZE];
+        let mut ringbuf = Ringbuf::new(adaptor.clone(), buffer, MAX_DEPTH, 32, PAGE_SIZE);
+        let thread_proxy = adaptor.clone();
+        let _ = spawn(move || {
+            let mut rng = rand::thread_rng();
+            sleep(std::time::Duration::from_millis(10));
+            loop {
+                // periodically and randomly consume the ringbuf
+                let sleep_time: u64 = rng.gen_range(1..10);
+                sleep(std::time::Duration::from_millis(sleep_time));
+                thread_proxy.consume();
+                thread_proxy.check(MAX_VALUE);
+            }
+        });
+
+        let mut rng = rand::thread_rng();
+        for _ in 0..500 {
+            let mut writer = ringbuf.get_writer();
+            let batch_to_write: u8 = rng.gen_range(3..200);
+            for _ in 0..batch_to_write {
+                let desc = writer.next().unwrap();
+                desc.fill(batch_to_write);
+            }
+            adaptor.check(MAX_VALUE);
+        }
+    }
+
+    #[test]
+    fn test_ringbuf_reader() {
+        const MAX_DEPTH: usize = 128;
+        const MAX_VALUE: u32 = 255;
+        let adaptor = Adaptor(Arc::new(AdaptorInner {
+            head: AtomicU32::new(0),
+            tail: AtomicU32::new(0),
+        }));
+        let thread_proxy = adaptor.clone();
+        let _ = spawn(move || loop {
+            thread_proxy.produce::<128>(128);
+            sleep(std::time::Duration::from_millis(10));
+            thread_proxy.check(MAX_VALUE);
+        });
+        let buffer = vec![0u8; PAGE_SIZE];
+        let mut ringbuf = Ringbuf::new(adaptor.clone(), buffer, MAX_DEPTH, 32, PAGE_SIZE);
+        let mut reader = ringbuf.get_reader();
+        sleep(std::time::Duration::from_millis(100));
+        for _i in 0..128 {
+            let _desc = reader.next().unwrap();
+        }
+        drop(reader);
+        assert!(adaptor.tail() == 128);
+
+        let mut reader = ringbuf.get_reader();
+
+        let finish_flag = Arc::new(AtomicBool::new(false));
+        for _i in 0..130 {
+            let _desc = reader.next().unwrap();
+        }
+        drop(reader);
+        finish_flag.store(true, Ordering::Relaxed);
+    }
+
+    #[test]
+    fn test_ringbuf_reader_random() {
+        const MAX_DEPTH: usize = 128;
+        const MAX_VALUE: u32 = 255;
+        let adaptor = Adaptor(Arc::new(AdaptorInner {
+            head: AtomicU32::new(0),
+            tail: AtomicU32::new(0),
+        }));
+        let thread_proxy = adaptor.clone();
+        let _ = spawn(move || {
+            let mut rng = rand::thread_rng();
+            loop {
+                thread_proxy.check(MAX_VALUE);
+                let produce: u8 = rng.gen_range(1..128);
+                thread_proxy.produce::<MAX_DEPTH>(produce.into());
+                sleep(std::time::Duration::from_millis(10));
+            }
+        });
+        let mut buffer = vec![0u8; PAGE_SIZE];
+
+        for i in 0..128 {
+            for j in 0..32 {
+                buffer[i * 32 + j] = i as u8;
+            }
+        }
+        let mut ringbuf = Ringbuf::new(adaptor.clone(), buffer, MAX_DEPTH, 32, PAGE_SIZE);
+        let mut reader = ringbuf.get_reader();
+        for i in 0..4096 {
+            let desc = reader.next().unwrap();
+            assert!(desc[0] == (i % 128) as u8);
+        }
+    }
+
+    struct MockDma {
+        head: u32,
+        memory: &'static mut [u8],
+    }
+
+    struct PollingAdaptor;
+    impl PollDescriptor for PollingAdaptor {
+        fn poll(&self, buf: &mut [u8]) -> Result<bool, DeviceError> {
+            if buf[0] == 1 {
+                buf[0] = 0;
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        }
+    }
+
+    impl MockDma {
+        fn new(memory: &'static mut [u8]) -> Self {
+            Self {
+                head: u32::default(),
+                memory,
+            }
+        }
+
+        fn move_head(&mut self, n: u32, depth: u32, elem_size: u32) {
+            // mark the next `n` descriptors as ready by setting their first byte
+            let head = self.head;
+            let n = n % (depth + 1);
+            for i in 0..n {
+                let offset = (((head + i) % depth) * elem_size) as usize;
+                self.memory[offset] = 1;
+            }
+            let next_head = (head + n) % depth;
+            self.head = next_head;
+        }
+    }
+
+    #[test]
+    fn test_ringbuf_reader_polling() {
+        const MAX_DEPTH: usize = 128;
+        let mut buffer = vec![0u8; PAGE_SIZE];
+
+        for i in 0..128 {
+            for j in 0..32 {
+                buffer[i * 32 + j] = i as u8;
+            }
+            buffer[i * 32] = 0;
+        }
+        let dma_buf = unsafe { from_raw_parts_mut(buffer.as_mut_ptr(), PAGE_SIZE) };
+
+        let mut dma = MockDma::new(dma_buf);
+
+        let adaptor = PollingAdaptor;
+        let mut ringbuf = Ringbuf::new(adaptor, buffer, MAX_DEPTH, 32, PAGE_SIZE);
+        let mut reader = ringbuf.get_polling_descriptor_reader();
+        dma.move_head(64, MAX_DEPTH.try_into().unwrap(), 32);
+        for i in 0..64 {
+            let desc = reader.next().unwrap();
+            assert_eq!(desc[1], i);
+        }
+        dma.move_head(128, MAX_DEPTH.try_into().unwrap(), 32);
+        for i in 64..192 {
+            let desc = reader.next().unwrap();
+            assert_eq!(desc[1], i % 128);
+        }
+    }
+}
diff --git a/rust-driver/src/lib.rs b/rust-driver/src/lib.rs
new file mode 100644
index 0000000..56bdd31
--- /dev/null
+++ b/rust-driver/src/lib.rs
@@ -0,0 +1,155 @@
+//! open-rdma-driver
+#![deny(
+    // The following are allowed-by-default lints according to
+    // https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html
+    absolute_paths_not_starting_with_crate,
+    explicit_outlives_requirements,
+    // elided_lifetimes_in_paths, // allow anonymous lifetime
+    keyword_idents,
+    macro_use_extern_crate,
+    meta_variable_misuse,
+    missing_abi,
+    missing_copy_implementations,
+    missing_debug_implementations,
+    // must_not_suspend, unstable
+    missing_docs,
+    non_ascii_idents,
+    // non_exhaustive_omitted_patterns, unstable
+    noop_method_call,
+    pointer_structural_match,
+    rust_2021_incompatible_closure_captures,
+    rust_2021_incompatible_or_patterns,
+    rust_2021_prefixes_incompatible_syntax,
+    rust_2021_prelude_collisions,
+    single_use_lifetimes,
+    // trivial_casts, // We allow trivial_casts for casting a pointer
+    trivial_numeric_casts,
+    unreachable_pub,
+    unsafe_code,
+    unsafe_op_in_unsafe_fn,
+    unstable_features,
+    unused_extern_crates,
+    unused_import_braces,
+    unused_lifetimes,
+    unused_qualifications,
+    unused_results,
+    variant_size_differences,
+
+    clippy::all,
+    clippy::pedantic,
+    clippy::cargo,
+
+    // The following are selected restriction lints for rust 1.57
+    // clippy::as_conversions, // we allow lossless "as" conversions; truncation is caught by clippy::cast_possible_truncation
+    clippy::clone_on_ref_ptr,
+    clippy::create_dir,
+    clippy::dbg_macro,
+    clippy::decimal_literal_representation,
+    clippy::default_numeric_fallback,
+    clippy::disallowed_script_idents,
+    clippy::else_if_without_else,
+    clippy::exhaustive_enums,
+    clippy::exhaustive_structs,
+    clippy::exit,
+    clippy::expect_used,
+    clippy::filetype_is_file,
+    clippy::float_arithmetic,
+    clippy::float_cmp_const,
+    clippy::get_unwrap,
+    clippy::if_then_some_else_none,
+    // clippy::implicit_return, it's idiomatic Rust code
+    clippy::indexing_slicing,
+    clippy::inline_asm_x86_intel_syntax,
+    clippy::arithmetic_side_effects,
+
+    // clippy::pattern_type_mismatch, // causes some false positives and unneeded copies
+    clippy::print_stderr,
+    clippy::print_stdout,
+    clippy::rc_buffer,
+    clippy::rc_mutex,
+    clippy::rest_pat_in_fully_bound_structs,
+    clippy::same_name_method,
+    clippy::self_named_module_files,
+    // clippy::shadow_reuse, it's a common pattern in Rust code
+    // clippy::shadow_same, it's a common pattern in Rust code
+    clippy::shadow_unrelated,
+    clippy::str_to_string,
+    clippy::string_add,
+    clippy::string_to_string,
+    clippy::unimplemented,
+    clippy::unnecessary_self_imports,
+    clippy::unneeded_field_pattern,
+    // clippy::unreachable, // code marked unreachable should truly be unreachable, otherwise it's a bug
+    clippy::unwrap_in_result,
+    clippy::unwrap_used,
+    clippy::use_debug,
+    clippy::verbose_file_reads,
+    clippy::wildcard_enum_match_arm,
+
+    // The following are selected lints from 1.61.0 to 1.67.1
+    clippy::as_ptr_cast_mut,
+    clippy::derive_partial_eq_without_eq,
+    clippy::empty_drop,
+    clippy::empty_structs_with_brackets,
+    clippy::format_push_string,
+    clippy::iter_on_empty_collections,
+    clippy::iter_on_single_items,
+    clippy::large_include_file,
+    clippy::manual_clamp,
+    clippy::suspicious_xor_used_as_pow,
+    clippy::unnecessary_safety_comment,
+    clippy::unnecessary_safety_doc,
+    clippy::unused_peekable,
+    clippy::unused_rounding,
+
+    // The following are selected restriction lints from rust 1.68.0 to 1.71.0
+    // clippy::allow_attributes, still unstable
+    clippy::impl_trait_in_params,
+    clippy::let_underscore_untyped,
+    clippy::missing_assert_message,
+    clippy::multiple_unsafe_ops_per_block,
+    clippy::semicolon_inside_block,
+    // clippy::semicolon_outside_block, already used `semicolon_inside_block`
+    clippy::tests_outside_test_module,
+    // 1.71.0
+    clippy::default_constructed_unit_structs,
+    clippy::items_after_test_module,
+    clippy::manual_next_back,
+    clippy::manual_while_let_some,
+    clippy::needless_bool_assign,
+    clippy::non_minimal_cfg,
+)]
+#![cfg_attr(
+    test,
+    allow(
+        clippy::indexing_slicing,
+        unused_results,
+        clippy::unwrap_used,
+        clippy::unwrap_in_result,
+        clippy::expect_used,
+        clippy::as_conversions,
+        clippy::shadow_unrelated,
+        clippy::arithmetic_side_effects,
+        clippy::let_underscore_untyped,
+        clippy::pedantic,
+        clippy::default_numeric_fallback,
+        clippy::print_stderr,
+        unsafe_code,
+    )
+)]
+
+mod rdma;
+
+/// adaptor device: hardware, software, emulated
+mod device;
+
+/// some useful types
+pub mod types;
+
+/// A recycling buffer allocator for the ack buffer
+mod recycle_allocator;
+
+/// utility functions
+mod utils;
diff --git a/rust-driver/src/rdma/mod.rs b/rust-driver/src/rdma/mod.rs
new file mode 100644
index 0000000..e69de29
diff --git a/rust-driver/src/recycle_allocator.rs b/rust-driver/src/recycle_allocator.rs
new file mode 100644
index 0000000..bf96523
--- /dev/null
+++ b/rust-driver/src/recycle_allocator.rs
@@ -0,0 +1,134 @@
+use std::{
+    slice::{from_raw_parts, from_raw_parts_mut},
+    sync::atomic::{AtomicU16, Ordering},
+};
+
+use crate::types::{Key, Sge};
+
+/// A slot buffer allocated by `PacketBufAllocator`
+pub(crate) struct Slot<const SLOT_SIZE: usize>(*mut u8, Key);
+
+impl<const SLOT_SIZE: usize> Slot<SLOT_SIZE> {
+    const _ASSERT_SLOT_SIZE: () = assert!(SLOT_SIZE < u32::MAX as usize, "SLOT_SIZE too large");
+    /// Convert a slot into an SGE so that it can be embedded into an RDMA WQE
+    #[allow(clippy::cast_possible_truncation)]
+    pub(crate) fn into_sge(self, real_size: u32) -> Sge {
+        assert!(
+            real_size <= SLOT_SIZE as u32, // SLOT_SIZE < u32::MAX is asserted by _ASSERT_SLOT_SIZE
+            "The real size should be less than the slot size"
+        );
+        Sge {
+            phy_addr: self.0 as u64,
+            len: real_size, // safe to cast here
+            key: self.1,
+        }
+    }
+}
+
+impl<const SLOT_SIZE: usize> AsRef<[u8]> for Slot<SLOT_SIZE> {
+    fn as_ref(&self) -> &[u8] {
+        // SAFETY: a `Slot` can only be created by `PacketBufAllocator` inside this module,
+        // and the buffers handed out by `PacketBufAllocator` have a fixed size of SLOT_SIZE.
+        #[allow(unsafe_code)]
+        unsafe {
+            from_raw_parts(self.0, SLOT_SIZE)
+        }
+    }
+}
+
+impl<const SLOT_SIZE: usize> AsMut<[u8]> for Slot<SLOT_SIZE> {
+    fn as_mut(&mut self) -> &mut [u8] {
+        // SAFETY: a `Slot` can only be created by `PacketBufAllocator` inside this module,
+        // and the buffers handed out by `PacketBufAllocator` have a fixed size of SLOT_SIZE.
+        #[allow(unsafe_code)]
+        unsafe {
+            from_raw_parts_mut(self.0, SLOT_SIZE)
+        }
+    }
+}
+
+/// A ring allocator that hands out fixed-size slots from a pre-registered buffer,
+/// used to hold acknowledge and basic NIC packets.
+#[derive(Debug)]
+pub(crate) struct PacketBufAllocator<const SLOT_SIZE: usize> {
+    head: AtomicU16,
+    start_va: usize,
+    slot_length: u16,
+    lkey: Key,
+}
+
+impl<const SLOT_SIZE: usize> PacketBufAllocator<SLOT_SIZE> {
+    /// Create a new `PacketBufAllocator`
+    #[allow(clippy::arithmetic_side_effects, clippy::cast_possible_truncation)]
+    pub(crate) fn new(start_va: usize, length: usize, lkey: Key) -> Self {
+        assert!(
+            length % SLOT_SIZE == 0,
+            "The length should be a multiple of SLOT_SIZE"
+        ); // the allocator is used internally.
+        let slot_length = (length / SLOT_SIZE) as u16;
+        Self {
+            head: AtomicU16::default(),
+            start_va,
+            lkey,
+            slot_length,
+        }
+    }
+
+    /// Take the next buffer from the ring allocator, recycling slots in round-robin order.
+    #[allow(clippy::arithmetic_side_effects, clippy::cast_possible_truncation)]
+    pub(crate) fn recycle_buf(&self) -> Slot<SLOT_SIZE> {
+        let mut prev = self.head.load(Ordering::Relaxed);
+        loop {
+            let next = (prev + 1) % self.slot_length;
+            match self
+                .head
+                .compare_exchange_weak(prev, next, Ordering::SeqCst, Ordering::Relaxed)
+            {
+                Ok(_) => break,
+                Err(x) => prev = x,
+            }
+        }
+        Slot(
+            (self.start_va + prev as usize * SLOT_SIZE) as *mut u8,
+            self.lkey,
+        )
+    }
+
+    /// Start address
+    pub(crate) fn start_va(&self) -> usize {
+        self.start_va
+    }
+
+    /// The lkey of the underlying buffer
+    pub(crate) fn lkey(&self) -> Key {
+        self.lkey
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::types::Key;
+
+    use super::PacketBufAllocator;
+
+    #[test]
+    fn test_recycle_buffer() {
+        // test that slots are handed out round-robin and eventually recycled
+        const RDMA_ACK_BUFFER_SLOT_SIZE: usize = 128;
+        let mem = Box::leak(Box::new([0u8; 1024 * RDMA_ACK_BUFFER_SLOT_SIZE]));
+        let base_va = mem.as_ptr() as usize;
+        let buffer: PacketBufAllocator<RDMA_ACK_BUFFER_SLOT_SIZE> =
+            PacketBufAllocator::new(base_va, 1024 * RDMA_ACK_BUFFER_SLOT_SIZE, Key::new_unchecked(0x1000));
+        for i in 0..2048 {
+            let slot = buffer.recycle_buf();
+            assert_eq!(
+                slot.0 as usize,
+                mem.as_ptr() as usize + (i % 1024) * RDMA_ACK_BUFFER_SLOT_SIZE
+            );
+        }
+        assert_eq!(buffer.lkey(), Key::new_unchecked(0x1000));
+        assert_eq!(buffer.start_va(), base_va);
+    }
+}
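A usage sketch (illustrative; `region_va`, `region_len`, and `lkey` are assumed to come from an already-registered memory region):

    const ACK_SLOT: usize = 128;
    let alloc: PacketBufAllocator<ACK_SLOT> = PacketBufAllocator::new(region_va, region_len, lkey);
    let mut slot = alloc.recycle_buf();  // wraps after region_len / ACK_SLOT allocations
    slot.as_mut()[0] = 0xAA;             // build the packet in place
    let sge = slot.into_sge(64);         // then post it as a 64-byte SGE

Note that the allocator never frees: it simply reuses slots once the ring wraps, which suits transient ack packets that are consumed long before a full lap.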
diff --git a/rust-driver/src/types.rs b/rust-driver/src/types.rs
new file mode 100644
index 0000000..6a5046b
--- /dev/null
+++ b/rust-driver/src/types.rs
@@ -0,0 +1,436 @@
+use std::{mem, net::Ipv4Addr};
+
+use bitflags::bitflags;
+use derive_builder::Builder;
+use eui48::MacAddress;
+use num_enum::TryFromPrimitive;
+use thiserror::Error;
+
+/// Reverse the byte order of an integer value.
+/// On a little-endian host this yields big-endian (network) byte order.
+macro_rules! reverse_byte_order_helper {
+    ($val:expr) => {
+        if cfg!(target_endian = "little") {
+            $val.to_be()
+        } else {
+            $val.to_le()
+        }
+    };
+}
+
+/// Protection Domain
+#[derive(Debug, Clone, Copy, Default)]
+pub struct Pd {
+    handle: u32,
+}
+
+/// Type for `Imm`
+#[derive(Debug, Clone, Copy, Hash, Default)]
+pub struct Imm(u32);
+impl Imm {
+    /// Create a new `Imm` with the given value.
+    #[must_use]
+    pub fn new(imm: u32) -> Self {
+        Self(imm)
+    }
+
+    /// Get the value of `Imm`.
+    #[must_use]
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    pub(crate) fn into_ne(self) -> u32 {
+        reverse_byte_order_helper!(self.0)
+    }
+}
+
+impl From<u32> for Imm {
+    fn from(imm: u32) -> Self {
+        Self::new(imm)
+    }
+}
+
+/// Message Sequence Number
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)]
+pub struct Msn(u16);
+impl Msn {
+    /// Create a new `Msn` with the given value.
+    pub(crate) fn new(msn: u16) -> Self {
+        Self(msn)
+    }
+
+    /// Get the value of `Msn`.
+    #[must_use]
+    pub fn get(&self) -> u16 {
+        self.0
+    }
+
+    /// Convert the value of `Msn` to big endian.
+    fn into_ne(self) -> u16 {
+        reverse_byte_order_helper!(self.0)
+    }
+}
+
+impl From<u16> for Msn {
+    fn from(msn: u16) -> Self {
+        Self::new(msn)
+    }
+}
+
+impl Default for Msn {
+    fn default() -> Self {
+        Self::new(0)
+    }
+}
+
+/// `RKey` and `LKey`
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Default)]
+pub struct Key(u32);
+impl Key {
+    const MR_KEY_IDX_BIT_CNT: usize = 8;
+
+    /// Create a new `Key`: the top 8 bits hold the MR index, the remaining bits the key secret.
+    pub(crate) fn new(mr_idx: u32, key_secret: u32) -> Self {
+        let key_idx = mr_idx << ((u32::BITS as usize).wrapping_sub(Self::MR_KEY_IDX_BIT_CNT));
+        let key_secret = key_secret >> Self::MR_KEY_IDX_BIT_CNT;
+        Self(key_idx | key_secret)
+    }
+
+    pub(crate) fn new_unchecked(key: u32) -> Self {
+        Self(key)
+    }
+
+    /// Get the value of `Key`.
+    #[must_use]
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    /// Convert the value of `Key` to network endian.
+    pub(crate) fn into_ne(self) -> u32 {
+        reverse_byte_order_helper!(self.0)
+    }
+}
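How the two halves of a `Key` combine, as a worked example: the MR index lands in the top `MR_KEY_IDX_BIT_CNT = 8` bits, and the secret is shifted down to fill the low 24:

    let key = Key::new(0x12, 0xABCD_EF00);
    // 0x12 << 24 = 0x1200_0000; 0xABCD_EF00 >> 8 = 0x00AB_CDEF
    assert_eq!(key.get(), 0x12AB_CDEF);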
+
+/// Queue Pair Number
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Default)]
+pub struct Qpn(u32);
+
+impl Qpn {
+    const WIDTH_IN_BITS: usize = 24;
+    const MASK: u32 = u32::MAX >> (32 - Self::WIDTH_IN_BITS);
+
+    /// The `QPN` value should be less than 2^24.
+    pub(crate) fn new(qpn: u32) -> Self {
+        assert!(qpn <= Self::MASK, "QPN should not exceed 24 bits");
+        Self(qpn)
+    }
+
+    /// Get the value of `Qpn`.
+    #[must_use]
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    /// Convert the value of `qpn` to network endian (three big-endian bytes, zero-padded).
+    pub(crate) fn into_ne(self) -> u32 {
+        let key = self.0.to_ne_bytes();
+        u32::from_le_bytes([key[2], key[1], key[0], 0])
+    }
+}
+
+/// In the RDMA spec, some fields are defined as 24 bits wide, for example `PSN` and `QPN`.
+///
+/// This struct represents such a 24-bit value.
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Default)]
+pub struct Psn(u32);
+
+impl Psn {
+    const WIDTH_IN_BITS: usize = 24;
+    const MASK: u32 = u32::MAX >> (32 - Self::WIDTH_IN_BITS);
+    const MAX_PSN_RANGE: u32 = 1 << 23_i32;
+
+    /// Create a new `Psn` with the given value.
+    ///
+    /// # Panics
+    /// If the value is greater than 24 bits, it will panic.
+    #[must_use]
+    pub fn new(psn: u32) -> Self {
+        assert!(psn <= Self::MASK, "PSN should not exceed 24 bits");
+        Self(psn)
+    }
+
+    /// Get the value of `psn`.
+    #[must_use]
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    /// Convert the value of `psn` to network endian (three big-endian bytes, zero-padded).
+    pub(crate) fn into_ne(self) -> u32 {
+        let key = self.0.to_ne_bytes();
+        u32::from_le_bytes([key[2], key[1], key[0], 0])
+    }
+
+    /// Wrapping-add `rhs` to the current value, modulo 2^24.
+    pub(crate) fn wrapping_add(self, rhs: Self) -> Self {
+        // since (a + b) mod p = (a + (b mod p)) mod p, we don't have to reduce rhs by p first
+        Self(self.0.wrapping_add(rhs.0) & Self::MASK)
+    }
+
+    /// Get the difference between two PSNs, modulo 2^24.
+    pub(crate) fn wrapping_sub(self, rhs: Self) -> u32 {
+        self.0.wrapping_sub(rhs.0) & Self::MASK
+    }
+}
+
+impl From<u32> for Psn {
+    fn from(key: u32) -> Self {
+        Self::new(key)
+    }
+}
+
+bitflags! {
+    /// Memory access bit flags
+    #[derive(Debug, Clone, Copy)]
+    pub struct MemAccessTypeFlag: u8 {
+        /// No access flag
+        const IbvAccessNoFlags = 0; // Not defined in rdma-core
+
+        /// Local write
+        const IbvAccessLocalWrite = 1; // (1 << 0)
+
+        /// Remote write
+        const IbvAccessRemoteWrite = 2; // (1 << 1)
+
+        /// Remote read
+        const IbvAccessRemoteRead = 4; // (1 << 2)
+
+        /// Remote atomic
+        const IbvAccessRemoteAtomic = 8; // (1 << 3)
+
+        /// Mw bind
+        const IbvAccessMwBind = 16; // (1 << 4)
+
+        /// Zero based
+        const IbvAccessZeroBased = 32; // (1 << 5)
+
+        /// On demand
+        const IbvAccessOnDemand = 64; // (1 << 6)
+
+        /// Hugetlb
+        const IbvAccessHugetlb = 128; // (1 << 7)
+
+        // IbvAccessRelaxedOrdering = IBV_ACCESS_OPTIONAL_FIRST,
+    }
+}
+
+bitflags! {
+    /// Work Request Send Flag
+    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+    pub struct WorkReqSendFlag: u8 {
+        /// No flags
+        const IbvSendNoFlags = 0; // Not defined in rdma-core
+        /// Send fence
+        const IbvSendFence = 1;
+        /// Send signaled
+        const IbvSendSignaled = 2;
+        /// Send solicited
+        const IbvSendSolicited = 4;
+        /// Send inline
+        const IbvSendInline = 8;
+        /// Send IP checksum
+        const IbvSendChecksum = 16;
+    }
+}
+
+/// Queue Pair Type for software/hardware
+#[non_exhaustive]
+#[derive(TryFromPrimitive, Debug, Clone, Copy)]
+#[repr(u8)]
+pub enum QpType {
+    /// Reliable Connection
+    Rc = 2,
+
+    /// Unreliable Connection
+    Uc = 3,
+
+    /// Unreliable Datagram
+    Ud = 4,
+
+    /// Raw Packet
+    RawPacket = 8,
+
+    /// XRC Send
+    XrcSend = 9,
+
+    /// XRC Receive
+    XrcRecv = 10,
+}
+
+/// Packet MTU
+#[non_exhaustive]
+#[derive(Default, Debug, Clone, Copy)]
+#[repr(u8)]
+pub enum Pmtu {
+    /// 256 bytes
+    #[default]
+    Mtu256 = 1,
+
+    /// 512 bytes
+    Mtu512 = 2,
+
+    /// 1024 bytes
+    Mtu1024 = 3,
+
+    /// 2048 bytes
+    Mtu2048 = 4,
+
+    /// 4096 bytes
+    Mtu4096 = 5,
+}
+
+impl From<Pmtu> for u32 {
+    fn from(pmtu: Pmtu) -> u32 {
+        const BASE_VALUE: u32 = 128;
+        BASE_VALUE << (pmtu as usize)
+    }
+}
+
+/// Scatter Gather Element
+#[non_exhaustive]
+#[derive(Debug, Clone, Copy)]
+pub struct Sge {
+    /// Physical address
+    pub phy_addr: u64,
+
+    /// Length
+    pub len: u32,
+
+    /// LKey
+    pub key: Key,
+}
+
+impl Sge {
+    /// Create a new `Sge`
+    #[must_use]
+    pub fn new(phy_addr: u64, len: u32, key: Key) -> Self {
+        Self { phy_addr, len, key }
+    }
+}
+
+/// RDMA network param
+#[derive(Debug, Builder, Clone, Copy)]
+#[non_exhaustive]
+pub struct RdmaDeviceNetworkParam {
+    /// Network gateway
+    gateway: Ipv4Addr,
+
+    /// Network netmask
+    netmask: Ipv4Addr,
+
+    /// IP address
+    ipaddr: Ipv4Addr,
+
+    /// MAC address
+    macaddr: MacAddress,
+}
+
+/// Queue Pair immutable context
+#[non_exhaustive]
+#[derive(Builder, Debug, Clone, Copy)]
+pub struct Qp {
+    /// Protection Domain
+    pd: Pd,
+
+    /// Queue Pair Number
+    qpn: Qpn,
+
+    /// Peer Queue Pair Number
+    peer_qpn: Qpn,
+
+    /// Queue Pair Type
+    qp_type: QpType,
+
+    /// Receive Queue Access Flags
+    rq_acc_flags: MemAccessTypeFlag,
+
+    /// Packet MTU
+    pmtu: Pmtu,
+
+    /// Destination IP
+    dqp_ip: Ipv4Addr,
+
+    /// Destination MAC
+    dqp_mac: MacAddress,
+}
+
+/// Error type for the user
+#[non_exhaustive]
+#[derive(Debug, Error)]
+pub enum Error {
+    /// Invalid user input
+    #[error("Invalid: {0}")]
+    Invalid(Box<dyn std::error::Error>),
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::types::Psn;
+    use std::slice::from_raw_parts;
+
+    #[test]
+    fn test_wrapping_add() {
+        let psn = Psn::new(0xffffff);
+        let ret = psn.wrapping_add(1.into());
+        assert_eq!(0, ret.get());
+
+        let ret = psn.wrapping_add(2.into());
+        assert_eq!(ret.get(), 1);
+
+        let ret = psn.wrapping_add(0xffffff.into());
+        assert_eq!(ret.get(), 0xffffff - 1);
+    }
+
+    #[test]
+    fn test_to_ne() {
+        let psn = Psn::new(0x123456);
+        let mem = psn.into_ne();
+        let buf = unsafe { from_raw_parts(&mem as *const _ as *const u8, 4) };
+        assert_eq!(buf, &[0x12, 0x34, 0x56, 0]);
+
+        let key = crate::types::Key::new_unchecked(0x12345678);
+        let mem = key.into_ne();
+        let buf = unsafe { from_raw_parts(&mem as *const _ as *const u8, 4) };
+        assert_eq!(buf, &[0x12, 0x34, 0x56, 0x78]);
+    }
+
+    #[test]
+    fn test_wrapping_sub() {
+        let psn = Psn::new(0);
+        let psn2 = psn.wrapping_sub(1.into());
+        assert_eq!(psn2, 0xffffff);
+
+        let psn = Psn::new(0x800001);
+        let diff = psn.wrapping_sub(0.into());
+        assert_eq!(diff, 0x800001);
+
+        let psn = Psn::new(0);
+        let diff = psn.wrapping_sub(0x800001.into());
+        assert_eq!(diff, 0x7fffff);
+    }
+
+    #[test]
+    fn test_pmtu() {
+        let pmtu = crate::types::Pmtu::Mtu256;
+        assert_eq!(u32::from(pmtu), 256);
+        let pmtu = crate::types::Pmtu::Mtu512;
+        assert_eq!(u32::from(pmtu), 512);
+        let pmtu = crate::types::Pmtu::Mtu1024;
+        assert_eq!(u32::from(pmtu), 1024);
+        let pmtu = crate::types::Pmtu::Mtu2048;
+        assert_eq!(u32::from(pmtu), 2048);
+        let pmtu = crate::types::Pmtu::Mtu4096;
+        assert_eq!(u32::from(pmtu), 4096);
+    }
+}
diff --git a/rust-driver/src/utils.rs b/rust-driver/src/utils.rs
new file mode 100644
index 0000000..60d2ebc
--- /dev/null
+++ b/rust-driver/src/utils.rs
@@ -0,0 +1,71 @@
+use crate::types::Pmtu;
+
+/// Get the length of the first packet.
+///
+/// A buffer is split into multiple packets when it crosses a pmtu boundary.
+/// For example, if pmtu = 256 and va = 254, the first packet can carry at most 2 bytes.
+/// If pmtu = 256 and va = 256, the first packet can carry at most 256 bytes.
+#[inline]
+pub(crate) fn get_first_packet_max_length(va: u64, pmtu: u32) -> u32 {
+    // The offset is smaller than pmtu, which is smaller than 4096 currently.
+    #[allow(clippy::cast_possible_truncation, clippy::arithmetic_side_effects)]
+    let offset = (va.wrapping_rem(u64::from(pmtu))) as u32;
+
+    pmtu.wrapping_sub(offset)
+}
+
+/// Calculate how many packets a payload needs.
+pub(crate) fn calculate_packet_cnt(pmtu: Pmtu, raddr: u64, total_len: u32) -> u32 {
+    let first_pkt_max_len = get_first_packet_max_length(raddr, u32::from(pmtu));
+    let first_pkt_len = total_len.min(first_pkt_max_len);
+
+    1u32.wrapping_add((total_len.wrapping_sub(first_pkt_len)).div_ceil(u32::from(pmtu)))
+}
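A worked example of the packet-count arithmetic: with pmtu = 1024 and raddr = 1000, the first packet can carry only 1024 - (1000 mod 1024) = 24 bytes, so a 4096-byte payload takes 1 + ceil((4096 - 24) / 1024) = 5 packets:

    assert_eq!(get_first_packet_max_length(1000, 1024), 24);
    assert_eq!(calculate_packet_cnt(Pmtu::Mtu1024, 1000, 4096), 5);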
+/// Convert a u8 slice (at most the first 8 bytes) to a u64.
+pub(crate) fn u8_slice_to_u64(slice: &[u8]) -> u64 {
+    // this folds at most 8 bytes into a u64, so the left shift cannot overflow
+    slice
+        .iter()
+        .take(8)
+        .fold(0, |a, b| (a << 8_i32).wrapping_add(u64::from(*b)))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::types::Pmtu;
+
+    use super::{calculate_packet_cnt, get_first_packet_max_length, u8_slice_to_u64};
+
+    #[test]
+    fn test_calculate_packet_cnt() {
+        let raddr = 0;
+        let total_len = 4096;
+        let packet_cnt = calculate_packet_cnt(Pmtu::Mtu1024, raddr, total_len);
+        assert_eq!(packet_cnt, 4);
+
+        for raddr in 1..1023 {
+            let packet_cnt = calculate_packet_cnt(Pmtu::Mtu1024, raddr, total_len);
+            assert_eq!(packet_cnt, 5);
+        }
+    }
+
+    #[test]
+    fn test_get_first_packet_max_length() {
+        for i in 0..4096 {
+            assert_eq!(get_first_packet_max_length(i, 4096), (4096 - i) as u32);
+        }
+    }
+
+    #[test]
+    fn test_u8_slice_to_u64() {
+        let input = [0x2, 0x3, 0x4, 0x5];
+        assert_eq!(u8_slice_to_u64(&input), 0x02030405);
+        let input = [0x2, 0x3, 0x4, 0x5, 0x06, 0x07];
+        assert_eq!(u8_slice_to_u64(&input), 0x020304050607);
+        let input = [0x2, 0x3, 0x4, 0x5, 0x06, 0x07, 0x08, 0x09];
+        assert_eq!(u8_slice_to_u64(&input), 0x0203040506070809);
+        let input = [0x2, 0x3, 0x4, 0x5, 0x06, 0x07, 0x08, 0x09, 0x10];
+        assert_eq!(u8_slice_to_u64(&input), 0x0203040506070809);
+    }
+}