Skip to content

Commit

Permalink
backup and batching
Browse files Browse the repository at this point in the history
  • Loading branch information
manglemix committed Jan 19, 2025
1 parent c5eff17 commit 2446c22
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 138 deletions.
42 changes: 42 additions & 0 deletions usr-backend/src/backup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::{process::Command, sync::atomic::Ordering, time::Duration};

use crate::UsrState;

pub fn backup_db(state: &'static UsrState) {
if state.backup_task_running.swap(true, Ordering::Relaxed) {
return;
}
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(60 * 10)).await;
state.backup_task_running.store(false, Ordering::Relaxed);
if let Err(e) = std::fs::copy("usr-db.sqlite", "../usr-db-backup/usr-db.sqlite") {
tracing::error!("Failed to copy database: {}", e);
return;
}
if let Err(e) = Command::new("git")
.arg("add")
.arg("usr-db.sqlite")
.current_dir("../usr-db-backup")
.output()
{
tracing::error!("Failed to add files to git: {}", e);
}
if let Err(e) = Command::new("git")
.arg("commit")
.arg("-m")
.arg("Automated backup")
.current_dir("../usr-db-backup")
.output()
{
tracing::error!("Failed to commit files to git: {}", e);
}
if let Err(e) = Command::new("git")
.arg("push")
.current_dir("../usr-db-backup")
.output()
{
tracing::error!("Failed to push files to git: {}", e);
}
});

}
19 changes: 12 additions & 7 deletions usr-backend/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
backtrace::Backtrace, io::{LineWriter, Write}, net::SocketAddr, panic::set_hook, path::Path, sync::Arc
backtrace::Backtrace, io::{LineWriter, Write}, net::SocketAddr, panic::set_hook, path::Path, sync::atomic::AtomicBool
};

use axum::{routing::get, Router};
Expand All @@ -12,9 +12,12 @@ use tower::ServiceBuilder;
use tower_http::cors::Any;
use tracing::{error, info};
use tracing_subscriber::FmtSubscriber;
use webhook::BatchedWebhook;

mod scheduler;
mod manifest;
mod webhook;
mod backup;

struct LogWriter {
inner: &'static Mutex<LineWriter<std::fs::File>>,
Expand Down Expand Up @@ -42,8 +45,9 @@ struct Config {

struct UsrState {
db: DatabaseConnection,
new_orders_webhook: Option<DiscordWebhook>,
order_updates_webhook: Option<DiscordWebhook>,
new_orders_webhook: Option<BatchedWebhook>,
order_updates_webhook: Option<BatchedWebhook>,
backup_task_running: AtomicBool
}

#[tokio::main]
Expand Down Expand Up @@ -140,23 +144,24 @@ async fn main() -> anyhow::Result<()> {
})
.layer(tower_http::compression::CompressionLayer::new())
)
.with_state(Arc::new(UsrState {
.with_state(Box::leak(Box::new(UsrState {
db,
new_orders_webhook: {
if let Some(new_orders_webhook) = config.new_orders_webhook {
Some(DiscordWebhook::new(new_orders_webhook)?)
Some(DiscordWebhook::new(new_orders_webhook)?.into())
} else {
None
}
},
order_updates_webhook: {
if let Some(order_updates_webhook) = config.order_updates_webhook {
Some(DiscordWebhook::new(order_updates_webhook)?)
Some(DiscordWebhook::new(order_updates_webhook)?.into())
} else {
None
}
},
}));
backup_task_running: AtomicBool::new(false),
})));

default_provider()
.install_default()
Expand Down
141 changes: 32 additions & 109 deletions usr-backend/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
use std::{collections::{hash_map::Entry, HashMap}, sync::{Arc, LazyLock}, time::Instant}
;

use axum::{
extract::State, http::StatusCode, response::{IntoResponse, Response}, routing::{delete, get, post}, Json, Router
};
use discord_webhook2::message::Message;
use parking_lot::Mutex;
use sea_orm::{
prelude::Decimal, sea_query::Table, sqlx::types::chrono::Local, ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, Schema, TransactionTrait
};
use serde::Deserialize;
use tracing::error;

use crate::{scheduler, UsrState};
use crate::{backup::backup_db, scheduler, UsrState};

mod order;
mod order_status;

struct BatchedTask {
queue: HashMap<u32, String>,
deadline: Option<Instant>,
}
static BATCHED: LazyLock<Mutex<BatchedTask>> = LazyLock::new(|| Mutex::new(BatchedTask {
queue: HashMap::new(),
deadline: None,
}));

