Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get notifications analytics #410

Merged
merged 10 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/analytics/get_notifications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct GetNotificationsParams {
pub topic: Topic,
pub message_id: Arc<str>,
pub get_by_iss: Arc<str>,
pub get_by_domain: Arc<str>,
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub subscriber_account: AccountId,
pub notification_topic: Topic,
pub subscriber_notification_id: Uuid,
pub notification_id: Uuid,
pub notification_type: Uuid,
pub returned_count: usize,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct GetNotifications {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// The relay topic used to manage the subscription that the get notifications request message was published to
pub topic: Arc<str>,
/// Relay message ID of request
pub message_id: Arc<str>,
/// JWT iss that made the request
pub get_by_iss: Arc<str>,
/// CACAO domain that made the request
pub get_by_domain: Arc<str>,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: String,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub subscriber_pk: String,
/// Hash of the CAIP-10 account of the subscriber
pub subscriber_account_hash: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// The ID of the subscriber-specific notification
pub subscriber_notification_id: Uuid,
/// The ID of the notification
pub notification_id: Uuid,
/// The notification type ID
pub notification_type: Uuid,
/// The total number of notifications returned in the request
pub returned_count: usize,
}

impl From<GetNotificationsParams> for GetNotifications {
fn from(params: GetNotificationsParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
topic: params.topic.into_value(),
message_id: params.message_id,
get_by_iss: params.get_by_iss,
get_by_domain: params.get_by_domain,
project_pk: params.project_pk.to_string(),
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk.to_string(),
subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()),
notification_topic: params.notification_topic.into_value(),
subscriber_notification_id: params.subscriber_notification_id,
notification_id: params.notification_id,
notification_type: params.notification_type,
returned_count: params.returned_count,
}
}
}
68 changes: 42 additions & 26 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use {
self::{
subscriber_notification::SubscriberNotificationParams,
subscriber_update::SubscriberUpdateParams,
},
crate::{
analytics::{
subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate,
},
config::Configuration,
get_notifications::{GetNotifications, GetNotificationsParams},
subscriber_notification::{SubscriberNotification, SubscriberNotificationParams},
subscriber_update::{SubscriberUpdate, SubscriberUpdateParams},
},
crate::config::Configuration,
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
tracing::{error, info},
Expand All @@ -27,13 +23,15 @@ use {
},
};

pub mod get_notifications;
pub mod subscriber_notification;
pub mod subscriber_update;

