Skip to content

Commit

Permalink
fix: get notifications analytics (#410)
Browse files Browse the repository at this point in the history
* fix: get notifications analytics

* fix: basic request topic

* fix: add message_id

* fix: get_by_iss

* feat: get_by_domain

* feat: project details

* fix: subscriber details

* fix: add notification_topic

* fix: returned_count

* fix: notification details
  • Loading branch information
chris13524 authored Mar 12, 2024
1 parent 514f8e9 commit 46fb200
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 34 deletions.
77 changes: 77 additions & 0 deletions src/analytics/get_notifications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct GetNotificationsParams {
pub topic: Topic,
pub message_id: Arc<str>,
pub get_by_iss: Arc<str>,
pub get_by_domain: Arc<str>,
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub subscriber_account: AccountId,
pub notification_topic: Topic,
pub subscriber_notification_id: Uuid,
pub notification_id: Uuid,
pub notification_type: Uuid,
pub returned_count: usize,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct GetNotifications {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// The relay topic used to manage the subscription that the get notifications request message was published to
pub topic: Arc<str>,
/// Relay message ID of request
pub message_id: Arc<str>,
/// JWT iss that made the request
pub get_by_iss: Arc<str>,
/// CACAO domain that made the request
pub get_by_domain: Arc<str>,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: String,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub subscriber_pk: String,
/// Hash of the CAIP-10 account of the subscriber
pub subscriber_account_hash: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// The ID of the subscriber-specific notification
pub subscriber_notification_id: Uuid,
/// The ID of the notification
pub notification_id: Uuid,
/// The notification type ID
pub notification_type: Uuid,
/// The total number of notifications returned in the request
pub returned_count: usize,
}

impl From<GetNotificationsParams> for GetNotifications {
fn from(params: GetNotificationsParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
topic: params.topic.into_value(),
message_id: params.message_id,
get_by_iss: params.get_by_iss,
get_by_domain: params.get_by_domain,
project_pk: params.project_pk.to_string(),
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk.to_string(),
subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()),
notification_topic: params.notification_topic.into_value(),
subscriber_notification_id: params.subscriber_notification_id,
notification_id: params.notification_id,
notification_type: params.notification_type,
returned_count: params.returned_count,
}
}
}
68 changes: 42 additions & 26 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use {
self::{
subscriber_notification::SubscriberNotificationParams,
subscriber_update::SubscriberUpdateParams,
},
crate::{
analytics::{
subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate,
},
config::Configuration,
get_notifications::{GetNotifications, GetNotificationsParams},
subscriber_notification::{SubscriberNotification, SubscriberNotificationParams},
subscriber_update::{SubscriberUpdate, SubscriberUpdateParams},
},
crate::config::Configuration,
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
tracing::{error, info},
Expand All @@ -27,13 +23,15 @@ use {
},
};

pub mod get_notifications;
pub mod subscriber_notification;
pub mod subscriber_update;

