Skip to content

Commit

Permalink
fix: reset telemetry bidirectional stream on heartbeat timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Jan 5, 2025
1 parent 4362aef commit 232725a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 53 deletions.
4 changes: 2 additions & 2 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ impl SessionManager {
) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
return if session_map.contains_key(&endpoint_url) {
if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
let mut session = Session::new(
Expand All @@ -745,7 +745,7 @@ impl SessionManager {
let shadow_session = session.shadow_session();
session_map.insert(endpoint_url.clone(), session);
Ok(shadow_session)
};
}
}

pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
Expand Down
3 changes: 3 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub enum ErrorKind {
#[error("Failed to receive message via channel")]
ChannelReceive,

#[error("Failed to receive RPC response before timeout elapsed")]
RpcTimeout,

#[error("Unknown error")]
Unknown,
}
Expand Down
71 changes: 34 additions & 37 deletions rust/src/model/message_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,40 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;
use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};

/**
* The codec for the message-id.
*
* <p>Codec here provides the following two functions:
* 1. Provide decoding function of message-id of all versions above v0.
* 2. Provide a generator of message-id of v1 version.
*
* <p>The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version
* number. For V1, these two bytes are 0x0001.
*
* <h3>V1 message id example</h3>
*
* <pre>
* ┌──┬────────────┬────┬────────┬────────┐
* │01│56F7E71C361B│21BC│024CCDBE│00000000│
* └──┴────────────┴────┴────────┴────────┘
* </pre>
*
* <h3>V1 version message id generation rules</h3>
*
* <pre>
* process id(lower 2bytes)
* ▲
* mac address(lower 6bytes) │ sequence number(big endian)
* ▲ │ ▲ (4bytes)
* │ │ │
* ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
* 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
* └───────────┘ └─┘ └─┬─┘ └───┘
* │
* ▼
* seconds since 2021-01-01 00:00:00(UTC+0)
* (lower 4bytes)
* </pre>
*/