#[derive(Clone)]
pub struct NotifyAnalytics {
pub messages: Analytics<SubscriberNotification>,
pub clients: Analytics<SubscriberUpdate>,
pub subscriber_notifications: Analytics<SubscriberNotification>,
pub subscriber_updates: Analytics<SubscriberUpdate>,
pub get_notifications: Analytics<GetNotifications>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand All @@ -42,8 +40,9 @@ impl NotifyAnalytics {
info!("initializing analytics with noop export");

Self {
messages: Analytics::new(NoopCollector),
clients: Analytics::new(NoopCollector),
subscriber_notifications: Analytics::new(NoopCollector),
subscriber_updates: Analytics::new(NoopCollector),
get_notifications: Analytics::new(NoopCollector),
geoip_resolver: None,
}
}
Expand All @@ -60,7 +59,7 @@ impl NotifyAnalytics {
let bucket_name: Arc<str> = export_bucket.into();
let node_ip: Arc<str> = node_ip.to_string().into();

let messages = {
let subscriber_notifications = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/subscriber_notifications",
export_name: "subscriber_notifications",
Expand All @@ -70,36 +69,53 @@ impl NotifyAnalytics {
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<SubscriberNotification>::new(opts.clone(), exporter)?;
Analytics::new(collector)
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let clients = {
let subscriber_updates = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/subscriber_updates",
export_name: "subscriber_updates",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::<SubscriberUpdate>::new(opts, exporter)?)
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let get_notifications = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/get_notifications",
export_name: "get_notifications",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

Ok(Self {
messages,
clients,
subscriber_notifications,
subscriber_updates,
get_notifications,
geoip_resolver,
})
}

pub fn message(&self, message: SubscriberNotificationParams) {
self.messages.collect(message.into());
pub fn subscriber_notification(&self, event: SubscriberNotificationParams) {
self.subscriber_notifications.collect(event.into());
}

pub fn subscriber_update(&self, event: SubscriberUpdateParams) {
self.subscriber_updates.collect(event.into());
}

pub fn client(&self, client: SubscriberUpdateParams) {
self.clients.collect(client.into());
pub fn get_notifications(&self, event: GetNotificationsParams) {
self.get_notifications.collect(event.into());
}

pub fn lookup_geo_data(&self, addr: IpAddr) -> Option<geoip::Data> {
Expand Down
6 changes: 6 additions & 0 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,11 @@ pub async fn delete_expired_subscription_watchers(

#[derive(Debug, FromRow, Clone, Serialize, Deserialize)]
pub struct Notification {
/// Notification ID (for analytics)
#[serde(skip)]
pub notification_id: Uuid,

/// Subscriber notification ID
pub id: Uuid,
pub sent_at: i64,
pub r#type: Uuid,
Expand Down Expand Up @@ -879,6 +884,7 @@ pub async fn get_notifications_for_subscriber(
let query = &format!(
"
SELECT
notification.id AS notification_id,
subscriber_notification.id AS id,
CAST(EXTRACT(EPOCH FROM subscriber_notification.created_at AT TIME ZONE 'UTC') * 1000 AS int8) AS sent_at,
notification.type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
}
});

state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
analytics::get_notifications::GetNotificationsParams,
auth::{
add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp,
DidWeb, SharedClaims, SubscriptionGetNotificationsRequestAuth,
Expand Down Expand Up @@ -31,7 +32,7 @@ use {
relay_rpc::{
auth::ed25519_dalek::SigningKey,
domain::{DecodedClientId, Topic},
rpc::Publish,
rpc::{msg_id::get_message_id, Publish},
},
std::sync::Arc,
tracing::info,
Expand All @@ -40,6 +41,7 @@ use {
// TODO test idempotency
pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), RelayMessageError> {
let topic = msg.topic;
let relay_message_id: Arc<str> = get_message_id(msg.message.as_ref()).into();

if let Some(redis) = state.redis.as_ref() {
notify_get_notifications_rate_limit(redis, &topic, &state.clock).await?;
Expand Down Expand Up @@ -97,7 +99,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Err(RelayMessageClientError::AppDoesNotMatch)?;
}

let account = {
let (account, siwe_domain) = {
if request_auth.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT {
return Err(AuthError::InvalidAct)
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;
Expand All @@ -107,7 +109,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
let Authorization {
account,
app,
domain: _,
domain,
} = verify_identity(
&request_iss_client_id,
&request_auth.ksu,
Expand All @@ -126,7 +128,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
}
}

account
(account, Arc::<str>::from(domain))
};

request_auth
Expand All @@ -142,6 +144,24 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?

for notification in data.notifications.iter() {
state.analytics.get_notifications(GetNotificationsParams {
topic: topic.clone(),
message_id: relay_message_id.clone(),
get_by_iss: request_auth.shared_claims.iss.clone().into(),
get_by_domain: siwe_domain.clone(),
project_pk: project.id,
project_id: project.project_id.clone(),
subscriber_pk: subscriber.id,
subscriber_account: subscriber.account.clone(),
notification_topic: subscriber.topic.clone(),
subscriber_notification_id: notification.id,
notification_id: notification.notification_id,
notification_type: notification.r#type,
returned_count: data.notifications.len(),
});
}

let identity = DecodedClientId(
decode_key(&project.authentication_public_key)
.map_err(RelayMessageServerError::NotifyServerError)?, // TODO change to client error?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
info!("Timing: Finished subscribing to topic");

info!("Timing: Recording SubscriberUpdateParams");
state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
// )
// .await?;

state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
2 changes: 1 addition & 1 deletion src/services/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ async fn process_notification(
let message_id = publish.msg_id();
publish_relay_message(&relay_client, &publish, metrics).await?;

analytics.message(SubscriberNotificationParams {
analytics.subscriber_notification(SubscriberNotificationParams {
project_pk: notification.project,
project_id: notification.project_project_id,
subscriber_pk: notification.subscriber,
Expand Down
Loading