Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable CBRS Take 2 - disable at ingest layer #954

Merged
merged 5 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::Settings;
use anyhow::{bail, Error, Result};
use chrono::Utc;
use chrono::{DateTime, Utc};
use file_store::{
file_sink::FileSinkClient,
file_upload,
Expand All @@ -12,8 +12,8 @@ use helium_crypto::{Network, PublicKey, PublicKeyBinary};
use helium_proto::services::poc_mobile::{
self, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1,
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1,
HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1,
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct GrpcServer<AV> {
address: SocketAddr,
api_token: MetadataValue<Ascii>,
authorization_verifier: AV,
cbrs_disable_time: DateTime<Utc>,
}

impl<AV> ManagedTask for GrpcServer<AV>
Expand Down Expand Up @@ -109,6 +110,7 @@ where
address: SocketAddr,
api_token: MetadataValue<Ascii>,
authorization_verifier: AV,
cbrs_disable_time: DateTime<Utc>,
) -> Self {
GrpcServer {
heartbeat_report_sink,
Expand All @@ -128,6 +130,7 @@ where
address,
api_token,
authorization_verifier,
cbrs_disable_time,
}
}

Expand Down Expand Up @@ -218,7 +221,14 @@ where
&self,
request: Request<CellHeartbeatReqV1>,
) -> GrpcResult<CellHeartbeatRespV1> {
let timestamp: u64 = Utc::now().timestamp_millis() as u64;
let timestamp = Utc::now();

if timestamp >= self.cbrs_disable_time {
return Ok(Response::new(CellHeartbeatRespV1 {
id: timestamp.to_string(),
}));
}

let event = request.into_inner();

custom_tracing::record_b58("pub_key", &event.pub_key);
Expand All @@ -228,7 +238,7 @@ where
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| CellHeartbeatIngestReportV1 {
received_timestamp: timestamp,
received_timestamp: timestamp.timestamp_millis() as u64,
report: Some(event),
})?;

Expand Down Expand Up @@ -266,17 +276,23 @@ where
&self,
request: Request<DataTransferSessionReqV1>,
) -> GrpcResult<DataTransferSessionRespV1> {
let timestamp = Utc::now().timestamp_millis() as u64;
let timestamp = Utc::now();
let event = request.into_inner();

if is_data_transfer_for_cbrs(&event) && timestamp > self.cbrs_disable_time {
return Ok(Response::new(DataTransferSessionRespV1 {
id: timestamp.to_string(),
}));
}

custom_tracing::record_b58("pub_key", &event.pub_key);

let report = self
.verify_public_key(event.pub_key.as_ref())
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| DataTransferSessionIngestReportV1 {
received_timestamp: timestamp,
received_timestamp: timestamp.timestamp_millis() as u64,
report: Some(event),
})?;

Expand Down Expand Up @@ -556,6 +572,13 @@ where
}
}

fn is_data_transfer_for_cbrs(event: &DataTransferSessionReqV1) -> bool {
event
.data_transfer_usage
.as_ref()
.is_some_and(|u| u.radio_access_technology() == DataTransferRadioAccessTechnology::Eutran)
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
// Initialize uploader
let (file_upload, file_upload_server) =
Expand Down Expand Up @@ -723,6 +746,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
settings.listen_addr,
api_token,
AuthorizationClient::from_settings(config_client)?,
settings.cbrs_disable_time,
);

