Skip to content

Commit

Permalink
feat(listener): change the real-time event handling interface
Browse files Browse the repository at this point in the history
feat(presence-state): state maintained after set state is used

`user_id` state for specified channels will be maintained by the SDK. State with subscribe calls
has been improved.

feat(api): adding first-class citizens to access subscription

Adding `Channel`, `ChannelGroup`, `ChannelMetadata` and `UuidMetadata` entities to be first-class
citizens to access APIs related to them. Currently, access is provided only for subscription APIs.

feat(auto-retry): added ability to exclude endpoints from retry

Added ability to configure request retry policies to exclude specific endpoints from retry.
  • Loading branch information
parfeon committed Jan 19, 2024
1 parent 18690e4 commit 3e6c97f
Show file tree
Hide file tree
Showing 83 changed files with 8,585 additions and 2,253 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ std = ["derive_builder/std", "log/std", "uuid/std", "base64/std", "spin/std", "s
extra_platforms = ["spin/portable_atomic", "dep:portable-atomic"]

# [Internal features] (not intended for use outside of the library)
contract_test = ["parse_token", "publish", "access", "crypto"]
contract_test = ["parse_token", "publish", "access", "crypto", "std", "subscribe", "presence", "tokio"]
full_no_std = ["serde", "reqwest", "crypto", "parse_token", "blocking", "publish", "access", "subscribe", "tokio", "presence"]
full_no_std_platform_independent = ["serde", "crypto", "parse_token", "blocking", "publish", "access", "subscribe", "presence"]
pubnub_only = ["crypto", "parse_token", "blocking", "publish", "access", "subscribe", "presence"]
Expand Down Expand Up @@ -106,13 +106,14 @@ getrandom = { version = "0.2", optional = true }
# parse_token
ciborium = { version = "0.2.1", default-features = false, optional = true }

# subscribe
# subscribe, presence
futures = { version = "0.3.28", default-features = false, optional = true }
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "macros", "time"] }
async-channel = { version = "1.8", optional = true }

# extra_platforms
portable-atomic = { version = "1.3", optional = true, default-features = false, features = ["require-cas", "critical-section"] }
backtrace = "0.3.69"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
Expand Down Expand Up @@ -165,7 +166,7 @@ required-features = ["default"]

[[example]]
name = "subscribe"
required-features = ["default", "subscribe"]
required-features = ["default", "subscribe", "presence"]

[[example]]
name = "subscribe_raw"
Expand Down
41 changes: 36 additions & 5 deletions examples/presence_state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::collections::HashMap;

use pubnub::{Keyset, PubNubClientBuilder};
use serde::Serialize;
use std::env;

#[derive(Debug, Serialize)]
#[derive(Debug, serde::Serialize)]
struct State {
is_doing: String,
flag: bool,
}
#[derive(Debug, serde::Serialize)]
struct State2 {
is_doing: String,
business: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;
// let publish_key = env::var("SDK_PUB_KEY")?;
// let subscribe_key = env::var("SDK_SUB_KEY")?;
let publish_key = "demo";
let subscribe_key = "demo";

let client = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
Expand All @@ -23,9 +31,32 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

println!("running!");

client
.set_presence_state_with_heartbeat(HashMap::from([
(
"my_channel".to_string(),
State {
is_doing: "Something".to_string(),
flag: true,
},
),
(
"other_channel".to_string(),
State {
is_doing: "Oh no".to_string(),
flag: false,
},
),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
.execute()
.await?;

client
.set_presence_state(State {
is_doing: "Nothing... Just hanging around...".into(),
flag: false,
})
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
Expand Down
139 changes: 104 additions & 35 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use futures::StreamExt;
use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
use pubnub::{Keyset, PubNubClientBuilder};
use std::collections::HashMap;

use futures::{FutureExt, StreamExt};
use serde::Deserialize;
use std::env;

use pubnub::subscribe::SubscriptionOptions;
use pubnub::{
dx::subscribe::Update,
subscribe::{EventEmitter, EventSubscriber},
Keyset, PubNubClientBuilder,
};

#[derive(Debug, Deserialize)]
struct Message {
// Allowing dead code because we don't use these fields
Expand All @@ -16,8 +23,8 @@ struct Message {

#[tokio::main]
async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;
let publish_key = "demo"; //env::var("SDK_PUB_KEY")?;
let subscribe_key = "demo"; //env::var("SDK_SUB_KEY")?;

let client = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
Expand All @@ -26,60 +33,122 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
secret_key: None,
})
.with_user_id("user_id")
.with_filter_expression("some_filter")
.with_heartbeat_value(100)
.with_heartbeat_interval(5)
.build()?;

println!("running!");

let subscription = client
.subscribe()
client
.set_presence_state(HashMap::<String, String>::from([
(
"is_doing".to_string(),
"Nothing... Just hanging around...".to_string(),
),
("flag".to_string(), "false".to_string()),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.heartbeat(10)
.filter_expression("some_filter")
.execute()?;
.user_id("user_id")
.execute()
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let subscription = client.subscription(
Some(&["my_channel", "other_channel"]),
None,
Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
);
subscription.subscribe(None);
let subscription_clone = subscription.clone_empty();

// Attach connection status to the PubNub client instance.
tokio::spawn(
client
.status_stream()
.for_each(|status| async move { println!("\nstatus: {:?}", status) }),
);

tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
SubscribeStreamEvent::Update(update) => {
println!("\nupdate: {:?}", update);
match update {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("presence: {:?}", presence)
}
Update::Object(object) => {
println!("object: {:?}", object)
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
}
Update::MessageAction(action) => {
println!("message action: {:?}", action)
}
Update::File(file) => {
println!("file: {:?}", file)
}
}
Update::Presence(presence) => {
println!("presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("object: {:?}", object)
}
Update::MessageAction(action) => {
println!("message action: {:?}", action)
}
Update::File(file) => {
println!("file: {:?}", file)
}
}
}));

tokio::spawn(subscription_clone.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("~~~~~> defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
}
}
}
SubscribeStreamEvent::Status(status) => println!("\nstatus: {:?}", status),
Update::Presence(presence) => {
println!("~~~~~> presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("~~~~~> object: {:?}", object)
}
Update::MessageAction(action) => {
println!("~~~~~> message action: {:?}", action)
}
Update::File(file) => {
println!("~~~~~> file: {:?}", file)
}
}
}));

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;

// You can also cancel the subscription at any time.
subscription.unsubscribe().await;
// subscription.unsubscribe();

println!("~~~~~~~~> DISCONNECT");
client.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("~~~~~~~~> RECONNECT");
client.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("~~~~~~~~> UNSUBSCRIBE ALL...");

// Clean up before complete work with PubNub client instance.
client.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

println!("~~~~~~~~> UNSUBSCRIBE ALL. DONE");

Ok(())
}
2 changes: 1 addition & 1 deletion examples/subscribe_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
Update::Presence(presence) => {
println!("presence: {:?}", presence)
}
Update::Object(object) => {
Update::AppContext(object) => {
println!("object: {:?}", object)
}
Update::MessageAction(action) => {
Expand Down
2 changes: 1 addition & 1 deletion examples/subscribe_raw_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn main() -> Result<(), Box<dyn snafu::Error>> {
Update::Presence(presence) => {
println!("presence: {:?}", presence)
}
Update::Object(object) => {
Update::AppContext(object) => {
println!("object: {:?}", object)
}
Update::MessageAction(action) => {
Expand Down
Loading

0 comments on commit 3e6c97f

Please sign in to comment.