Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the pipeline: batch allocation + deterministic backoff #1769

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ console-subscriber = "0.4.0"
const_format = "0.2.33"
crc = "3.2.1"
criterion = "0.5"
crossbeam-utils = "0.8.20"
crossbeam-queue = "0.3.12"
derive_more = { version = "1.0.0", features = ["as_ref"] }
derive-new = "0.7.0"
Expand Down
123 changes: 79 additions & 44 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@

/// The node's mode (router, peer or client)
mode: "peer",

/// The node's metadata (name, location, DNS name, etc.) Arbitrary JSON data not interpreted by zenoh and available in admin space @/<zid>/router, @/<zid>/peer or @/<zid>/client
metadata: {
name: "strawberry",
location: "Penny Lane",
},

/// Which endpoints to connect to. E.g. tcp/localhost:7447.
/// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup.
///
Expand All @@ -32,8 +30,11 @@
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
/// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }).
timeout_ms: { router: -1, peer: -1, client: 0 },

timeout_ms: {
router: -1,
peer: -1,
client: 0
},
/// The list of endpoints to connect to.
/// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }).
Expand All @@ -42,15 +43,18 @@
endpoints: [
// "<proto>/<address>"
],

/// Global connect configuration,
/// Accepts a single value or different values for router, peer and client.
/// The configuration can also be specified for the separate endpoint
/// it will override the global one
/// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000

/// Exit from the application if the timeout is exceeded
exit_on_failure: { router: false, peer: false, client: true },
exit_on_failure: {
router: false,
peer: false,
client: true
},
/// connect establishing retry configuration
retry: {
/// initial wait timeout until next connect try
Expand All @@ -61,7 +65,6 @@
period_increase_factor: 2,
},
},

/// Which endpoints to listen on. E.g. tcp/0.0.0.0:7447.
/// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers,
/// peers, or client can use to establish a zenoh session.
Expand All @@ -79,14 +82,19 @@
/// Accepts a single value (e.g. timeout_ms: 0)
/// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }).
timeout_ms: 0,

/// The list of endpoints to listen on.
/// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }).
///
/// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html
endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] },

endpoints: {
router: [
"tcp/[::]:7447"
],
peer: [
"tcp/[::]:0"
]
},
/// Global listen configuration,
/// Accepts a single value or different values for router, peer and client.
/// The configuration can also be specified for the separate endpoint
Expand Down Expand Up @@ -130,14 +138,24 @@
/// The socket which should be used for multicast scouting
address: "224.0.0.224:7446",
/// The network interface which should be used for multicast scouting
interface: "auto", // If not set or set to "auto" the interface is picked automatically
interface: "auto",
// If not set or set to "auto" the interface is picked automatically
/// The time-to-live on multicast scouting packets
ttl: 1,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer", "router" and/or "client".
autoconnect: { router: [], peer: ["router", "peer"], client: ["router"] },
autoconnect: {
router: [],
peer: [
"router",
"peer"
],
client: [
"router"
]
},
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
/// Possible options are:
/// - "always": always attempt to autoconnect, may result in redundant connections.
Expand All @@ -150,7 +168,12 @@
/// (e.g. autoconnect_strategy : { to_router: "always", to_peer: "greater-zid" }),
/// or different values for router or peer mode
/// (e.g. autoconnect_strategy : { peer: { to_router: "always", to_peer: "greater-zid" } }).
autoconnect_strategy: { peer: { to_router: "always", to_peer: "always" } },
autoconnect_strategy: {
peer: {
to_router: "always",
to_peer: "always"
}
},
/// Whether or not to listen for scout messages on UDP multicast and reply to them.
listen: true,
},
Expand All @@ -168,12 +191,27 @@
/// Accepts a single value (e.g. target: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router or peer mode (e.g. target: { router: ["router", "peer"], peer: ["router"] }).
/// Each value is a list of "peer" and/or "router".
target: { router: ["router", "peer"], peer: ["router", "peer"]},
target: {
router: [
"router",
"peer"
],
peer: [
"router",
"peer"
]
},
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router or peer mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer" and/or "router".
autoconnect: { router: [], peer: ["router", "peer"] },
autoconnect: {
router: [],
peer: [
"router",
"peer"
]
},
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
/// Possible options are:
/// - "always": always attempt to autoconnect, may result in redundant connection which will then be closed.
Expand All @@ -186,24 +224,30 @@
/// (e.g. autoconnect_strategy : { "to-router": "always", "to-peer": "greater-zid" }),
/// or different values for router or peer mode
/// (e.g. autoconnect_strategy : { peer: { "to-router": "always", "to-peer": "greater-zid" } }).
autoconnect_strategy: { peer: { "to-router": "always", "to-peer": "always" } },
autoconnect_strategy: {
peer: {
"to-router": "always",
"to-peer": "always"
}
},
},
},

/// Configuration of data messages timestamps management.
timestamping: {
/// Whether data messages should be timestamped if not already.
/// Accepts a single boolean value or different values for router, peer and client.
enabled: { router: true, peer: false, client: false },
enabled: {
router: true,
peer: false,
client: false
},
/// Whether data messages with timestamps in the future should be dropped or not.
/// If set to false (default), messages with timestamps in the future are retimestamped.
/// Timestamps are ignored if timestamping is disabled.
drop_future_timestamp: false,
},