tracing::info!(
Expand Down
9 changes: 9 additions & 0 deletions ingest/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use config::{Config, Environment, File};
use helium_crypto::Network;
use humantime_serde::re::humantime;
Expand Down Expand Up @@ -44,6 +45,14 @@ pub struct Settings {
// mobile config client settings
// optional to avoid having to define a client for IOT mode
pub config_client: Option<mobile_config::ClientSettings>,
#[serde(default = "default_cbrs_disable_time")]
pub cbrs_disable_time: DateTime<Utc>,
}

fn default_cbrs_disable_time() -> DateTime<Utc> {
"2025-03-01 00:00:00Z"
.parse::<DateTime<Utc>>()
.expect("invalid default date")
}

fn default_roll_time() -> Duration {
Expand Down
130 changes: 126 additions & 4 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use helium_crypto::{KeyTag, Keypair, Network, PublicKeyBinary, Sign};
use helium_proto::services::poc_mobile::{
HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1, DataTransferEvent,
DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, RadioUsageStatsResV1,
UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, UniqueConnectionsRespV1,
};
Expand All @@ -20,6 +22,7 @@ use mobile_config::client::authorization_client::AuthorizationVerifier;
use prost::Message;
use rand::rngs::OsRng;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::{net::TcpListener, sync::mpsc::Receiver, time::timeout};
use tonic::{
async_trait,
Expand Down Expand Up @@ -49,7 +52,9 @@ impl AuthorizationVerifier for MockAuthorizationClient {
Ok(true)
}
}
pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
pub async fn setup_mobile(
cbrs_disable_time: DateTime<Utc>,
) -> anyhow::Result<(TestClient, Trigger)> {
let key_pair = generate_keypair();

let socket_addr = {
Expand All @@ -65,10 +70,10 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {

let (trigger, listener) = triggered::trigger();

let (cbrs_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (cbrs_heartbeat_tx, cbrs_hearbeat_rx) = tokio::sync::mpsc::channel(10);
let (wifi_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (speedtest_tx, _rx) = tokio::sync::mpsc::channel(10);
let (data_transfer_tx, _rx) = tokio::sync::mpsc::channel(10);
let (data_transfer_tx, data_transfer_rx) = tokio::sync::mpsc::channel(10);
let (subscriber_location_tx, _rx) = tokio::sync::mpsc::channel(10);
let (radio_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
let (invalidated_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
Expand Down Expand Up @@ -100,6 +105,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
socket_addr,
api_token,
auth_client,
cbrs_disable_time,
);

grpc_server.run(listener).await
Expand All @@ -113,6 +119,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
hex_usage_stat_rx,
radio_usage_stat_rx,
unique_connections_rx,
cbrs_hearbeat_rx,
data_transfer_rx,
)
.await;

Expand All @@ -131,9 +139,12 @@ pub struct TestClient {
Receiver<file_store::file_sink::Message<RadioUsageStatsIngestReportV1>>,
unique_connections_file_sink_rx:
Receiver<file_store::file_sink::Message<UniqueConnectionsIngestReportV1>>,
cell_heartbeat_rx: Receiver<file_store::file_sink::Message<CellHeartbeatIngestReportV1>>,
data_transfer_rx: Receiver<file_store::file_sink::Message<DataTransferSessionIngestReportV1>>,
}

impl TestClient {
#[allow(clippy::too_many_arguments)]
pub async fn new(
socket_addr: SocketAddr,
key_pair: Keypair,
Expand All @@ -150,6 +161,10 @@ impl TestClient {
unique_connections_file_sink_rx: Receiver<
file_store::file_sink::Message<UniqueConnectionsIngestReportV1>,
>,
cell_heartbeat_rx: Receiver<file_store::file_sink::Message<CellHeartbeatIngestReportV1>>,
data_transfer_rx: Receiver<
file_store::file_sink::Message<DataTransferSessionIngestReportV1>,
>,
) -> TestClient {
let client = (|| PocMobileClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
Expand All @@ -164,6 +179,48 @@ impl TestClient {
hex_usage_stats_file_sink_rx,
radio_usage_stats_file_sink_rx,
unique_connections_file_sink_rx,
cell_heartbeat_rx,
data_transfer_rx,
}
}

pub async fn cell_heartbeat_recv(mut self) -> anyhow::Result<CellHeartbeatIngestReportV1> {
match timeout(Duration::from_secs(2), self.cell_heartbeat_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Data(_, data) => Ok(data),
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"),
},
Ok(None) => bail!("got none"),
Err(reason) => bail!("got error {reason}"),
}
}

pub fn is_cell_heartbeat_rx_empty(&mut self) -> anyhow::Result<bool> {
match self.cell_heartbeat_rx.try_recv() {
Ok(_) => Ok(false),
Err(TryRecvError::Empty) => Ok(true),
Err(err) => bail!(err),
}
}

pub async fn data_transfer_recv(mut self) -> anyhow::Result<DataTransferSessionIngestReportV1> {
match timeout(Duration::from_secs(2), self.data_transfer_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Data(_, data) => Ok(data),
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"),
},
Ok(None) => bail!("got none"),
Err(reason) => bail!("got error {reason}"),
}
}

pub fn is_data_transfer_rx_empty(&mut self) -> anyhow::Result<bool> {
match self.data_transfer_rx.try_recv() {
Ok(_) => Ok(false),
Err(TryRecvError::Empty) => Ok(true),
Err(err) => bail!(err),
}
}

Expand Down Expand Up @@ -368,6 +425,71 @@ impl TestClient {

Ok(res.into_inner())
}

pub async fn submit_cell_heartbeat(
&mut self,
keypair: &Keypair,
cbsd_id: &str,
) -> anyhow::Result<CellHeartbeatRespV1> {
let mut heartbeat = CellHeartbeatReqV1 {
pub_key: keypair.public_key().into(),
hotspot_type: "unknown".to_string(),
cell_id: 1,
timestamp: Utc::now().timestamp() as u64,
lat: 0.0,
lon: 0.0,
operation_mode: true,
cbsd_category: "unknown".to_string(),
cbsd_id: cbsd_id.to_owned(),
signature: vec![],
coverage_object: vec![1, 2, 3, 4],
};

heartbeat.signature = keypair.sign(&heartbeat.encode_to_vec()).expect("sign");

let mut request = Request::new(heartbeat);
let metadata = request.metadata_mut();

metadata.insert("authorization", self.authorization.clone());

let res = self.client.submit_cell_heartbeat(request).await?;

Ok(res.into_inner())
}

pub async fn submit_data_transfer(
&mut self,
keypair: &Keypair,
technology: DataTransferRadioAccessTechnology,
) -> anyhow::Result<DataTransferSessionRespV1> {
let mut data_transfer = DataTransferSessionReqV1 {
data_transfer_usage: Some(DataTransferEvent {
pub_key: keypair.public_key().into(),
upload_bytes: 0,
download_bytes: 0,
radio_access_technology: technology as i32,
event_id: "event-1".to_string(),
payer: vec![1, 2, 3, 4],
timestamp: Utc::now().timestamp() as u64,
signature: vec![],
}),
reward_cancelled: false,
pub_key: keypair.public_key().into(),
signature: vec![],
rewardable_bytes: 0,
};

data_transfer.signature = keypair.sign(&data_transfer.encode_to_vec())?;

let mut request = Request::new(data_transfer);
let metadata = request.metadata_mut();

metadata.insert("authorization", self.authorization.clone());

let res = self.client.submit_data_transfer_session(request).await?;

Ok(res.into_inner())
}
}

pub fn generate_keypair() -> Keypair {
Expand Down
Loading