From 0c2db967d802fad1f3b62c03dd9c189732f90d52 Mon Sep 17 00:00:00 2001 From: Chris Smith <1979423+chris13524@users.noreply.github.com> Date: Thu, 4 Jan 2024 22:11:29 -0600 Subject: [PATCH] chore: refactor update and delete tests (#255) * chore: refactor update * chore: refactor subscriptions changed * chore: refactor delete watch subscriptions changed * fix: flaky test * chore: refactor delete --- src/services/websocket_server/mod.rs | 8 +- src/spec.rs | 4 + src/types/mod.rs | 28 ++ tests/deployment.rs | 14 +- tests/integration.rs | 472 +++++++++++++-------------- 5 files changed, 271 insertions(+), 255 deletions(-) diff --git a/src/services/websocket_server/mod.rs b/src/services/websocket_server/mod.rs index 1a524bb7..6145bc8f 100644 --- a/src/services/websocket_server/mod.rs +++ b/src/services/websocket_server/mod.rs @@ -261,12 +261,12 @@ pub struct NotifySubscribe { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct NotifyUpdate { - update_auth: String, +pub struct NotifyUpdate { + pub update_auth: String, } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct NotifyDelete { - delete_auth: String, +pub struct NotifyDelete { + pub delete_auth: String, } diff --git a/src/spec.rs b/src/spec.rs index 7579d3da..90b69e4a 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -53,6 +53,10 @@ pub const NOTIFY_SUBSCRIPTIONS_CHANGED_ACT: &str = "notify_subscriptions_changed pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONE_ACT: &str = "notify_subscriptions_changed_response"; pub const NOTIFY_SUBSCRIBE_ACT: &str = "notify_subscription"; pub const NOTIFY_SUBSCRIBE_RESPONSE_ACT: &str = "notify_subscription_response"; +pub const NOTIFY_UPDATE_ACT: &str = "notify_update"; +pub const NOTIFY_UPDATE_RESPONSE_ACT: &str = "notify_update_response"; +pub const NOTIFY_DELETE_ACT: &str = "notify_delete"; +pub const NOTIFY_DELETE_RESPONSE_ACT: &str = "notify_delete_response"; #[cfg(test)] mod tests { diff --git a/src/types/mod.rs b/src/types/mod.rs index 2313fead..b8be864d 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -151,6 +151,14 @@ pub fn parse_scope(scope: &str) -> std::result::Result, uuid::Erro Ok(parsed_scope) } +pub fn encode_scope(notification_types: &HashSet) -> String { + notification_types + .iter() + .map(ToString::to_string) + .collect::>() + .join(" ") +} + #[cfg(test)] mod test { use super::*; @@ -178,4 +186,24 @@ mod test { HashSet::from([scope1, scope2]) ); } + + #[test] + fn encode_empty_scope() { + assert_eq!(encode_scope(&HashSet::new()), ""); + } + + #[test] + fn encode_one_scope() { + let scope1 = Uuid::new_v4(); + assert_eq!(encode_scope(&HashSet::from([scope1])), scope1.to_string()); + } + + #[test] + fn encode_two_scopes() { + let scope1 = Uuid::new_v4(); + let scope2 = Uuid::new_v4(); + let encoded = encode_scope(&HashSet::from([scope1, scope2])); + // need to check both orders because HashSet is non-deterministic + assert!(encoded == format!("{scope1} {scope2}") || encoded == format!("{scope2} {scope1}")); + } } diff --git a/tests/deployment.rs b/tests/deployment.rs index fc163e7d..5210c561 100644 --- a/tests/deployment.rs +++ b/tests/deployment.rs @@ -40,7 +40,7 @@ use { NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TTL, }, - types::{Envelope, EnvelopeType0, EnvelopeType1, Notification}, + types::{encode_scope, Envelope, EnvelopeType0, EnvelopeType1, Notification}, utils::topic_from_key, }, rand::{rngs::StdRng, SeedableRng}, @@ -493,11 +493,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { }, ksu: vars.keys_server_url.to_string(), sub: did_pkh.clone(), - scp: notification_types - .iter() - .map(ToString::to_string) - .collect::>() - .join(" "), + scp: encode_scope(¬ification_types), app: DidWeb::from_domain(app_domain.clone()), }; @@ -766,11 +762,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { }, ksu: vars.keys_server_url.to_string(), sub: did_pkh.clone(), - scp: notification_types - .iter() - .map(ToString::to_string) - .collect::>() - .join(" "), + scp: encode_scope(¬ification_types), app: DidWeb::from_domain(app_domain.clone()), }; diff --git a/tests/integration.rs b/tests/integration.rs index 70f99f52..559f4cd0 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -57,27 +57,27 @@ use { upsert_subscriber_notifications, NotificationToProcess, }, websocket_server::{ - decode_key, derive_key, relay_ws_client::RelayClientEvent, NotifyRequest, - NotifyResponse, NotifySubscribe, NotifySubscriptionsChanged, - NotifyWatchSubscriptions, ResponseAuth, + decode_key, derive_key, relay_ws_client::RelayClientEvent, NotifyDelete, + NotifyRequest, NotifyResponse, NotifySubscribe, NotifySubscriptionsChanged, + NotifyUpdate, NotifyWatchSubscriptions, ResponseAuth, }, }, spec::{ - NOTIFY_DELETE_METHOD, NOTIFY_DELETE_RESPONSE_TAG, NOTIFY_DELETE_TAG, NOTIFY_DELETE_TTL, - NOTIFY_MESSAGE_ACT, NOTIFY_MESSAGE_METHOD, NOTIFY_MESSAGE_RESPONSE_ACT, - NOTIFY_MESSAGE_RESPONSE_TAG, NOTIFY_MESSAGE_RESPONSE_TTL, NOTIFY_MESSAGE_TAG, - NOTIFY_NOOP_TAG, NOTIFY_SUBSCRIBE_ACT, NOTIFY_SUBSCRIBE_METHOD, - NOTIFY_SUBSCRIBE_RESPONSE_ACT, NOTIFY_SUBSCRIBE_RESPONSE_TAG, NOTIFY_SUBSCRIBE_TAG, - NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIPTIONS_CHANGED_ACT, + NOTIFY_DELETE_ACT, NOTIFY_DELETE_METHOD, NOTIFY_DELETE_RESPONSE_ACT, + NOTIFY_DELETE_RESPONSE_TAG, NOTIFY_DELETE_TAG, NOTIFY_DELETE_TTL, NOTIFY_MESSAGE_ACT, + NOTIFY_MESSAGE_METHOD, NOTIFY_MESSAGE_RESPONSE_ACT, NOTIFY_MESSAGE_RESPONSE_TAG, + NOTIFY_MESSAGE_RESPONSE_TTL, NOTIFY_MESSAGE_TAG, NOTIFY_NOOP_TAG, NOTIFY_SUBSCRIBE_ACT, + NOTIFY_SUBSCRIBE_METHOD, NOTIFY_SUBSCRIBE_RESPONSE_ACT, NOTIFY_SUBSCRIBE_RESPONSE_TAG, + NOTIFY_SUBSCRIBE_TAG, NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIPTIONS_CHANGED_ACT, NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONE_ACT, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL, - NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_METHOD, NOTIFY_UPDATE_RESPONSE_TAG, - NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, NOTIFY_WATCH_SUBSCRIPTIONS_ACT, - NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, - NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TAG, - NOTIFY_WATCH_SUBSCRIPTIONS_TTL, + NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_ACT, NOTIFY_UPDATE_METHOD, + NOTIFY_UPDATE_RESPONSE_ACT, NOTIFY_UPDATE_RESPONSE_TAG, NOTIFY_UPDATE_TAG, + NOTIFY_UPDATE_TTL, NOTIFY_WATCH_SUBSCRIPTIONS_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, + NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, + NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TTL, }, - types::{Envelope, EnvelopeType0, EnvelopeType1, Notification}, + types::{encode_scope, Envelope, EnvelopeType0, EnvelopeType1, Notification}, utils::topic_from_key, }, rand::rngs::StdRng, @@ -3014,6 +3014,174 @@ async fn accept_and_respond_to_notify_message( claims } +async fn publish_update_request<'a>( + relay_ws_client: &Client, + did_pkh: String, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'a>, + sym_key: &[u8; 32], + app: DidWeb, + notification_types: &HashSet, +) { + publish_jwt_message( + relay_ws_client, + client_id, + identity_key_details.clone(), + TopicEncrptionScheme::Symetric(sym_key), + NOTIFY_UPDATE_TAG, + NOTIFY_UPDATE_TTL, + NOTIFY_UPDATE_ACT, + |shared_claims| { + serde_json::to_value(NotifyRequest::new( + NOTIFY_UPDATE_METHOD, + NotifyUpdate { + update_auth: encode_auth( + &SubscriptionUpdateRequestAuth { + shared_claims, + ksu: identity_key_details.keys_server_url.to_string(), + sub: did_pkh.clone(), + app, + scp: encode_scope(notification_types), + }, + identity_key_details.signing_key, + ), + }, + )) + .unwrap() + }, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn update( + keys_server_url: Url, + identity_signing_key: &SigningKey, + identity_did_key: &str, + app: DidWeb, + app_client_id: &DecodedClientId, + did_pkh: &str, + notify_key: &[u8; 32], + notification_types: &HashSet, + relay_ws_client: &relay_client::websocket::Client, + rx: &mut UnboundedReceiver, +) { + publish_update_request( + relay_ws_client, + did_pkh.to_owned(), + app_client_id, + IdentityKeyDetails { + keys_server_url: &keys_server_url, + signing_key: identity_signing_key, + did_key: identity_did_key, + }, + notify_key, + app.clone(), + notification_types, + ) + .await; + + let response_topic = topic_from_key(notify_key); + let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let msg = accept_message(rx).await; + if msg.tag == NOTIFY_UPDATE_RESPONSE_TAG && msg.topic == response_topic { + return msg; + } + } + }) + .await + .unwrap(); + + let (_id, auth) = decode_response_message::(msg, notify_key); + assert_eq!(auth.shared_claims.act, NOTIFY_UPDATE_RESPONSE_ACT); + assert_eq!(auth.shared_claims.iss, app_client_id.to_did_key()); + assert_eq!(auth.shared_claims.aud, identity_did_key); + assert_eq!(auth.app, app); +} + +async fn publish_delete_request<'a>( + relay_ws_client: &Client, + did_pkh: String, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'a>, + sym_key: &[u8; 32], + app: DidWeb, +) { + publish_jwt_message( + relay_ws_client, + client_id, + identity_key_details.clone(), + TopicEncrptionScheme::Symetric(sym_key), + NOTIFY_DELETE_TAG, + NOTIFY_DELETE_TTL, + NOTIFY_DELETE_ACT, + |shared_claims| { + serde_json::to_value(NotifyRequest::new( + NOTIFY_DELETE_METHOD, + NotifyDelete { + delete_auth: encode_auth( + &SubscriptionDeleteRequestAuth { + shared_claims, + ksu: identity_key_details.keys_server_url.to_string(), + sub: did_pkh.clone(), + app, + }, + identity_key_details.signing_key, + ), + }, + )) + .unwrap() + }, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn delete( + keys_server_url: Url, + identity_signing_key: &SigningKey, + identity_did_key: &str, + app: DidWeb, + app_client_id: &DecodedClientId, + did_pkh: &str, + notify_key: &[u8; 32], + relay_ws_client: &relay_client::websocket::Client, + rx: &mut UnboundedReceiver, +) { + publish_delete_request( + relay_ws_client, + did_pkh.to_owned(), + app_client_id, + IdentityKeyDetails { + keys_server_url: &keys_server_url, + signing_key: identity_signing_key, + did_key: identity_did_key, + }, + notify_key, + app.clone(), + ) + .await; + + let response_topic = topic_from_key(notify_key); + let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let msg = accept_message(rx).await; + if msg.tag == NOTIFY_DELETE_RESPONSE_TAG && msg.topic == response_topic { + return msg; + } + } + }) + .await + .unwrap(); + + let (_id, auth) = decode_response_message::(msg, notify_key); + assert_eq!(auth.shared_claims.act, NOTIFY_DELETE_RESPONSE_ACT); + assert_eq!(auth.shared_claims.iss, app_client_id.to_did_key()); + assert_eq!(auth.shared_claims.aud, identity_did_key); + assert_eq!(auth.app, app); +} + fn generate_identity_key() -> (SigningKey, DecodedClientId) { let keypair = Keypair::generate(&mut StdRng::from_entropy()); let signing_key = SigningKey::from_bytes(keypair.secret_key().as_bytes()); @@ -3326,238 +3494,62 @@ async fn run_test( assert_eq!(claims.msg.icon, "icon"); assert_eq!(claims.msg.url, "url"); - // Update subscription - - // Prepare update auth for *wallet* client - // https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/clients/notify/notify-authentication.md#notify-updatelet notification_type = Uuid::new_v4(); let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type, Uuid::new_v4(), Uuid::new_v4()]); - let now = Utc::now(); - let update_auth = SubscriptionUpdateRequestAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_UPDATE_TTL).timestamp() as u64, - iss: identity_did_key.clone(), - act: "notify_update".to_owned(), - aud: client_id.to_did_key(), - mjv: "0".to_owned(), - }, - ksu: keys_server_url.to_string(), - sub: did_pkh.clone(), - scp: notification_types - .iter() - .map(ToString::to_string) - .collect::>() - .join(" "), - app: DidWeb::from_domain(app_domain.clone()), - }; - - // Encode the subscription auth - let update_auth = encode_auth(&update_auth, &identity_signing_key); - - let sub_auth = json!({ "updateAuth": update_auth }); - - let delete_message = NotifyRequest::new(NOTIFY_UPDATE_METHOD, sub_auth); - - let envelope = Envelope::::new(¬ify_key, delete_message).unwrap(); - - let encoded_message = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - relay_ws_client - .publish( - notify_topic.clone(), - encoded_message, - NOTIFY_UPDATE_TAG, - NOTIFY_UPDATE_TTL, - false, - ) - .await - .unwrap(); - - // Check for update response - let resp = rx.recv().await.unwrap(); - - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_UPDATE_RESPONSE_TAG); - - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), + update( + keys_server_url.clone(), + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain.clone()), + &client_id, + &did_pkh, + ¬ify_key, + ¬ification_types, + &relay_ws_client, + &mut rx, ) - .unwrap(); - - let cipher = ChaCha20Poly1305::new(GenericArray::from_slice(¬ify_key)); - let decrypted_response = cipher - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - - let response: NotifyResponse = - serde_json::from_slice(&decrypted_response).unwrap(); - - let response_auth = response - .result - .get("responseAuth") // TODO use structure - .unwrap() - .as_str() - .unwrap(); - let claims = from_jwt::(response_auth).unwrap(); - // https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/clients/notify/notify-authentication.md#notify-update-response - // TODO verify issuer - assert_eq!(claims.sub, did_pkh); - assert!((claims.shared_claims.iat as i64) < chrono::Utc::now().timestamp() + JWT_LEEWAY); // TODO remove leeway - assert!((claims.shared_claims.exp as i64) > chrono::Utc::now().timestamp() - JWT_LEEWAY); // TODO remove leeway - assert_eq!(claims.app, DidWeb::from_domain(app_domain.clone())); - assert_eq!(claims.shared_claims.aud, identity_did_key); - assert_eq!(claims.shared_claims.act, "notify_update_response"); - - { - let resp = rx.recv().await.unwrap(); - - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG); - - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), - ) - .unwrap(); - - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(&watch_topic_key)) - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - - let response: NotifyRequest = - serde_json::from_slice(&decrypted_response).unwrap(); - - let response_auth = response - .params - .get("subscriptionsChangedAuth") // TODO use structure - .unwrap() - .as_str() - .unwrap(); - let auth = from_jwt::(response_auth).unwrap(); - assert_eq!(auth.shared_claims.act, "notify_subscriptions_changed"); - assert_eq!(auth.sbs.len(), 1); - let subs = &auth.sbs[0]; - assert_eq!(subs.scope, notification_types); - } - - // Prepare deletion auth for *wallet* client - // https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/clients/notify/notify-authentication.md#notify-delete - let now = Utc::now(); - let delete_auth = SubscriptionDeleteRequestAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_DELETE_TTL).timestamp() as u64, - iss: identity_did_key.clone(), - aud: client_id.to_did_key(), - act: "notify_delete".to_owned(), - mjv: "0".to_owned(), - }, - ksu: keys_server_url.to_string(), - sub: did_pkh.clone(), - app: DidWeb::from_domain(app_domain.clone()), - }; - - // Encode the subscription auth - let delete_auth = encode_auth(&delete_auth, &identity_signing_key); - - let sub_auth = json!({ "deleteAuth": delete_auth }); - - let delete_message = NotifyRequest::new(NOTIFY_DELETE_METHOD, sub_auth); - - let envelope = Envelope::::new(¬ify_key, delete_message).unwrap(); - - let encoded_message = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - relay_ws_client - .publish( - notify_topic, - encoded_message, - NOTIFY_DELETE_TAG, - NOTIFY_DELETE_TTL, - false, - ) - .await - .unwrap(); - - // Check for delete response - let resp = rx.recv().await.unwrap(); - - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_DELETE_RESPONSE_TAG); + .await; - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), + let sbs = accept_watch_subscriptions_changed( + keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, ) - .unwrap(); - - let cipher = ChaCha20Poly1305::new(GenericArray::from_slice(¬ify_key)); - let decrypted_response = cipher - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - - let response: NotifyResponse = - serde_json::from_slice(&decrypted_response).unwrap(); - - let response_auth = response - .result - .get("responseAuth") // TODO use structure - .unwrap() - .as_str() - .unwrap(); - let claims = from_jwt::(response_auth).unwrap(); - // https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/clients/notify/notify-authentication.md#notify-delete-response - // TODO verify issuer - assert_eq!(claims.sub, did_pkh); - assert!((claims.shared_claims.iat as i64) < chrono::Utc::now().timestamp() + JWT_LEEWAY); // TODO remove leeway - assert!((claims.shared_claims.exp as i64) > chrono::Utc::now().timestamp() - JWT_LEEWAY); // TODO remove leeway - assert_eq!(claims.app, DidWeb::from_domain(app_domain)); - assert_eq!(claims.shared_claims.aud, identity_did_key); - assert_eq!(claims.shared_claims.act, "notify_delete_response"); - - { - let resp = rx.recv().await.unwrap(); - - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG); - - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), - ) - .unwrap(); - - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(&watch_topic_key)) - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); + .await; + assert_eq!(sbs.len(), 1); + let sub = &sbs[0]; + assert_eq!(sub.scope, notification_types); - let response: NotifyRequest = - serde_json::from_slice(&decrypted_response).unwrap(); + delete( + keys_server_url.clone(), + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain.clone()), + &client_id, + &did_pkh, + ¬ify_key, + &relay_ws_client, + &mut rx, + ) + .await; - let response_auth = response - .params - .get("subscriptionsChangedAuth") // TODO use structure - .unwrap() - .as_str() - .unwrap(); - let auth = from_jwt::(response_auth).unwrap(); - assert_eq!(auth.shared_claims.act, "notify_subscriptions_changed"); - assert!(auth.sbs.is_empty()); - } + let sbs = accept_watch_subscriptions_changed( + keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ) + .await; + assert!(sbs.is_empty()); let resp = assert_successful_response( reqwest::Client::new()