Skip to content

Commit

Permalink
Fix viewer count and clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
Jokler committed Jan 21, 2024
1 parent 9c242d9 commit b86257c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"qw-site",
"qw-ingest",
]
resolver = "2"

[patch.crates-io]
av-data = { git = "https://github.com/rust-av/rust-av" }
Expand Down
2 changes: 1 addition & 1 deletion libs/qw-site-doc-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn generate_help_item(
) -> anyhow::Result<()> {
match item {
TreeItem::Item { path, src_file, .. } => {
dest_path.push(path.to_owned());
dest_path.push(path);
src.push(src_file);

let content = read_md_to_html(&src)?;
Expand Down
7 changes: 2 additions & 5 deletions libs/sh-ingest-rtmp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async fn wait_for_metadata(

loop {
let bytes = read_filter.read().await?;
for res in rtmp_server_session.handle_input(&bytes).map_err(|e| e)? {
for res in rtmp_server_session.handle_input(&bytes)? {
match res {
ServerSessionResult::OutboundResponse(pkt) => rtmp_tx.send(pkt).await?,
ServerSessionResult::RaisedEvent(ServerSessionEvent::StreamMetadataChanged {
Expand Down Expand Up @@ -448,10 +448,7 @@ impl FrameReadFilter for RtmpReadFilter {

let streams = [self.video_stream.clone(), self.audio_stream.clone()];

Ok(std::array::IntoIter::new(streams)
.into_iter()
.flatten()
.collect())
Ok(streams.into_iter().flatten().collect())
}

async fn read(&mut self) -> anyhow::Result<Frame> {
Expand Down
4 changes: 3 additions & 1 deletion libs/sh-media/src/media_frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ impl FrameReadFilter for MediaFrameQueueReceiver {
// FIXME: on buffer overflow (channel closed), raise an error to the
// parent filter graph

let frame = self.recv.recv()
let frame = self
.recv
.recv()
.await
.context("failed to read frame from queue")?;

Expand Down
7 changes: 3 additions & 4 deletions libs/sh-transport-mse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ pub async fn start_websocket_filters(
let first_frame = wait_for_sync_frame(read)
.await
.context("waiting for first sync frame")?;
write.start(streams)
.await
.context("starting to write")?;
write.write(first_frame)
write.start(streams).await.context("starting to write")?;
write
.write(first_frame)
.await
.context("writing first frame")?;

Expand Down
37 changes: 20 additions & 17 deletions qw-site/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,12 @@ async fn stream_page(
};

Ok(AskamaTemplate(&template).into_response())
},
}
Some(StreamStatus::Offline(name)) => {
let template = StreamOfflineTemplate {
name: &name,
};
let template = StreamOfflineTemplate { name: &name };

Ok(AskamaTemplate(&template).into_response())
},
}
None => {
let mut response = AskamaTemplate(&Stream404Template).into_response();
*response.status_mut() = StatusCode::NOT_FOUND;
Expand All @@ -130,27 +128,32 @@ enum StreamStatus {
Offline(String),
}

async fn get_stream_status(data: &Arc<AppData>, stream: &str) -> anyhow::Result<Option<StreamStatus>> {
async fn get_stream_status(
data: &Arc<AppData>,
stream: &str,
) -> anyhow::Result<Option<StreamStatus>> {
let conn = data.pool.get().await?;

let row = conn.query_opt(
"
let row = conn
.query_opt(
"
SELECT id, name FROM account
WHERE account.name ILIKE $1",
&[
&stream
]).await?;
&[&stream],
)
.await?;

let id_and_name: Option<(i32, String)> = row.map(|r| (r.get(0), r.get(1)));

if let Some((id, name)) = id_and_name {
let row = conn.query_opt(
"
let row = conn
.query_opt(
"
SELECT id FROM stream_session
WHERE account_id = $1 AND stop_time IS NULL",
&[
&id
]).await?;
&[&id],
)
.await?;

if row.is_some() {
Ok(Some(StreamStatus::Online(name)))
Expand Down Expand Up @@ -241,7 +244,7 @@ async fn start() -> anyhow::Result<()> {
let scuffed_addr = resolve_env_addr("QW_WEB_ADDR", "localhost:9083");

let manager = bb8_postgres::PostgresConnectionManager::new_from_stringlike(
&env::var("DATABASE_URL").context("DATABASE_URL not set")?,
env::var("DATABASE_URL").context("DATABASE_URL not set")?,
tokio_postgres::NoTls,
)
.context("failed to create database connection")?;
Expand Down
27 changes: 25 additions & 2 deletions qw-site/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,25 @@ ON CONFLICT DO NOTHING
Ok(())
}

pub async fn update_viewer_count(
conn: &PostgresConnection<'_>,
stream_session_id: i32,
count: u32,
) -> anyhow::Result<()> {
let _ = conn
.execute(
"
UPDATE stream_session
SET viewer_count=$1
WHERE id=$2
",
&[&count, &stream_session_id],
)
.await?;

Ok(())
}

pub async fn get_stream_sessions(
conn: &PostgresConnection<'_>,
account: i32,
Expand Down Expand Up @@ -333,13 +352,17 @@ async fn handle_msg(
}
}
StreamType::ViewerJoin(msg) => {
if let Some(mut stream) = streams.get_mut(&msg.stream_session_id) {
if let Some(stream) = streams.get_mut(&msg.stream_session_id) {
stream.viewers += 1;
let conn = pool.get().await?;
update_viewer_count(&conn, msg.stream_session_id, stream.viewers).await?;
}
}
StreamType::ViewerLeave(msg) => {
if let Some(mut stream) = streams.get_mut(&msg.stream_session_id) {
if let Some(stream) = streams.get_mut(&msg.stream_session_id) {
stream.viewers -= 1;
let conn = pool.get().await?;
update_viewer_count(&conn, msg.stream_session_id, stream.viewers).await?;
}
}
StreamType::StreamStopped(stream) => {
Expand Down

0 comments on commit b86257c

Please sign in to comment.