Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read state #423

Merged
merged 13 commits into from
Mar 20, 2024
87 changes: 63 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "postgres",
wiremock = "0.5.19"
itertools = "0.11.0"
sha3 = "0.10.8"
validator = { version = "0.16.1", features = ["derive"] }
validator = { version = "0.17.0", features = ["derive"] }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrade to support custom validators accepting Option<T> instead of T, allowing the ability to validate the state of the Option and not just T.

Some may consider this change a bug

k256 = "0.13.1"

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions migrations/20240319001421_is_read.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE subscriber_notification ADD COLUMN is_read BOOLEAN NOT NULL DEFAULT FALSE;
73 changes: 73 additions & 0 deletions src/analytics/mark_notifications_as_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct MarkNotificationsAsReadParams {
pub topic: Topic,
pub message_id: Arc<str>,
pub by_iss: Arc<str>,
pub by_domain: Arc<str>,
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub subscriber_account: AccountId,
pub notification_topic: Topic,
pub subscriber_notification_pk: Uuid,
pub notification_pk: Uuid,
pub marked_count: usize,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct MarkNotificationsAsRead {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// The relay topic used to manage the subscription that the get notifications request message was published to
pub topic: Arc<str>,
/// Relay message ID of request
pub message_id: Arc<str>,
/// JWT iss that made the request
pub get_by_iss: Arc<str>,
/// CACAO domain that made the request
pub get_by_domain: Arc<str>,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: String,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub subscriber_pk: String,
/// Hash of the CAIP-10 account of the subscriber
pub subscriber_account_hash: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// Primary key of the subscriber-specific notification in the Notify Server database
pub subscriber_notification_pk: String,
/// Primary key of the notification in the Notify Server database
pub notification_pk: String,
/// The total number of notifications returned in the request
pub marked_count: usize,
}

impl From<MarkNotificationsAsReadParams> for MarkNotificationsAsRead {
fn from(params: MarkNotificationsAsReadParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
topic: params.topic.into_value(),
message_id: params.message_id,
get_by_iss: params.by_iss,
get_by_domain: params.by_domain,
project_pk: params.project_pk.to_string(),
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk.to_string(),
subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()),
notification_topic: params.notification_topic.into_value(),
subscriber_notification_pk: params.subscriber_notification_pk.to_string(),
notification_pk: params.notification_pk.to_string(),
marked_count: params.marked_count,
}
}
}
22 changes: 22 additions & 0 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
self::{
get_notifications::{GetNotifications, GetNotificationsParams},
mark_notifications_as_read::{MarkNotificationsAsRead, MarkNotificationsAsReadParams},
notification_link::{NotificationLink, NotificationLinkParams},
subscriber_notification::{SubscriberNotification, SubscriberNotificationParams},
subscriber_update::{SubscriberUpdate, SubscriberUpdateParams},
Expand All @@ -25,6 +26,7 @@ use {
};

pub mod get_notifications;
pub mod mark_notifications_as_read;
pub mod notification_link;
pub mod subscriber_notification;
pub mod subscriber_update;
Expand All @@ -34,6 +36,7 @@ pub struct NotifyAnalytics {
pub subscriber_notifications: Analytics<SubscriberNotification>,
pub subscriber_updates: Analytics<SubscriberUpdate>,
pub get_notifications: Analytics<GetNotifications>,
pub mark_notifications_as_read: Analytics<MarkNotificationsAsRead>,
pub notification_links: Analytics<NotificationLink>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}
Expand All @@ -46,6 +49,7 @@ impl NotifyAnalytics {
subscriber_notifications: Analytics::new(NoopCollector),
subscriber_updates: Analytics::new(NoopCollector),
get_notifications: Analytics::new(NoopCollector),
mark_notifications_as_read: Analytics::new(NoopCollector),
notification_links: Analytics::new(NoopCollector),
geoip_resolver: None,
}
Expand Down Expand Up @@ -102,6 +106,19 @@ impl NotifyAnalytics {
Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let mark_notifications_as_read = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/mark_notifications_as_read",
export_name: "mark_notifications_as_read",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

Analytics::new(ParquetWriter::new(opts.clone(), exporter)?)
};

let notification_links = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/notification_links",
Expand All @@ -119,6 +136,7 @@ impl NotifyAnalytics {
subscriber_notifications,
subscriber_updates,
get_notifications,
mark_notifications_as_read,
notification_links,
geoip_resolver,
})
Expand All @@ -136,6 +154,10 @@ impl NotifyAnalytics {
self.get_notifications.collect(event.into());
}

pub fn mark_notifications_as_read(&self, event: MarkNotificationsAsReadParams) {
self.mark_notifications_as_read.collect(event.into());
}

pub fn notification_links(&self, event: NotificationLinkParams) {
self.notification_links.collect(event.into());
}
Expand Down
57 changes: 54 additions & 3 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use {
error::NotifyServerError,
metrics::Metrics,
model::{
helpers::{GetNotificationsParams, GetNotificationsResult},
helpers::{
GetNotificationsParams, GetNotificationsResult, MarkNotificationsAsReadParams,
MarkNotificationsAsReadParamsValidatorContext,
},
types::{AccountId, AccountIdParseError},
},
registry::storage::{error::StorageError, redis::Redis, KeyValueStorage},
Expand Down Expand Up @@ -43,7 +46,7 @@ use {
tracing::{debug, info, warn},
url::Url,
uuid::Uuid,
validator::Validate,
validator::{Validate, ValidateArgs},
x25519_dalek::{PublicKey, StaticSecret},
};

Expand Down Expand Up @@ -134,6 +137,8 @@ pub struct NotifyServerSubscription {
pub scope: HashSet<Uuid>, // TODO 15 hard limit
/// Unix timestamp of expiration
pub expiry: u64,
/// Number of unread notifications
pub unread_notification_count: u64,
}

impl GetSharedClaims for WatchSubscriptionsResponseAuth {
Expand Down Expand Up @@ -313,7 +318,7 @@ pub struct SubscriptionGetNotificationsRequestAuth {
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
#[validate]
#[validate(nested)]
Copy link
Member Author

@chris13524 chris13524 Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix for silent breaking change in 0.17.0. See Keats/validator#307

pub params: GetNotificationsParams,
}

Expand Down Expand Up @@ -348,6 +353,52 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// ksu - key server for identity key verification
pub ksu: String,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
#[serde(flatten)]
pub params: MarkNotificationsAsReadParams,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
self.params
.validate_with_args(&MarkNotificationsAsReadParamsValidatorContext {
all: self.params.all,
})
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadRequestAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
/// did:pkh
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
}

impl GetSharedClaims for SubscriptionMarkNotificationsAsReadResponseAuth {
fn get_shared_claims(&self) -> &SharedClaims {
&self.shared_claims
}
}

#[derive(Debug, Error)]
pub enum JwtError {
#[error("Missing message part")]
Expand Down
Loading
Loading