diff --git a/src/analytics/mark_notifications_as_read.rs b/src/analytics/mark_notifications_as_read.rs new file mode 100644 index 00000000..a499d03b --- /dev/null +++ b/src/analytics/mark_notifications_as_read.rs @@ -0,0 +1,77 @@ +use { + crate::model::types::AccountId, + parquet_derive::ParquetRecordWriter, + relay_rpc::domain::{ProjectId, Topic}, + serde::Serialize, + std::sync::Arc, + uuid::Uuid, +}; + +pub struct MarkNotificationsAsReadParams { + pub topic: Topic, + pub message_id: Arc, + pub by_iss: Arc, + pub by_domain: Arc, + pub project_pk: Uuid, + pub project_id: ProjectId, + pub subscriber_pk: Uuid, + pub subscriber_account: AccountId, + pub notification_topic: Topic, + pub subscriber_notification_pk: Uuid, + pub notification_pk: Uuid, + pub notification_type: Uuid, + pub marked_count: usize, +} + +#[derive(Debug, Serialize, ParquetRecordWriter)] +pub struct MarkNotificationsAsRead { + /// Time at which the event was generated + pub event_at: chrono::NaiveDateTime, + /// The relay topic used to manage the subscription that the get notifications request message was published to + pub topic: Arc, + /// Relay message ID of request + pub message_id: Arc, + /// JWT iss that made the request + pub get_by_iss: Arc, + /// CACAO domain that made the request + pub get_by_domain: Arc, + /// Primary key of the project in the Notify Server database that the subscriber is subscribed to + pub project_pk: String, + /// Project ID of the project that the subscriber is subscribed to + pub project_id: Arc, + /// Primary Key of the subscriber in the Notify Server database + pub subscriber_pk: String, + /// Hash of the CAIP-10 account of the subscriber + pub subscriber_account_hash: String, + /// The topic that notifications are sent on + pub notification_topic: Arc, + /// Primary key of the subscriber-specific notification in the Notify Server database + pub subscriber_notification_id: String, // breaking change: rename to _pk + /// Primary key of the notification in the Notify Server database + pub notification_id: String, // breaking change: rename to _pk + /// The notification type ID + pub notification_type: String, + /// The total number of notifications returned in the request + pub marked_count: usize, +} + +impl From for MarkNotificationsAsRead { + fn from(params: MarkNotificationsAsReadParams) -> Self { + Self { + event_at: wc::analytics::time::now(), + topic: params.topic.into_value(), + message_id: params.message_id, + get_by_iss: params.by_iss, + get_by_domain: params.by_domain, + project_pk: params.project_pk.to_string(), + project_id: params.project_id.into_value(), + subscriber_pk: params.subscriber_pk.to_string(), + subscriber_account_hash: sha256::digest(params.subscriber_account.as_ref()), + notification_topic: params.notification_topic.into_value(), + subscriber_notification_id: params.subscriber_notification_pk.to_string(), + notification_id: params.notification_pk.to_string(), + notification_type: params.notification_type.to_string(), + marked_count: params.marked_count, + } + } +} diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 80d86289..6323ce73 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -1,6 +1,7 @@ use { self::{ get_notifications::{GetNotifications, GetNotificationsParams}, + mark_notifications_as_read::MarkNotificationsAsRead, notification_link::{NotificationLink, NotificationLinkParams}, subscriber_notification::{SubscriberNotification, SubscriberNotificationParams}, subscriber_update::{SubscriberUpdate, SubscriberUpdateParams}, @@ -25,6 +26,7 @@ use { }; pub mod get_notifications; +pub mod mark_notifications_as_read; pub mod notification_link; pub mod subscriber_notification; pub mod subscriber_update; @@ -34,6 +36,7 @@ pub struct NotifyAnalytics { pub subscriber_notifications: Analytics, pub subscriber_updates: Analytics, pub get_notifications: Analytics, + pub mark_notifications_as_read: Analytics, pub notification_links: Analytics, pub geoip_resolver: Option>, } @@ -46,6 +49,7 @@ impl NotifyAnalytics { subscriber_notifications: Analytics::new(NoopCollector), subscriber_updates: Analytics::new(NoopCollector), get_notifications: Analytics::new(NoopCollector), + mark_notifications_as_read: Analytics::new(NoopCollector), notification_links: Analytics::new(NoopCollector), geoip_resolver: None, } @@ -102,6 +106,19 @@ impl NotifyAnalytics { Analytics::new(ParquetWriter::new(opts.clone(), exporter)?) }; + let mark_notifications_as_read = { + let exporter = AwsExporter::new(AwsOpts { + export_prefix: "notify/mark_notifications_as_read", + export_name: "mark_notifications_as_read", + file_extension: "parquet", + bucket_name: bucket_name.clone(), + s3_client: s3_client.clone(), + node_ip: node_ip.clone(), + }); + + Analytics::new(ParquetWriter::new(opts.clone(), exporter)?) + }; + let notification_links = { let exporter = AwsExporter::new(AwsOpts { export_prefix: "notify/notification_links", @@ -119,6 +136,7 @@ impl NotifyAnalytics { subscriber_notifications, subscriber_updates, get_notifications, + mark_notifications_as_read, notification_links, geoip_resolver, }) diff --git a/src/auth.rs b/src/auth.rs index 0ace7a3d..5859d2bb 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -348,6 +348,52 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth { } } +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +pub struct SubscriptionMarkNotificationsAsReadRequestAuth { + #[serde(flatten)] + pub shared_claims: SharedClaims, + /// ksu - key server for identity key verification + pub ksu: String, + /// did:pkh + pub sub: String, + /// did:web of app domain + pub app: DidWeb, + #[serde(flatten)] + #[validate] + pub params: GetNotificationsParams, +} + +impl SubscriptionMarkNotificationsAsReadRequestAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } +} + +impl GetSharedClaims for SubscriptionMarkNotificationsAsReadRequestAuth { + fn get_shared_claims(&self) -> &SharedClaims { + &self.shared_claims + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubscriptionMarkNotificationsAsReadResponseAuth { + #[serde(flatten)] + pub shared_claims: SharedClaims, + /// did:pkh + pub sub: String, + /// did:web of app domain + pub app: DidWeb, + #[serde(flatten)] + pub result: GetNotificationsResult, +} + +impl GetSharedClaims for SubscriptionMarkNotificationsAsReadResponseAuth { + fn get_shared_claims(&self) -> &SharedClaims { + &self.shared_claims + } +} + #[derive(Debug, Error)] pub enum JwtError { #[error("Missing message part")] diff --git a/src/services/public_http_server/handlers/relay_webhook/error.rs b/src/services/public_http_server/handlers/relay_webhook/error.rs index a6cb12ce..e583a26a 100644 --- a/src/services/public_http_server/handlers/relay_webhook/error.rs +++ b/src/services/public_http_server/handlers/relay_webhook/error.rs @@ -37,6 +37,9 @@ pub enum RelayMessageClientError { #[error("Received 4014 on unrecognized topic: {0}")] WrongNotifyGetNotificationsTopic(Topic), + #[error("Received 4016 on unrecognized topic: {0}")] + WrongNotifyMarkNotificationsAsReadTopic(Topic), + #[error("No project found associated with app_domain {0}")] NotifyWatchSubscriptionsAppDomainNotFound(Arc), diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/mod.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/mod.rs index 5bb69388..ac646433 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/mod.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/mod.rs @@ -8,6 +8,7 @@ use { pub mod notify_delete; pub mod notify_get_notifications; +pub mod notify_mark_notifications_as_read; pub mod notify_subscribe; pub mod notify_update; pub mod notify_watch_subscriptions; diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs new file mode 100644 index 00000000..ba565c9f --- /dev/null +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs @@ -0,0 +1,268 @@ +use { + crate::{ + analytics::get_notifications::GetNotificationsParams, + auth::{ + add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, + DidWeb, SharedClaims, SubscriptionMarkNotificationsAsReadRequestAuth, SubscriptionMarkNotificationsAsReadResponseAuth, + }, + error::NotifyServerError, + model::{ + helpers::{ + get_notifications_for_subscriber, get_project_by_id, get_subscriber_by_topic, + GetNotificationsResult, Notification, SubscriberWithScope, + }, + types::Project, + }, + publish_relay_message::publish_relay_message, + rate_limit::{self, Clock, RateLimitError}, + registry::storage::redis::Redis, + rpc::{decode_key, AuthMessage, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseError}, + services::public_http_server::handlers::{ + notification_link::format_follow_link, + relay_webhook::{ + error::{RelayMessageClientError, RelayMessageError, RelayMessageServerError}, + handlers::decrypt_message, + RelayIncomingMessage, + }, + }, + spec::{ + NOTIFY_GET_NOTIFICATIONS_ACT, NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_ACT, + NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG, + NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL, + }, + state::AppState, + types::{Envelope, EnvelopeType0}, + utils::topic_from_key, + }, + base64::Engine, + chrono::Utc, + relay_rpc::{ + auth::ed25519_dalek::SigningKey, + domain::{DecodedClientId, Topic}, + rpc::{msg_id::get_message_id, Publish}, + }, + std::sync::Arc, + tracing::info, +}; + +// TODO test idempotency +pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), RelayMessageError> { + if let Some(redis) = state.redis.as_ref() { + notify_mark_notifications_as_read_rate_limit(redis, &msg.topic, &state.clock).await?; + } + + // TODO combine these two SQL queries + let subscriber = + get_subscriber_by_topic(msg.topic.clone(), &state.postgres, state.metrics.as_ref()) + .await + .map_err(|e| match e { + sqlx::Error::RowNotFound => RelayMessageError::Client( + RelayMessageClientError::WrongNotifyMarkNotificationsAsReadTopic( + msg.topic.clone(), + ), + ), + e => RelayMessageError::Server(RelayMessageServerError::NotifyServer(e.into())), + })?; + let project = get_project_by_id(subscriber.project, &state.postgres, state.metrics.as_ref()) + .await + .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error? + info!("project.id: {}", project.id); + + let envelope = Envelope::::from_bytes( + base64::engine::general_purpose::STANDARD + .decode(msg.message.to_string()) + .map_err(RelayMessageClientError::DecodeMessage)?, + ) + .map_err(RelayMessageClientError::EnvelopeParse)?; + + let sym_key = decode_key(&subscriber.sym_key).map_err(RelayMessageServerError::DecodeKey)?; + if msg.topic != topic_from_key(&sym_key) { + return Err(RelayMessageServerError::NotifyServer( + NotifyServerError::TopicDoesNotMatchKey, + ))?; // TODO change to client error? + } + + let req = decrypt_message::(envelope, &sym_key)?; + + async fn handle( + state: &AppState, + msg: &RelayIncomingMessage, + req: &JsonRpcRequest, + subscriber: &SubscriberWithScope, + project: &Project, + ) -> Result { + info!("req.id: {}", req.id); + info!("req.jsonrpc: {}", req.jsonrpc); // TODO verify this + info!("req.method: {}", req.method); // TODO verify this + + let request_auth = + from_jwt::(&req.params.auth) + .map_err(RelayMessageClientError::Jwt)?; + info!( + "request_auth.shared_claims.iss: {:?}", + request_auth.shared_claims.iss + ); + let request_iss_client_id = + DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey) + .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error? + + if request_auth.app.domain() != project.app_domain { + Err(RelayMessageClientError::AppDoesNotMatch)?; + } + + let (account, siwe_domain) = { + if request_auth.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT { + return Err(AuthError::InvalidAct) + .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; + // TODO change to client error? + } + + let Authorization { + account, + app, + domain, + } = verify_identity( + &request_iss_client_id, + &request_auth.ksu, + &request_auth.sub, + state.redis.as_ref(), + state.provider.as_ref(), + state.metrics.as_ref(), + ) + .await?; + + // TODO verify `sub_auth.aud` matches `project_data.identity_keypair` + + if let AuthorizedApp::Limited(app) = app { + if app != project.app_domain { + Err(RelayMessageClientError::AppSubscriptionsUnauthorized)?; + } + } + + (account, Arc::::from(domain)) + }; + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + let data = get_notifications_for_subscriber( + subscriber.id, + request_auth.params, + &state.postgres, + state.metrics.as_ref(), + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error? + + let data = GetNotificationsResult { + notifications: data + .notifications + .into_iter() + .map(|notification| Notification { + url: notification.url.map(|_link| { + format_follow_link(&state.config.notify_url, ¬ification.id).to_string() + }), + ..notification + }) + .collect(), + has_more: data.has_more, + }; + + let relay_message_id: Arc = get_message_id(msg.message.as_ref()).into(); + for notification in data.notifications.iter() { + state.analytics.get_notifications(GetNotificationsParams { + topic: msg.topic.clone(), + message_id: relay_message_id.clone(), + get_by_iss: request_auth.shared_claims.iss.clone().into(), + get_by_domain: siwe_domain.clone(), + project_pk: project.id, + project_id: project.project_id.clone(), + subscriber_pk: subscriber.id, + subscriber_account: subscriber.account.clone(), + notification_topic: subscriber.topic.clone(), + subscriber_notification_pk: notification.id, + notification_pk: notification.notification_id, + notification_type: notification.r#type, + returned_count: data.notifications.len(), + }); + } + + let identity = DecodedClientId( + decode_key(&project.authentication_public_key) + .map_err(RelayMessageServerError::DecodeKey)?, + ); + + let now = Utc::now(); + let response_message = SubscriptionMarkNotificationsAsReadResponseAuth { + shared_claims: SharedClaims { + iat: now.timestamp() as u64, + exp: add_ttl(now, NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL).timestamp() + as u64, + iss: identity.to_did_key(), + aud: request_iss_client_id.to_did_key(), + act: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_ACT.to_owned(), + mjv: "1".to_owned(), + }, + sub: account.to_did_pkh(), + app: DidWeb::from_domain(project.app_domain.clone()), + result: data, + }; + let auth = sign_jwt( + response_message, + &SigningKey::from_bytes( + &decode_key(&project.authentication_private_key) + .map_err(RelayMessageServerError::DecodeKey)?, + ), + ) + .map_err(RelayMessageServerError::SignJwt)?; + Ok(AuthMessage { auth }) + } + + let result = handle(state, &msg, &req, &subscriber, &project).await; + + let response = match result { + Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result)) + .map_err(RelayMessageServerError::JsonRpcResponseSerialization)?, + Err(e) => serde_json::to_vec(&JsonRpcResponseError::new(req.id, e.into())) + .map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?, + }; + + let envelope = Envelope::::new(&sym_key, response) + .map_err(RelayMessageServerError::EnvelopeEncryption)?; + + let response = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + + publish_relay_message( + &state.relay_client, + &Publish { + topic: msg.topic, + message: response.into(), + tag: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG, + ttl_secs: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error? + + Ok(()) +} + +pub async fn notify_mark_notifications_as_read_rate_limit( + redis: &Arc, + topic: &Topic, + clock: &Clock, +) -> Result<(), RateLimitError> { + rate_limit::token_bucket( + redis, + format!("notify-mark-notifications-as-read-{topic}"), + 100, + chrono::Duration::milliseconds(500), + 1, + clock, + ) + .await +} diff --git a/src/services/public_http_server/handlers/relay_webhook/mod.rs b/src/services/public_http_server/handlers/relay_webhook/mod.rs index 484b0bb1..f6b7a01d 100644 --- a/src/services/public_http_server/handlers/relay_webhook/mod.rs +++ b/src/services/public_http_server/handlers/relay_webhook/mod.rs @@ -5,8 +5,8 @@ use { services::public_http_server::handlers::relay_webhook::{ error::RelayMessageError, handlers::{ - notify_delete, notify_get_notifications, notify_subscribe, notify_update, - notify_watch_subscriptions, + notify_delete, notify_get_notifications, notify_mark_notifications_as_read, + notify_subscribe, notify_update, notify_watch_subscriptions, }, }, spec, @@ -191,6 +191,9 @@ async fn handle_msg( notify_watch_subscriptions::handle(msg, state).await } spec::NOTIFY_GET_NOTIFICATIONS_TAG => notify_get_notifications::handle(msg, state).await, + spec::NOTIFY_MARK_NOTIFICATIONS_AS_READ_TAG => { + notify_mark_notifications_as_read::handle(msg, state).await + } _ => { warn!("Ignored tag {tag} on topic {topic}"); Ok(()) diff --git a/src/spec.rs b/src/spec.rs index 473c8ce6..20a2a338 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -10,6 +10,7 @@ pub const NOTIFY_MESSAGE_METHOD: &str = "wc_notifyMessage"; pub const NOTIFY_UPDATE_METHOD: &str = "wc_notifyUpdate"; pub const NOTIFY_DELETE_METHOD: &str = "wc_notifyDelete"; pub const NOTIFY_GET_NOTIFICATIONS_METHOD: &str = "wc_notifyGetNotifications"; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_METHOD: &str = "wc_notifyNotificationsChanged"; // Tags // https://specs.walletconnect.com/2.0/specs/clients/notify/rpc-methods @@ -27,8 +28,10 @@ pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TAG: u32 = 4012; pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG: u32 = 4013; pub const NOTIFY_GET_NOTIFICATIONS_TAG: u32 = 4014; pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG: u32 = 4015; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_TAG: u32 = 4016; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG: u32 = 4017; -pub const INCOMING_TAGS: [u32; 7] = [ +pub const INCOMING_TAGS: [u32; 8] = [ NOTIFY_SUBSCRIBE_TAG, NOTIFY_MESSAGE_RESPONSE_TAG, NOTIFY_DELETE_TAG, @@ -36,6 +39,7 @@ pub const INCOMING_TAGS: [u32; 7] = [ NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG, NOTIFY_GET_NOTIFICATIONS_TAG, + NOTIFY_MARK_NOTIFICATIONS_AS_READ_TAG, ]; // TTLs @@ -57,6 +61,8 @@ pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TTL: Duration = T300; pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL: Duration = T300; pub const NOTIFY_GET_NOTIFICATIONS_TTL: Duration = T300; pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TTL: Duration = T300; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_TTL: Duration = T300; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL: Duration = T300; // acts // https://specs.walletconnect.com/2.0/specs/clients/notify/notify-authentication @@ -75,6 +81,8 @@ pub const NOTIFY_DELETE_ACT: &str = "notify_delete"; pub const NOTIFY_DELETE_RESPONSE_ACT: &str = "notify_delete_response"; pub const NOTIFY_GET_NOTIFICATIONS_ACT: &str = "notify_get_notifications"; pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_ACT: &str = "notify_get_notifications_response"; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_ACT: &str = "notify_mark_notifications_as_read"; +pub const NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_ACT: &str = "notify_mark_notifications_as_read_response"; #[cfg(test)] mod tests {