Skip to content

Commit

Permalink
layout changes and batching
Browse files Browse the repository at this point in the history
  • Loading branch information
manglemix committed Jan 14, 2025
1 parent 34dfedb commit b242d39
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 261 deletions.
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ["usr-backend", "usr-backend-runner"]
members = ["usr-backend"]
9 changes: 0 additions & 9 deletions usr-backend-runner/Cargo.toml

This file was deleted.

26 changes: 0 additions & 26 deletions usr-backend-runner/src/main.rs

This file was deleted.

110 changes: 87 additions & 23 deletions usr-backend/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::
sync::Arc
use std::{collections::{hash_map::Entry, HashMap}, sync::{Arc, LazyLock}, time::Instant}
;

use axum::{
extract::State, http::StatusCode, response::{IntoResponse, Response}, routing::{delete, get, post}, Json, Router
};
use discord_webhook2::message::Message;
use parking_lot::Mutex;
use sea_orm::{
prelude::Decimal, sea_query::Table, sqlx::types::chrono::Local, ActiveModelTrait, ActiveValue, ConnectionTrait, DatabaseConnection, EntityTrait, Schema
};
Expand All @@ -15,6 +15,14 @@ use tracing::error;
use crate::{scheduler, UsrState};

mod order;
struct BatchedTask {
queue: HashMap<u32, String>,
deadline: Option<Instant>,
}
static BATCHED: LazyLock<Mutex<BatchedTask>> = LazyLock::new(|| Mutex::new(BatchedTask {
queue: HashMap::new(),
deadline: None,
}));

#[derive(Deserialize)]
pub struct PendingOrder {
Expand Down Expand Up @@ -57,20 +65,67 @@ async fn new_order(
vendor: ActiveValue::Set(pending_order.vendor),
link: ActiveValue::Set(pending_order.link),
};
if let Err(e) = active_model.insert(&state.db).await {
error!("Failed to create new order: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "")
} else {
tokio::spawn(async move {
if let Err(e) = state
.new_orders_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
match active_model.insert(&state.db).await {
Ok(m) => {
let mut guard = BATCHED.lock();
guard.queue.insert(m.id, webhook_msg);
let was_none = guard.deadline.is_none();
guard.deadline = Some(Instant::now() + std::time::Duration::from_secs(60 * 5));

if was_none {
drop(guard);

tokio::spawn(async move {
loop {
let deadline = BATCHED.lock().deadline.unwrap();
tokio::time::sleep_until(deadline.into()).await;
let queue;
{
let mut guard = BATCHED.lock();
if guard.deadline.unwrap() != deadline {
continue;
}
let replacement = HashMap::with_capacity(guard.queue.capacity());
queue = std::mem::replace(&mut guard.queue, replacement);
}
let mut running = String::new();
for (_, msg) in queue {
if running.len() + msg.len() + 1 < 2000 {
running.push_str(&msg);
running.push_str("\n");
} else {
if let Err(e) = state
.new_orders_webhook
.send(&Message::new(|message| message.content(running)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
running = msg;
}
}
if let Err(e) = state
.new_orders_webhook
.send(&Message::new(|message| message.content(running)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
let mut guard = BATCHED.lock();
if guard.queue.is_empty() {
guard.deadline = None;
break;
}
}
});
}
});
(StatusCode::OK, "")

(StatusCode::OK, "")
}
Err(e) => {
error!("Failed to create new order: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "")
}
}
}

Expand Down Expand Up @@ -134,15 +189,24 @@ async fn change_order(
error!("Failed to change order: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "")
} else {
tokio::spawn(async move {
if let Err(e) = state
.new_orders_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
let mut guard = BATCHED.lock();
match guard.queue.entry(change_order.id) {
Entry::Occupied(mut entry) => {
entry.insert(webhook_msg);
}
});
Entry::Vacant(_) => {
tokio::spawn(async move {
if let Err(e) = state
.new_orders_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
});
}
}

(StatusCode::OK, "")
}
}
Expand Down
Loading

0 comments on commit b242d39

Please sign in to comment.