Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Mar 18, 2024
1 parent 4e88223 commit 773a85b
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 3 deletions.
77 changes: 77 additions & 0 deletions src/analytics/mark_notifications_as_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct MarkNotificationsAsReadParams {
pub topic: Topic,
pub message_id: Arc<str>,
pub by_iss: Arc<str>,
pub by_domain: Arc<str>,
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub subscriber_account: AccountId,
pub notification_topic: Topic,
pub subscriber_notification_pk: Uuid,
pub notification_pk: Uuid,
pub notification_type: Uuid,
pub marked_count: usize,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct MarkNotificationsAsRead {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// The relay topic used to manage the subscription that the get notifications request message was published to
pub topic: Arc<str>,
/// Relay message ID of request
pub message_id: Arc<str>,
/// JWT iss that made the request
pub get_by_iss: Arc<str>,
/// CACAO domain that made the request
pub get_by_domain: Arc<str>,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: String,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub subscriber_pk: String,
/// Hash of the CAIP-10 account of the subscriber
pub subscriber_account_hash: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// Primary key of the subscriber-specific notification in the Notify Server database
pub subscriber_notification_id: String, // breaking change: rename to _pk
/// Primary key of the notification in the Notify Server database
pub notification_id: String, // breaking change: rename to _pk
/// The notification type ID
pub notification_type: String,
/// The total number of notifications returned in the request
pub marked_count: usize,
}

impl From<MarkNotificationsAsReadParams> for MarkNotificationsAsRead {
fn from(params: MarkNotificationsAsReadParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
topic: params.topic.into_value(),
message_id: params.message_id,
get_by_iss: params.by_iss,
get_by_domain: params.by_domain,
project_pk: params.project_pk.to_string(),
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk.to_string(),
subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()),
notification_topic: params.notification_topic.into_value(),
subscriber_notification_id: params.subscriber_notification_pk.to_string(),
notification_id: params.notification_pk.to_string(),
notification_type: params.notification_type.to_string(),
marked_count: params.marked_count,
}
}
}
18 changes: 18 additions & 0 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
self::{
get_notifications::{GetNotifications, GetNotificationsParams},
mark_notifications_as_read::MarkNotificationsAsRead,
notification_link::{NotificationLink, NotificationLinkParams},
subscriber_notification::{SubscriberNotification, SubscriberNotificationParams},
subscriber_update::{SubscriberUpdate, SubscriberUpdateParams},
Expand All @@ -25,6 +26,7 @@ use {
};

pub mod get_notifications;
pub mod mark_notifications_as_read;
pub mod notification_link;
pub mod subscriber_notification;
pub mod subscriber_update;
Expand All @@ -34,6 +36,7 @@ pub struct NotifyAnalytics {
pub subscriber_notifications: Analytics<SubscriberNotification>,
pub subscriber_updates: Analytics<SubscriberUpdate>,
pub get_notifications: Analytics<GetNotifications>,
pub mark_notifications_as_read: Analytics<MarkNotificationsAsRead>,
pub notification_links: Analytics<NotificationLink>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}
Expand All @@ -46,6 +49,7 @@ impl NotifyAnalytics {
subscriber_notifications: Analytics::new(NoopCollector),
subscriber_updates: Analytics::new(NoopCollector),
get_notifications: Analytics::new(NoopCollector),
mark_notifications_as_read: Analytics::new(NoopCollector),
notification_links: Analytics::new(NoopCollector),
geoip_resolver: None,
}
Expand Down Expand Up @@ -102,6 +106,19 @@ impl NotifyAnalytics {
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let mark_notifications_as_read = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/mark_notifications_as_read",
export_name: "mark_notifications_as_read",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let notification_links = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/notification_links",
Expand All @@ -119,6 +136,7 @@ impl NotifyAnalytics {
subscriber_notifications,
subscriber_updates,
get_notifications,
mark_notifications_as_read,
notification_links,
geoip_resolver,
})
Expand Down
46 changes: 46 additions & 0 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,52 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// ksu - key server for identity key verification
pub ksu: String,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
#[validate]
pub params: GetNotificationsParams,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadRequestAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
pub result: GetNotificationsResult,
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadResponseAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Error)]
pub enum JwtError {
#[error("Missing message part")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub enum RelayMessageClientError {
#[error("Received 4014 on unrecognized topic: {0}")]
WrongNotifyGetNotificationsTopic(Topic),

#[error("Received 4016 on unrecognized topic: {0}")]
WrongNotifyMarkNotificationsAsReadTopic(Topic),

#[error("No project found associated with app_domain {0}")]
NotifyWatchSubscriptionsAppDomainNotFound(Arc<str>),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {

pub mod notify_delete;
pub mod notify_get_notifications;
pub mod notify_mark_notifications_as_read;
pub mod notify_subscribe;
pub mod notify_update;
pub mod notify_watch_subscriptions;
Expand Down
Loading

0 comments on commit 773a85b

Please sign in to comment.