#[derive(Deserialize)]
pub struct PendingOrder {
pub name: String,
Expand All @@ -40,7 +26,7 @@ pub struct PendingOrder {

#[axum::debug_handler]
async fn new_order(
State(state): State<Arc<UsrState>>,
State(state): State<&'static UsrState>,
Json(pending_order): Json<PendingOrder>,
) -> (StatusCode, &'static str) {
let webhook_msg = format!(
Expand Down Expand Up @@ -82,59 +68,8 @@ async fn new_order(

match result {
Ok(m) => {
let mut guard = BATCHED.lock();
guard.queue.insert(m.id, webhook_msg);
let was_none = guard.deadline.is_none();
guard.deadline = Some(Instant::now() + std::time::Duration::from_secs(60 * 5));

if was_none {
drop(guard);
if state.new_orders_webhook.is_some() {
tokio::spawn(async move {
let new_orders_webhook = state.new_orders_webhook.as_ref().unwrap();
loop {
let deadline = BATCHED.lock().deadline.unwrap();
tokio::time::sleep_until(deadline.into()).await;
let queue;
{
let mut guard = BATCHED.lock();
if guard.deadline.unwrap() != deadline {
continue;
}
let replacement = HashMap::with_capacity(guard.queue.capacity());
queue = std::mem::replace(&mut guard.queue, replacement);
}
let mut running = String::new();
for (_, msg) in queue {
if running.len() + msg.len() + 1 < 2000 {
running.push_str(&msg);
running.push_str("\n");
} else {
if let Err(e) = new_orders_webhook
.send(&Message::new(|message| message.content(running)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
running = msg;
}
}
if let Err(e) = new_orders_webhook
.send(&Message::new(|message| message.content(running)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
let mut guard = BATCHED.lock();
if guard.queue.is_empty() {
guard.deadline = None;
break;
}
}
});
}
}

backup_db(state);
state.new_orders_webhook.as_ref().map(|x| x.enqueue(m.id, webhook_msg));
(StatusCode::OK, "")
}
Err(e) => {
Expand All @@ -159,7 +94,7 @@ pub struct ChangeOrder {

#[axum::debug_handler]
async fn change_order(
State(state): State<Arc<UsrState>>,
State(state): State<&'static UsrState>,
Json(change_order): Json<ChangeOrder>,
) -> (StatusCode, &'static str) {
match order_status::Entity::find().filter(order_status::Column::OrderId.eq(change_order.id)).order_by_desc(order_status::Column::InstanceId).one(&state.db).await {
Expand Down Expand Up @@ -202,23 +137,25 @@ async fn change_order(
error!("Failed to change order: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "")
} else {
let mut guard = BATCHED.lock();
match guard.queue.entry(change_order.id) {
Entry::Occupied(mut entry) => {
entry.insert(webhook_msg);
}
Entry::Vacant(_) => {
tokio::spawn(async move {
let Some(new_orders_webhook) = state.new_orders_webhook.as_ref() else { return; };
if let Err(e) = new_orders_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
});
}
}
backup_db(state);
state.new_orders_webhook.as_ref().map(|x| x.enqueue(change_order.id, webhook_msg));
// let mut guard = BATCHED.lock();
// match guard.queue.entry(change_order.id) {
// Entry::Occupied(mut entry) => {
// entry.insert(webhook_msg);
// }
// Entry::Vacant(_) => {
// tokio::spawn(async move {
// let Some(new_orders_webhook) = state.new_orders_webhook.as_ref() else { return; };
// if let Err(e) = new_orders_webhook
// .send(&Message::new(|message| message.content(webhook_msg)))
// .await
// {
// error!("Failed to trigger new-order webhook: {e}");
// }
// });
// }
// }

(StatusCode::OK, "")
}
Expand All @@ -233,7 +170,7 @@ struct DeleteOrder {

#[axum::debug_handler]
async fn cancel_order(
State(state): State<Arc<UsrState>>,
State(state): State<&'static UsrState>,
Json(DeleteOrder { id, force }): Json<DeleteOrder>,
) -> (StatusCode, &'static str) {
let webhook_msg;
Expand Down Expand Up @@ -284,15 +221,8 @@ async fn cancel_order(
return (StatusCode::INTERNAL_SERVER_ERROR, "");
}

tokio::spawn(async move {
let Some(new_orders_webhook) = state.new_orders_webhook.as_ref() else { return; };
if let Err(e) = new_orders_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger new-order webhook: {e}");
}
});
state.new_orders_webhook.as_ref().map(|x| x.enqueue(id, webhook_msg));
backup_db(state);

(StatusCode::OK, "")
}
Expand All @@ -305,7 +235,7 @@ pub struct UpdateOrder {

#[axum::debug_handler]
async fn update_order(
State(state): State<Arc<UsrState>>,
State(state): State<&'static UsrState>,
Json(update_order): Json<UpdateOrder>,
) -> (StatusCode, &'static str) {
let webhook_msg;
Expand Down Expand Up @@ -369,22 +299,15 @@ async fn update_order(
error!("Failed to update order status: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "")
} else {
tokio::spawn(async move {
let Some(order_updates_webhook) = state.order_updates_webhook.as_ref() else { return; };
if let Err(e) = order_updates_webhook
.send(&Message::new(|message| message.content(webhook_msg)))
.await
{
error!("Failed to trigger order-updates webhook: {e}");
}
});
state.order_updates_webhook.as_ref().map(|x| x.enqueue(update_order.id, webhook_msg));
backup_db(state);
(StatusCode::OK, "")
}
}

#[axum::debug_handler]
async fn get_orders(
State(state): State<Arc<UsrState>>,
State(state): State<&'static UsrState>,
) -> Response {
let result = order::Entity::find().all(&state.db).await;

Expand Down Expand Up @@ -412,7 +335,7 @@ async fn get_orders(
}
}

pub fn router() -> Router<Arc<UsrState>> {
pub fn router() -> Router<&'static UsrState> {
Router::new()
.route("/new/order", post(new_order))
.route("/change/order", post(change_order))
Expand Down
Loading

0 comments on commit 2446c22

Please sign in to comment.