/// The default timeout to apply to queries in milliseconds.
queries_default_timeout: 10000,

/// The routing strategy to use and its configuration.
routing: {
/// The routing strategy to use in routers and its configuration.
Expand All @@ -229,7 +273,6 @@
timeout: 10000,
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
Expand Down Expand Up @@ -471,14 +514,14 @@
/// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU.
/// If qos is false, then only the DATA priority will be allocated.
size: {
control: 2,
real_time: 2,
interactive_high: 2,
interactive_low: 2,
control: 1,
real_time: 1,
interactive_high: 1,
interactive_low: 1,
data_high: 2,
data: 2,
data: 4,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted the previous commit, so the numbers are back to the original ones. Throughput of big messages was indeed heavily impacted by a smaller batch number: 130kmsg/s -> 50kmsg/s for 70kB messages.
@Mallets Do you think we should modify the numbers? For example, increasing the high priority to 2? In fact, this PR does lazy allocation and only keeps one buffer in memory, so we could put any numbers; it would not change memory consumption at all, it's just a matter of throughput/congestion.

data_low: 2,
background: 2,
background: 1,
},
/// Congestion occurs when the queue is empty (no available batch).
congestion_control: {
Expand Down Expand Up @@ -506,12 +549,6 @@
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
},
allocation: {
/// Mode for memory allocation of batches in the priority queues.
/// - "init": batches are allocated at queue initialization time.
/// - "lazy": batches are allocated when needed up to the maximum number of batches configured in the size configuration parameter.
mode: "lazy",
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down Expand Up @@ -560,15 +597,15 @@
/// Configure TCP write buffer size (bytes)
// so_sndbuf: 123456,
},
// // Configure optional TCP link specific parameters
// tcp: {
// /// Optional configuration for TCP system buffers sizes for TCP links
// ///
// /// Configure TCP read buffer size (bytes)
// // so_rcvbuf: 123456,
// /// Configure TCP write buffer size (bytes)
// // so_sndbuf: 123456,
// }
// // Configure optional TCP link specific parameters
// tcp: {
// /// Optional configuration for TCP system buffers sizes for TCP links
// ///
// /// Configure TCP read buffer size (bytes)
// // so_rcvbuf: 123456,
// /// Configure TCP write buffer size (bytes)
// // so_sndbuf: 123456,
// }
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down Expand Up @@ -608,7 +645,6 @@
},
},
},

/// Configure the Admin Space
/// Unstable: this configuration part works as advertised, but may change in a future release
adminspace: {
Expand All @@ -620,7 +656,6 @@
write: false,
},
},

///
/// Plugins configurations
///
Expand Down
12 changes: 6 additions & 6 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,14 @@ impl QueueSizeConf {
impl Default for QueueSizeConf {
fn default() -> Self {
Self {
control: 2,
real_time: 2,
interactive_low: 2,
interactive_high: 2,
control: 1,
real_time: 1,
interactive_low: 1,
interactive_high: 1,
data_high: 2,
data: 2,
data: 4,
data_low: 2,
background: 2,
background: 1,
}
}
}
Expand Down
15 changes: 0 additions & 15 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,6 @@ validated_struct::validator! {
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: u64,
},
/// Perform lazy memory allocation of batches in the priority queues. If set to false all batches are initialized at
/// initialization time. If set to true the batches will be allocated when needed up to the maximum number of batches
/// configured in the size configuration parameter.
pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
QueueAllocConf {
pub mode: QueueAllocMode,
},
},
// Number of threads used for TX
threads: usize,
Expand Down Expand Up @@ -659,14 +652,6 @@ validated_struct::validator! {
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
Init,
#[default]
Lazy,
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShmInitMode {
Expand Down
26 changes: 12 additions & 14 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ transport_ws = ["zenoh-link/transport_ws"]
transport_serial = ["zenoh-link/transport_serial"]
transport_compression = []
transport_unixpipe = ["zenoh-link/transport_unixpipe"]
transport_vsock= ["zenoh-link/transport_vsock"]
transport_vsock = ["zenoh-link/transport_vsock"]
stats = ["zenoh-protocol/stats"]
test = []
unstable = []
default = ["test", "transport_multilink"]

[dependencies]
async-trait = { workspace = true }
crossbeam-utils = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"fs",
"time",
"macros",
"rt-multi-thread",
"io-util",
"net",
"sync",
"fs",
"time",
"macros",
"rt-multi-thread",
"io-util",
"net",
] }
lazy_static = { workspace = true }
tokio-util = { workspace = true, features = ["rt"]}
tokio-util = { workspace = true, features = ["rt"] }
flume = { workspace = true }
tracing = {workspace = true}
futures = { workspace = true }
tracing = { workspace = true }
lz4_flex = { workspace = true }
paste = { workspace = true }
rand = { workspace = true, features = ["default"] }
Expand All @@ -90,8 +90,6 @@ zenoh-task = { workspace = true }


[dev-dependencies]
futures-util = { workspace = true }
zenoh-util = {workspace = true }
zenoh-util = { workspace = true }
zenoh-protocol = { workspace = true, features = ["test"] }
futures = { workspace = true }
zenoh-link-commons = { workspace = true }
Loading
Loading