Skip to content

Commit

Permalink
feat: read state (#423)
Browse files Browse the repository at this point in the history
* fix: errors being excluded from metrics and not being logged

* feat: initial boilerplate

* fix: tests and mark as read SQL

* fix: only same subscriber

* fix: only mark unread notifications as read

* chore: test mark as read

* feat: unread count in watchSubscriptions

* chore: more tests

* fix: count multiplying by scope count

* feat: has_more_unread mur

* chore: TODO

* fix: index is_read

* chore: remove comment
  • Loading branch information
chris13524 authored Mar 20, 2024
1 parent 660a71a commit 35afe9a
Show file tree
Hide file tree
Showing 18 changed files with 2,360 additions and 49 deletions.
87 changes: 63 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "postgres",
wiremock = "0.5.19"
itertools = "0.11.0"
sha3 = "0.10.8"
validator = { version = "0.16.1", features = ["derive"] }
validator = { version = "0.17.0", features = ["derive"] }
k256 = "0.13.1"

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions migrations/20240319001421_is_read.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscriber_notification ADD COLUMN is_read BOOLEAN NOT NULL DEFAULT FALSE;
CREATE INDEX subscriber_notification_is_read_idx ON subscriber_notification (is_read);
73 changes: 73 additions & 0 deletions src/analytics/mark_notifications_as_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct MarkNotificationsAsReadParams {
pub topic: Topic,
pub message_id: Arc<str>,
pub by_iss: Arc<str>,
pub by_domain: Arc<str>,
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub subscriber_account: AccountId,
pub notification_topic: Topic,
pub subscriber_notification_pk: Uuid,
pub notification_pk: Uuid,
pub marked_count: usize,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct MarkNotificationsAsRead {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// The relay topic used to manage the subscription that the get notifications request message was published to
pub topic: Arc<str>,
/// Relay message ID of request
pub message_id: Arc<str>,
/// JWT iss that made the request
pub get_by_iss: Arc<str>,
/// CACAO domain that made the request
pub get_by_domain: Arc<str>,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: String,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub subscriber_pk: String,
/// Hash of the CAIP-10 account of the subscriber
pub subscriber_account_hash: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// Primary key of the subscriber-specific notification in the Notify Server database
pub subscriber_notification_pk: String,
/// Primary key of the notification in the Notify Server database
pub notification_pk: String,
/// The total number of notifications returned in the request
pub marked_count: usize,
}

impl From<MarkNotificationsAsReadParams> for MarkNotificationsAsRead {
fn from(params: MarkNotificationsAsReadParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
topic: params.topic.into_value(),
message_id: params.message_id,
get_by_iss: params.by_iss,
get_by_domain: params.by_domain,
project_pk: params.project_pk.to_string(),
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk.to_string(),
subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()),
notification_topic: params.notification_topic.into_value(),
subscriber_notification_pk: params.subscriber_notification_pk.to_string(),
notification_pk: params.notification_pk.to_string(),
marked_count: params.marked_count,
}
}
}
22 changes: 22 additions & 0 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
self::{
get_notifications::{GetNotifications, GetNotificationsParams},
mark_notifications_as_read::{MarkNotificationsAsRead, MarkNotificationsAsReadParams},
notification_link::{NotificationLink, NotificationLinkParams},
subscriber_notification::{SubscriberNotification, SubscriberNotificationParams},
subscriber_update::{SubscriberUpdate, SubscriberUpdateParams},
Expand All @@ -25,6 +26,7 @@ use {
};

pub mod get_notifications;
pub mod mark_notifications_as_read;
pub mod notification_link;
pub mod subscriber_notification;
pub mod subscriber_update;
Expand All @@ -34,6 +36,7 @@ pub struct NotifyAnalytics {
pub subscriber_notifications: Analytics<SubscriberNotification>,
pub subscriber_updates: Analytics<SubscriberUpdate>,
pub get_notifications: Analytics<GetNotifications>,
pub mark_notifications_as_read: Analytics<MarkNotificationsAsRead>,
pub notification_links: Analytics<NotificationLink>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}
Expand All @@ -46,6 +49,7 @@ impl NotifyAnalytics {
subscriber_notifications: Analytics::new(NoopCollector),
subscriber_updates: Analytics::new(NoopCollector),
get_notifications: Analytics::new(NoopCollector),
mark_notifications_as_read: Analytics::new(NoopCollector),
notification_links: Analytics::new(NoopCollector),
geoip_resolver: None,
}
Expand Down Expand Up @@ -102,6 +106,19 @@ impl NotifyAnalytics {
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let mark_notifications_as_read = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/mark_notifications_as_read",
export_name: "mark_notifications_as_read",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let notification_links = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/notification_links",
Expand All @@ -119,6 +136,7 @@ impl NotifyAnalytics {
subscriber_notifications,
subscriber_updates,
get_notifications,
mark_notifications_as_read,
notification_links,
geoip_resolver,
})
Expand All @@ -136,6 +154,10 @@ impl NotifyAnalytics {
self.get_notifications.collect(event.into());
}

pub fn mark_notifications_as_read(&self, event: MarkNotificationsAsReadParams) {
self.mark_notifications_as_read.collect(event.into());
}

pub fn notification_links(&self, event: NotificationLinkParams) {
self.notification_links.collect(event.into());
}
Expand Down
57 changes: 54 additions & 3 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use {
error::NotifyServerError,
metrics::Metrics,
model::{
helpers::{GetNotificationsParams, GetNotificationsResult},
helpers::{
GetNotificationsParams, GetNotificationsResult, MarkNotificationsAsReadParams,
MarkNotificationsAsReadParamsValidatorContext,
},
types::{AccountId, AccountIdParseError},
},
registry::storage::{error::StorageError, redis::Redis, KeyValueStorage},
Expand Down Expand Up @@ -43,7 +46,7 @@ use {
tracing::{debug, info, warn},
url::Url,
uuid::Uuid,
validator::Validate,
validator::{Validate, ValidateArgs},
x25519_dalek::{PublicKey, StaticSecret},
};

Expand Down Expand Up @@ -134,6 +137,8 @@ pub struct NotifyServerSubscription {
pub scope: HashSet<Uuid>, // TODO 15 hard limit
/// Unix timestamp of expiration
pub expiry: u64,
/// Number of unread notifications
pub unread_notification_count: u64,
}

impl GetSharedClaims for WatchSubscriptionsResponseAuth {
Expand Down Expand Up @@ -313,7 +318,7 @@ pub struct SubscriptionGetNotificationsRequestAuth {
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
#[validate]
#[validate(nested)]
pub params: GetNotificationsParams,
}

Expand Down Expand Up @@ -348,6 +353,52 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// ksu - key server for identity key verification
pub ksu: String,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
pub params: MarkNotificationsAsReadParams,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
self.params
.validate_with_args(&MarkNotificationsAsReadParamsValidatorContext {
all: self.params.all,
})
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadRequestAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadResponseAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Error)]
pub enum JwtError {
#[error("Missing message part")]
Expand Down
Loading

0 comments on commit 35afe9a

Please sign in to comment.