#[derive(Clone)]
pub struct NotifyAnalytics {
pub messages: Analytics<SubscriberNotification>,
pub clients: Analytics<SubscriberUpdate>,
pub subscriber_notifications: Analytics<SubscriberNotification>,
pub subscriber_updates: Analytics<SubscriberUpdate>,
pub get_notifications: Analytics<GetNotifications>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand All @@ -42,8 +40,9 @@ impl NotifyAnalytics {
info!("initializing analytics with noop export");

Self {
messages: Analytics::new(NoopCollector),
clients: Analytics::new(NoopCollector),
subscriber_notifications: Analytics::new(NoopCollector),
subscriber_updates: Analytics::new(NoopCollector),
get_notifications: Analytics::new(NoopCollector),
geoip_resolver: None,
}
}
Expand All @@ -60,7 +59,7 @@ impl NotifyAnalytics {
let bucket_name: Arc<str> = export_bucket.into();
let node_ip: Arc<str> = node_ip.to_string().into();

let messages = {
let subscriber_notifications = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/subscriber_notifications",
export_name: "subscriber_notifications",
Expand All @@ -70,36 +69,53 @@ impl NotifyAnalytics {
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<SubscriberNotification>::new(opts.clone(), exporter)?;
Analytics::new(collector)
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let clients = {
let subscriber_updates = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/subscriber_updates",
export_name: "subscriber_updates",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::<SubscriberUpdate>::new(opts, exporter)?)
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let get_notifications = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/get_notifications",
export_name: "get_notifications",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

Ok(Self {
messages,
clients,
subscriber_notifications,
subscriber_updates,
get_notifications,
geoip_resolver,
})
}

pub fn message(&self, message: SubscriberNotificationParams) {
self.messages.collect(message.into());
pub fn subscriber_notification(&self, event: SubscriberNotificationParams) {
self.subscriber_notifications.collect(event.into());
}

pub fn subscriber_update(&self, event: SubscriberUpdateParams) {
self.subscriber_updates.collect(event.into());
}

pub fn client(&self, client: SubscriberUpdateParams) {
self.clients.collect(client.into());
pub fn get_notifications(&self, event: GetNotificationsParams) {
self.get_notifications.collect(event.into());
}

pub fn lookup_geo_data(&self, addr: IpAddr) -> Option<geoip::Data> {
Expand Down
6 changes: 6 additions & 0 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,11 @@ pub async fn delete_expired_subscription_watchers(

#[derive(Debug, FromRow, Clone, Serialize, Deserialize)]
pub struct Notification {
/// Notification ID (for analytics)
#[serde(skip)]
pub notification_id: Uuid,

/// Subscriber notification ID
pub id: Uuid,
pub sent_at: i64,
pub r#type: Uuid,
Expand Down Expand Up @@ -879,6 +884,7 @@ pub async fn get_notifications_for_subscriber(
let query = &format!(
"
SELECT
notification.id AS notification_id,
subscriber_notification.id AS id,
CAST(EXTRACT(EPOCH FROM subscriber_notification.created_at AT TIME ZONE 'UTC') * 1000 AS int8) AS sent_at,
notification.type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
}
});

state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
analytics::get_notifications::GetNotificationsParams,
auth::{
add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp,
DidWeb, SharedClaims, SubscriptionGetNotificationsRequestAuth,
Expand Down Expand Up @@ -31,7 +32,7 @@ use {
relay_rpc::{
auth::ed25519_dalek::SigningKey,
domain::{DecodedClientId, Topic},
rpc::Publish,
rpc::{msg_id::get_message_id, Publish},
},
std::sync::Arc,
tracing::info,
Expand All @@ -40,6 +41,7 @@ use {
// TODO test idempotency
pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), RelayMessageError> {
let topic = msg.topic;
let relay_message_id: Arc<str> = get_message_id(msg.message.as_ref()).into();

if let Some(redis) = state.redis.as_ref() {
notify_get_notifications_rate_limit(redis, &topic, &state.clock).await?;
Expand Down Expand Up @@ -97,7 +99,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Err(RelayMessageClientError::AppDoesNotMatch)?;
}

let account = {
let (account, siwe_domain) = {
if request_auth.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT {
return Err(AuthError::InvalidAct)
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;
Expand All @@ -107,7 +109,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
let Authorization {
account,
app,
domain: _,
domain,
} = verify_identity(
&request_iss_client_id,
&request_auth.ksu,
Expand All @@ -126,7 +128,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
}
}

account
(account, Arc::<str>::from(domain))
};

request_auth
Expand All @@ -142,6 +144,24 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?

for notification in data.notifications.iter() {
state.analytics.get_notifications(GetNotificationsParams {
topic: topic.clone(),
message_id: relay_message_id.clone(),
get_by_iss: request_auth.shared_claims.iss.clone().into(),
get_by_domain: siwe_domain.clone(),
project_pk: project.id,
project_id: project.project_id.clone(),
subscriber_pk: subscriber.id,
subscriber_account: subscriber.account.clone(),
notification_topic: subscriber.topic.clone(),
subscriber_notification_id: notification.id,
notification_id: notification.notification_id,
notification_type: notification.r#type,
returned_count: data.notifications.len(),
});
}

let identity = DecodedClientId(
decode_key(&project.authentication_public_key)
.map_err(RelayMessageServerError::NotifyServerError)?, // TODO change to client error?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
info!("Timing: Finished subscribing to topic");

info!("Timing: Recording SubscriberUpdateParams");
state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
// )
// .await?;

state.analytics.client(SubscriberUpdateParams {
state.analytics.subscriber_update(SubscriberUpdateParams {
project_pk: project.id,
project_id: project.project_id,
pk: subscriber.id,
Expand Down
2 changes: 1 addition & 1 deletion src/services/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ async fn process_notification(
let message_id = publish.msg_id();
publish_relay_message(&relay_client, &publish, metrics).await?;

analytics.message(SubscriberNotificationParams {
analytics.subscriber_notification(SubscriberNotificationParams {
project_pk: notification.project,
project_id: notification.project_project_id,
subscriber_pk: notification.subscriber,
Expand Down

0 comments on commit 46fb200

Please sign in to comment.