// inspired by https://github.com/messense/rocketmq-rs
/// The codec for the message-id.
///
/// 1. Provide decoding function of message-id of all versions above v0.
/// 2. Provide a generator of message-id of v1 version.
///
/// The message-id of version V1 and above consists of 17 bytes in total (34 hexadecimal characters). The first byte
/// represents the version number. For V1, this byte is 0x01.
///
/// <h3>V1 message id example</h3>
///
/// <pre>
/// ┌──┬────────────┬────┬────────┬────────┐
/// │01│56F7E71C361B│21BC│024CCDBE│00000000│
/// └──┴────────────┴────┴────────┴────────┘
/// </pre>
///
/// <h3>V1 version message id generation rules</h3>
///
/// <pre>
/// process id(lower 2bytes)
/// ▲
/// mac address(lower 6bytes) │ sequence number(big endian)
/// ▲ │ ▲ (4bytes)
/// │ │ │
/// ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
/// 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
/// └───────────┘ └─┘ └─┬─┘ └───┘
/// │
/// ▼
/// seconds since 2021-01-01 00:00:00(UTC+0)
/// (lower 4bytes)
/// </pre>
///
/// Inspired by <https://github.com/messense/rocketmq-rs>.
pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
let mut wtr = Vec::new();
wtr.write_u8(1).unwrap();
Expand Down
90 changes: 76 additions & 14 deletions rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use async_trait::async_trait;
use mockall::{automock, mock};
use ring::hmac;
use slog::{debug, error, info, o, Logger};
use slog::{debug, error, info, o, warn, Logger};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -109,6 +111,9 @@ pub(crate) struct Session {
option: ClientOption,
endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
settings: Option<TelemetryCommand>,
telemetry_command_tx: Option<mpsc::Sender<Command>>,
telemetry_stream_epoch: Arc<AtomicUsize>,
telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
Expand All @@ -126,6 +131,9 @@ impl Session {
option: self.option.clone(),
endpoints: self.endpoints.clone(),
stub: self.stub.clone(),
settings: None,
telemetry_command_tx: None,
telemetry_stream_epoch: Arc::clone(&self.telemetry_stream_epoch),
telemetry_tx: self.telemetry_tx.clone(),
shutdown_tx: None,
}
Expand Down Expand Up @@ -178,6 +186,9 @@ impl Session {
endpoints: endpoints.clone(),
client_id,
stub,
settings: None,
telemetry_command_tx: None,
telemetry_stream_epoch: Arc::new(AtomicUsize::new(0)),
telemetry_tx: None,
shutdown_tx: None,
})
Expand Down Expand Up @@ -287,6 +298,14 @@ impl Session {
settings: TelemetryCommand,
telemetry_command_tx: mpsc::Sender<Command>,
) -> Result<(), ClientError> {
if self.settings.is_none() {
self.settings = Some(settings.clone());
}

if self.telemetry_command_tx.is_none() {
self.telemetry_command_tx = Some(telemetry_command_tx.clone());
}

let (tx, rx) = mpsc::channel(16);
tx.send(settings).await.map_err(|e| {
ClientError::new(
Expand All @@ -307,15 +326,30 @@ impl Session {
)
.set_source(e)
})?;

let logger = self.logger.clone();
let epoch = self.telemetry_stream_epoch.load(Ordering::Relaxed);
info!(
logger,
"Started telemetry bidirectional stream, stream-epoch={}", epoch
);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);

let logger = self.logger.clone();
let stream_epoch = Arc::clone(&self.telemetry_stream_epoch);
tokio::spawn(async move {
let mut stream = response.into_inner();
let mut interval = tokio::time::interval(Duration::from_secs(3));
loop {
tokio::select! {
_ = interval.tick() => {
// If the bidirectional stream has been superseded by a newer stream epoch, finish this
// daemon task immediately.
let latest_stream_epoch = stream_epoch.load(Ordering::Relaxed);
if latest_stream_epoch != epoch {
info!(logger, "Telemetry bidirectional stream epoch has changed: {} --> {}",
epoch, latest_stream_epoch);
break;
}
}
message = stream.message() => {
match message {
Ok(Some(item)) => {
Expand Down Expand Up @@ -371,6 +405,18 @@ impl Session {
}
Ok(())
}

/// Replaces the current telemetry bidirectional stream with a fresh one.
///
/// This is a no-op returning `Ok(())` unless both the settings and the
/// telemetry command sender have been recorded on this session. When both are
/// present, the shared stream epoch is incremented and `start` is invoked
/// again with clones of the stored values.
///
/// # Errors
///
/// Returns whatever error `start` reports while re-establishing the stream.
async fn reset_telemetry_stream(&mut self) -> Result<(), ClientError> {
    // Take owned copies first so no borrow of `self` is held across `start`.
    let (settings, command_tx) =
        match (self.settings.as_ref(), self.telemetry_command_tx.as_ref()) {
            (Some(stored_settings), Some(stored_tx)) => {
                (stored_settings.clone(), stored_tx.clone())
            }
            _ => return Ok(()),
        };

    // Bump the epoch before restarting: `start` reads the epoch to tag the
    // new stream, so the new stream must observe the incremented value.
    self.telemetry_stream_epoch.fetch_add(1, Ordering::Relaxed);
    self.start(settings, command_tx).await
}
}

#[async_trait]
Expand All @@ -396,15 +442,31 @@ impl RPCClient for Session {
request: HeartbeatRequest,
) -> Result<HeartbeatResponse, ClientError> {
let request = self.sign(request);
let response = self.stub.heartbeat(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
"send rpc heartbeat failed",
OPERATION_HEARTBEAT,
)
.set_source(e)
})?;
Ok(response.into_inner())
let heartbeat_future = self.stub.heartbeat(request);
let future = tokio::time::timeout(self.option.timeout, heartbeat_future);
match future.await {
Ok(res) => {
let response = res.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
"send rpc heartbeat failed",
OPERATION_HEARTBEAT,
)
.set_source(e)
})?;
Ok(response.into_inner())
}
Err(elapsed) => {
warn!(self.logger, "Heartbeat RPC timed out, reset telemetry bidirectional stream");
self.reset_telemetry_stream().await?;
Err(ClientError::new(
ErrorKind::RpcTimeout,
"Heartbeat RPC timed out",
OPERATION_HEARTBEAT,
)
.set_source(elapsed))
}
}
}

async fn send_message(
Expand Down

0 comments on commit 232725a

Please sign in to comment.