From b20560ad8678f03ec4d5c8bb72c56fc873731673 Mon Sep 17 00:00:00 2001
From: Amninder Kaur <36900343+akaur13@users.noreply.github.com>
Date: Tue, 12 Sep 2023 09:06:46 +1000
Subject: [PATCH] Feature/adding config (#4)

* chore: changes for transactions

* chore: add Monitor config

---------

Co-authored-by: Amninder Kaur
---
 .env                                 |  11 +-
 .env.example                         |   6 +-
 How-to.md                            |   2 +-
 chronos_bin/src/message_processor.rs | 149 +++++++++-----
 chronos_bin/src/monitor.rs           |  11 +-
 chronos_bin/src/postgres/pg.rs       | 282 ++++++++++++++++++---------
 chronos_bin/src/utils/config.rs      |  13 +-
 docker-compose.yml                   |   2 +-
 8 files changed, 317 insertions(+), 159 deletions(-)

diff --git a/.env b/.env
index 4462464..713ff15 100644
--- a/.env
+++ b/.env
@@ -24,13 +24,14 @@ PG_PASSWORD=admin
 PG_DATABASE=chronos
 PG_POOL_SIZE=25
-
 # CONFIG
 RUST_LOG=info
-
+#APP
 DELAY_TIME=0
 RANDOMNESS_DELAY=100
-DB_POLL_INTERVAL=5
-TIMING_ADVANCE=10
-FAIL_DETECT_INTERVAL=10
+MONITOR_DB_POLL=5
+TIMING_ADVANCE=0
+FAIL_DETECT_INTERVAL=500
+MAX_RETRIES=3
+PROCESSOR_DB_POLL=10
diff --git a/.env.example b/.env.example
index 29716af..1c5f647 100644
--- a/.env.example
+++ b/.env.example
@@ -30,6 +30,8 @@ RUST_LOG=debug
 #APP
 DELAY_TIME=0
 RANDOMNESS_DELAY=100
-DB_POLL_INTERVAL=5
+MONITOR_DB_POLL=5
 TIMING_ADVANCE=0
-FAIL_DETECT_INTERVAL=500
\ No newline at end of file
+FAIL_DETECT_INTERVAL=500
+MAX_RETRIES=3
+PROCESSOR_DB_POLL=10
\ No newline at end of file
diff --git a/How-to.md b/How-to.md
index 64cb0cd..5b1bc51 100644
--- a/How-to.md
+++ b/How-to.md
@@ -39,7 +39,7 @@ All the required configurations for Chronos can be passed in environment variables
 | PG_POOL_SIZE|50|True
 | DELAY_TIME|0|False
 | RANDOMNESS_DELAY|100|False
-| DB_POLL_INTERVAL|5|False
+| MONITOR_DB_POLL|5|False
 | TIMING_ADVANCE|0|False
 | FAIL_DETECT_INTERVAL|500|False
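The renamed variables split the old DB_POLL_INTERVAL into a monitor-side and a processor-side interval. A minimal sketch of how a consumer reads the two new knobs, mirroring the chronos_bin/src/utils/config.rs change further down (defaults and panic messages copied from that file; both values are whole seconds at their call sites):

    // Sketch: reads the split poll intervals the way ChronosConfig::from_env does.
    fn read_poll_intervals() -> (u64, u64) {
        let monitor_db_poll: u64 = std::env::var("MONITOR_DB_POLL")
            .unwrap_or_else(|_| "5".to_string())
            .parse()
            .expect("Failed to parse MONITOR_DB_POLL!!");
        let processor_db_poll: u64 = std::env::var("PROCESSOR_DB_POLL")
            .unwrap_or_else(|_| "5".to_string())
            .parse()
            .expect("Failed to parse PROCESSOR_DB_POLL!!");
        (monitor_db_poll, processor_db_poll)
    }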
diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs
index 94f963e..7fcebfb 100644
--- a/chronos_bin/src/message_processor.rs
+++ b/chronos_bin/src/message_processor.rs
@@ -1,5 +1,7 @@
 use crate::kafka::producer::KafkaProducer;
 use crate::postgres::pg::{GetReady, Pg, TableRow};
+use crate::utils::config::ChronosConfig;
+use crate::utils::delay_controller::DelayController;
 use chrono::Utc;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -13,80 +15,121 @@ pub struct MessageProcessor {
 
 impl MessageProcessor {
     pub async fn run(&self) {
-        println!("MessageProcessor ON!");
+        log::info!("MessageProcessor ON!");
+
+        //Get UUID for the node that deployed this thread
+        let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());
+        // log::info!("node_id {}", node_id);
+        let mut delay_controller = DelayController::new(100);
         loop {
             tokio::time::sleep(Duration::from_millis(10)).await;
-            // println!("MessageProcessor");
-            let deadline = Utc::now();
-            let uuid = Uuid::new_v4();
+            // tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
+            let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);
 
             let param = GetReady {
                 readied_at: deadline,
-                readied_by: uuid,
+                readied_by: Uuid::parse_str(&node_id).unwrap(),
                 deadline,
-                limit: 1000,
+                // limit: 1000,
                 // order: "asc",
             };
 
-            let mut ready_params = Vec::new();
-            ready_params.push(param);
-
-            match &self.data_store.ready_to_fire(&ready_params).await {
-                Ok(publish_rows) => {
-                    let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
-                    let mut publish_futures = Vec::with_capacity(publish_rows.len());
-                    for row in publish_rows {
-                        let updated_row = TableRow {
-                            id: row.get("id"),
-                            deadline: row.get("deadline"),
-                            readied_at: row.get("readied_at"),
-                            readied_by: row.get("readied_by"),
-                            message_headers: row.get("message_headers"),
-                            message_key: row.get("message_key"),
-                            message_value: row.get("message_value"),
-                        };
-
-                        let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
-                            Ok(t) => t,
-                            Err(_e) => {
-                                println!("error occurred while parsing");
-                                HashMap::new()
-                            }
-                        };
-                        //TODO: handle empty headers
-                        // println!("checking {:?}",headers);
-
-                        publish_futures.push(self.producer.publish(
-                            updated_row.message_value.to_string(),
-                            Some(headers),
-                            updated_row.message_key.to_string(),
-                            updated_row.id.to_string(),
-                        ))
-                    }
-                    let results = futures::future::join_all(publish_futures).await;
-                    for result in results {
-                        match result {
-                            Ok(m) => {
-                                ids.push(m);
-                            }
-                            Err(e) => {
-                                println!("publish failed {:?}", e);
-                                // failure detection needs to pick
-                            }
-                        }
-                    }
-
-                    if !ids.is_empty() {
-                        if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
-                            println!("error occurred in message processor delete_fired {}", outcome_error);
-                        }
-                    }
-                }
-                Err(e) => {
-                    println!("error occurred in message processor {}", e);
+            //retry loop
+            let max_retry_count = 3;
+            let mut retry_count = 0;
+            loop {
+                // thread::sleep(Duration::from_millis(100));
+                let mut readied_by: Option<String> = None;
+                // let mut row_id: Option<String> = None;
+                match &self.data_store.ready_to_fire(&param).await {
+                    Ok(publish_rows) => {
+                        let rdy_to_pblsh_count = publish_rows.len();
+                        if rdy_to_pblsh_count > 0 {
+                            let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
+                            let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
+                            for row in publish_rows {
+                                let updated_row = TableRow {
+                                    id: row.get("id"),
+                                    deadline: row.get("deadline"),
+                                    readied_at: row.get("readied_at"),
+                                    readied_by: row.get("readied_by"),
+                                    message_headers: row.get("message_headers"),
+                                    message_key: row.get("message_key"),
+                                    message_value: row.get("message_value"),
+                                };
+
+                                let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
+                                    Ok(t) => t,
+                                    Err(_e) => {
+                                        log::error!("error occurred while parsing message headers");
+                                        HashMap::new()
+                                    }
+                                };
+                                //TODO: handle empty headers
+
+                                readied_by = Some(updated_row.readied_by.to_string());
+                                // row_id = Some(updated_row.id.to_string());
+
+                                headers.insert("readied_by".to_string(), readied_by.clone().unwrap());
+
+                                publish_futures.push(self.producer.publish(
+                                    updated_row.message_value.to_string(),
+                                    Some(headers),
+                                    updated_row.message_key.to_string(),
+                                    updated_row.id.to_string(),
+                                ))
+                            }
+                            let results = futures::future::join_all(publish_futures).await;
+                            for result in results {
+                                match result {
+                                    Ok(m) => {
+                                        ids.push(m);
+                                    }
+                                    Err(e) => {
+                                        log::error!("Error: delayed message publish failed {:?}", e);
+                                        // failure detection needs to pick this up
+                                        break;
+                                    }
+                                }
+                            }
+
+                            if !ids.is_empty() {
+                                if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
+                                    log::error!("Error: error occurred in message processor delete_fired {}", outcome_error);
+                                    //add retry logic here
+                                }
+                                log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
+                                break;
+                            }
+                        } else {
+                            log::debug!("no rows ready to fire for deadline {}", deadline);
+                            break;
+                        }
+                    }
+                    Err(e) => {
+                        if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
+                            retry_count += 1;
+                            log::warn!("retrying after a serialization conflict (attempt {} of {})", retry_count, max_retry_count);
+                            if retry_count == max_retry_count {
+                                log::error!(
+                                    "Error: max retry count {} reached by node {} for row ",
+                                    max_retry_count,
+                                    node_id,
+                                    // row_id.unwrap()
+                                );
+                                break;
+                            }
+                            continue;
+                        }
+                        log::error!("Error: error occurred in message processor while publishing {}", e);
+                        break;
+                    }
                 }
             }
+            delay_controller.sleep().await;
         }
     }
 }
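DelayController is imported and driven above (new(100), sleep().await) but its implementation is not part of this patch. A plausible shape, given that usage, is a capped exponential backoff; this is a hypothetical sketch, not the module's actual code (the 10s ceiling is an invented value):

    pub struct DelayController {
        min_delay_ms: u64,
        current_ms: u64,
    }

    impl DelayController {
        pub fn new(min_delay_ms: u64) -> Self {
            Self { min_delay_ms, current_ms: min_delay_ms }
        }

        // Sleep, then double the next delay up to an assumed 10s ceiling.
        pub async fn sleep(&mut self) {
            tokio::time::sleep(std::time::Duration::from_millis(self.current_ms)).await;
            self.current_ms = (self.current_ms * 2).min(10_000);
        }

        // Drop back to the configured minimum after a productive poll.
        pub fn reset(&mut self) {
            self.current_ms = self.min_delay_ms;
        }
    }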
diff --git a/chronos_bin/src/monitor.rs b/chronos_bin/src/monitor.rs
index c045427..b64a788 100644
--- a/chronos_bin/src/monitor.rs
+++ b/chronos_bin/src/monitor.rs
@@ -1,5 +1,6 @@
 use crate::postgres::pg::Pg;
-use chrono::{Duration as chrono_duration, Utc};
+use crate::utils::config::ChronosConfig;
+use chrono::Utc;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -12,9 +13,13 @@ impl FailureDetector {
     pub async fn run(&self) {
         println!("Monitoring On!");
         loop {
-            let _ = tokio::time::sleep(Duration::from_secs(10)).await; // sleep for 10sec
+            let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; // sleep for MONITOR_DB_POLL seconds
 
-            match &self.data_store.failed_to_fire(Utc::now() - chrono_duration::seconds(10)).await {
+            match &self
+                .data_store
+                .failed_to_fire(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval)))
+                .await
+            {
                 Ok(fetched_rows) => {
                     if !fetched_rows.is_empty() {
                         if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
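The monitor's cutoff arithmetic in isolation: any row whose readied_at is older than FAIL_DETECT_INTERVAL seconds is treated as failed and handed to reset_to_init. A minimal sketch using the same chrono/std types the file uses:

    use chrono::{DateTime, Utc};
    use std::time::Duration;

    // Rows readied before this instant are considered failed-to-fire.
    fn failed_cutoff(fail_detect_interval_secs: u64) -> DateTime<Utc> {
        Utc::now() - Duration::from_secs(fail_detect_interval_secs)
    }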
diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs
index c0843da..c0d5dfa 100644
--- a/chronos_bin/src/postgres/pg.rs
+++ b/chronos_bin/src/postgres/pg.rs
@@ -1,7 +1,8 @@
 use chrono::{DateTime, Utc};
-use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime};
+use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime, Transaction};
 use log::error;
 use std::time::{Duration, Instant};
+use tokio_postgres::error::SqlState;
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{NoTls, Row};
 use uuid::Uuid;
@@ -47,11 +48,31 @@ pub struct GetReady {
     pub readied_at: DateTime<Utc>,
     pub readied_by: Uuid,
     pub deadline: DateTime<Utc>,
-    pub limit: i64,
+    // pub limit: i64,
     // pub order: &'a str,
 }
 
-// create postgres client
+struct PgTxn<'a> {
+    txn: Transaction<'a>,
+}
+
+struct PgAccess {
+    client: Object,
+}
+
+impl PgAccess {
+    pub async fn get_txn(&mut self) -> PgTxn {
+        let txn = self
+            .client
+            .build_transaction()
+            .isolation_level(tokio_postgres::IsolationLevel::RepeatableRead)
+            .start()
+            .await
+            .unwrap();
+        PgTxn { txn }
+    }
+}
+
 impl Pg {
     pub async fn new(pg_config: PgConfig) -> Result<Self, PgError> {
         let mut config = Config::new();
         let pool = config.create_pool(Some(Runtime::Tokio1), NoTls).map_err(PgError::CreatePool)?;
 
-        //test connection
-        let _ = pool.get().await.map_err(PgError::GetClientFromPool)?;
+        {
+            //test connection
+            let mut tmp_list: Vec<Object> = Vec::new();
+            for _ in 1..=pg_config.pool_size {
+                let client = match pool.get().await {
+                    Ok(client) => client,
+                    Err(e) => {
+                        error!("error::: Cannot get client from the pool while setting transaction isolation level {:?}", e);
+                        return Err(PgError::GetClientFromPool(e));
+                    }
+                };
+                let _ = client
+                    .execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ;", &[])
+                    .await;
+                tmp_list.push(client);
+            }
+        }
+
+        for _ in 1..=pg_config.pool_size {
+            let client = match pool.get().await {
+                Ok(client) => client,
+                Err(e) => {
+                    error!("error::: Cannot get client from the pool {:?}", e);
+                    return Err(PgError::GetClientFromPool(e));
+                }
+            };
+
+            let rs = client.query_one("show transaction_isolation", &[]).await.unwrap();
+            let value: String = rs.get(0);
+            log::debug!("init: db-isolation-level: {}", value);
+        }
 
         println!("pool.status: {:?}", pool.status());
         Ok(Pg { pool })
     }
 
     pub async fn get_client(&self) -> Result<Object, PgError> {
-        // let client = self.pool.get().await.map_err(PgError::GetClientFromPool)?;
-        // let (client, connection) = tokio_postgres::connect(
-        //     "host=localhost user=postgres password=admin dbname=chronos",
-        //     NoTls,
-        // )
-        // .await.unwrap();
-        // tokio::spawn(async move {
-        //     if let Err(e) = connection.await {
-        //         println!("connection error: {}", e);
-        //     }
-        // });
-        // Ok(client)
-
         match self.pool.get().await {
             Err(e) => {
                 error!("error::: {:?}", e);
                 Err(PgError::GetClientFromPool(e))
             }
             Ok(client) => Ok(client),
         }
-        // println!("pool.status: {:?}",self.pool.status());
-        // Ok(client)
     }
 }
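Stripped of the PgAccess wrapper, the transaction pattern introduced here is tokio_postgres's builder with an explicit isolation level. A standalone sketch (error handling elided; the no-op UPDATE is only a placeholder statement):

    async fn with_repeatable_read(pool: &deadpool_postgres::Pool) -> Result<(), Box<dyn std::error::Error>> {
        // deadpool's Object derefs to the underlying tokio_postgres Client.
        let mut client = pool.get().await?;
        let txn = client
            .build_transaction()
            .isolation_level(tokio_postgres::IsolationLevel::RepeatableRead)
            .start()
            .await?;
        txn.execute("UPDATE hanger SET readied_at = NULL WHERE false", &[]).await?;
        txn.commit().await?;
        Ok(())
    }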
 impl Pg {
     pub(crate) async fn insert_to_delay(&self, params: &TableInsertRow<'_>) -> Result<u64, PgError> {
         let pg_client = self.get_client().await?;
+        let mut pg_access = PgAccess { client: pg_client };
+        let pg_txn: PgTxn = pg_access.get_txn().await;
+
         let insert_query = "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value) VALUES ($1, $2 ,$3, $4, $5 )";
+
         let query_execute_instant = Instant::now();
-        let stmt = pg_client.prepare(insert_query).await?;
-        let outcome = pg_client
+        let stmt = pg_txn.txn.prepare(insert_query).await.unwrap();
+        let outcome = pg_txn
+            .txn
             .execute(
                 &stmt,
                 &[
                     &params.id,
                     &params.deadline,
                     &params.message_headers,
                     &params.message_key,
                     &params.message_value,
                 ],
             )
-            .await
-            .expect("insert failed");
+            .await;
         let time_elapsed = query_execute_instant.elapsed();
         if time_elapsed > Duration::from_millis(100) {
             println!("insert_to_delay query_execute_instant: {:?} ", time_elapsed);
         }
-        Ok(outcome)
+        match outcome {
+            Ok(updated_rows) => {
+                if let Err(e) = pg_txn.txn.commit().await {
+                    error!("insert_to_delay: Unable to commit: {}. The original transaction updated: {} rows", e, updated_rows);
+                    return Err(PgError::UnknownException(e));
+                }
+                Ok(updated_rows)
+            }
+            Err(e) => Err(PgError::UnknownException(e)),
+        }
     }
-    pub(crate) async fn delete_fired(&self, ids: &Vec<String>) -> Result<u64, PgError> {
-        let query_execute_instant = Instant::now();
-        let pg_client = self.get_client().await?;
-        // let transaction = pg_client.transaction().await;
-
-        // let delete_ids = ids.join(",");
-        // let delete_ids = ids.strip_suffix(",").unwrap().to_string();
-        // println!("delete ids {:?}", ids);
+    pub(crate) async fn delete_fired(&self, ids: &Vec<String>) -> Result<u64, String> {
+        // let query_execute_instant = Instant::now();
+        let pg_client = self.get_client().await.expect("Failed to get client from pool");
+        let mut pg_access = PgAccess { client: pg_client };
+        let pg_txn: PgTxn = pg_access.get_txn().await;
 
         let values_as_slice: Vec<_> = ids.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
 
         let mut query: String = "DELETE FROM hanger WHERE id IN (".to_string();
         for i in 0..ids.len() {
             query += &format!("${},", i + 1);
         }
         query = query.strip_suffix(',').unwrap().to_string();
         query += ")";
-        // println!("query {}", query);
-        //let sql = format!("DELETE FROM hanger WHERE id IN ({})", ids);
-        let stmt = pg_client.prepare(query.as_str()).await?;
-        let response = pg_client.execute(&stmt, &values_as_slice[..]).await?;
-        let time_elapsed = query_execute_instant.elapsed();
-        if time_elapsed > Duration::from_millis(100) {
-            println!(" delete_fired query_execute_instant: {:?} ", time_elapsed);
-        }
-        Ok(response)
+        log::debug!("delete_fired query {}", query);
+        let stmt = pg_txn.txn.prepare(query.as_str()).await.unwrap();
+        let response = pg_txn.txn.execute(&stmt, &values_as_slice).await;
+        match response {
+            Ok(resp) => {
+                let cmt_rdy = pg_txn.txn.commit().await;
+                if let Err(e) = cmt_rdy {
+                    error!("delete_fired: Unable to commit: {}. The original transaction updated: {} rows", e, resp);
+                    return Err(format!("delete_fired: Unable to commit: {}. The original transaction updated: {} rows", e, resp));
+                }
+                Ok(resp)
+            }
+            Err(e) => {
+                if let Some(db_err) = e.code() {
+                    if db_err == &SqlState::T_R_SERIALIZATION_FAILURE {
+                        error!("delete_fired: Unable to execute txn due to : {}", e);
+                        return Err(format!("delete_fired: Unable to execute txn due to : {}", e));
+                    }
+                }
+                Err(format!("delete_fired: Unknown exception {:?}", e))
+            }
+        }
     }
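delete_fired (and reset_to_init below) build the IN list out of positional placeholders so the ids are bound as parameters instead of being spliced into the SQL string. The technique in isolation:

    // One "$n" placeholder per value; the caller binds the values separately.
    fn in_clause(prefix: &str, n: usize) -> String {
        let placeholders: Vec<String> = (1..=n).map(|i| format!("${i}")).collect();
        format!("{} ({})", prefix, placeholders.join(","))
    }

    // in_clause("DELETE FROM hanger WHERE id IN", 3)
    //   => "DELETE FROM hanger WHERE id IN ($1,$2,$3)"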
-    pub(crate) async fn ready_to_fire(&self, params: &[GetReady]) -> Result<Vec<Row>, PgError> {
-        let pg_client = self.get_client().await?;
-
-        // println!("readying_update DB");
-        let param = &params[0];
-
-        // let ready_query = format!( "UPDATE hanger SET readied_at = '{}', readied_by= '{}' where id IN\
-        //  (SELECT ID FROM hanger WHERE deadline <= '{}' AND readied_at IS NULL LIMIT {})\
-        //  RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value;",&param.readied_at,
-        //  &param.readied_by,
-        //  &param.deadline,
-        //  &param.limit);
-
-        let ready_query = "UPDATE hanger SET readied_at = $1, readied_by = $2 where deadline <= $3 AND readied_at IS NULL RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value";
-        // println!("ready query {}", ready_query);
-        let stmt = pg_client.prepare(ready_query).await?;
-        let query_execute_instant = Instant::now();
-        let response = pg_client
-            .query(&stmt, &[&param.readied_at, &param.readied_by, &param.deadline])
-            .await
-            .expect("update failed");
-        let time_elapsed = query_execute_instant.elapsed();
-        if time_elapsed > Duration::from_millis(100) {
-            println!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param);
-        }
-        // println!("redying success {:?}", &response);
-        Ok(response)
+    pub(crate) async fn ready_to_fire(&self, param: &GetReady) -> Result<Vec<Row>, String> {
+        //TODO handle get client error gracefully
+        let pg_client = self.get_client().await.expect("Unable to get client");
+        let mut pg_access = PgAccess { client: pg_client };
+        let pg_txn: PgTxn = pg_access.get_txn().await;
+
+        let ready_query = "UPDATE hanger SET readied_at = $1, readied_by = $2 where deadline < $3 AND readied_at IS NULL RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value";
+        let stmt = pg_txn.txn.prepare(ready_query).await.expect("Unable to prepare query");
+        let query_execute_instant = Instant::now();
+        let response = pg_txn.txn.query(&stmt, &[&param.readied_at, &param.readied_by, &param.deadline]).await;
+
+        match response {
+            Ok(resp) => {
+                let cmt_rdy = pg_txn.txn.commit().await;
+                if let Err(e) = cmt_rdy {
+                    error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp);
+                    return Err(format!(
+                        "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows",
+                        e, resp
+                    ));
+                }
+                let time_elapsed = query_execute_instant.elapsed();
+                if time_elapsed > Duration::from_millis(100) {
+                    error!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param);
+                }
+                Ok(resp)
+            }
+            Err(e) => {
+                if let Some(db_err) = e.code() {
+                    if db_err == &SqlState::T_R_SERIALIZATION_FAILURE {
+                        error!("ready_to_fire: Unable to execute txn due to : {}", e);
+                        return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e));
+                    }
+                }
+                error!("ready_to_fire: Unknown exception {:?}", e);
+                Err(format!("ready_to_fire: Unknown exception {:?}", e))
+            }
+        }
     }
 
-    pub(crate) async fn failed_to_fire(&self, delay_time: DateTime<Utc>) -> Result<Vec<Row>, PgError> {
+    pub(crate) async fn failed_to_fire(&self, delay_time: &DateTime<Utc>) -> Result<Vec<Row>, PgError> {
         let query_execute_instant = Instant::now();
         let pg_client = self.get_client().await?;
 
-        let get_query = "SELECT * from hanger where readied_at > $1";
-        let response = pg_client.query(get_query, &[&delay_time]).await.expect("get delayed messages query failed");
+        let get_query = "SELECT * from hanger where readied_at > $1 ORDER BY deadline DESC";
+        let stmt = pg_client.prepare(get_query).await?;
+
+        let response = pg_client.query(&stmt, &[&delay_time]).await.expect("get delayed messages query failed");
         let time_elapsed = query_execute_instant.elapsed();
         if time_elapsed > Duration::from_millis(100) {
-            println!(" failed_to_fire query_execute_instant: {:?} ", time_elapsed);
+            error!(" failed_to_fire query_execute_instant: {:?} ", time_elapsed);
         }
         Ok(response)
     }
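All of these error branches test for the same condition; factored out, the check is for SQLSTATE 40001, which Postgres raises for concurrent-update conflicts under REPEATABLE READ and tokio_postgres exposes as T_R_SERIALIZATION_FAILURE:

    use tokio_postgres::error::SqlState;

    // True when the transaction failed only because of a concurrent update
    // and is therefore safe to retry.
    fn is_serialization_failure(e: &tokio_postgres::Error) -> bool {
        e.code() == Some(&SqlState::T_R_SERIALIZATION_FAILURE)
    }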
-    pub(crate) async fn reset_to_init(&self, to_init_list: &Vec<Row>) -> Result<Vec<String>, PgError> {
+    pub(crate) async fn reset_to_init(&self, to_init_list: &Vec<Row>) -> Result<Vec<String>, String> {
         let query_execute_instant = Instant::now();
-        println!("to_init_list: {}", to_init_list.len());
         let mut id_list = Vec::<String>::new();
         for row in to_init_list {
-            // let updated_row = TableRow {
-            //     id: row.get("id"),
-            //     deadline: row.get("deadline"),
-            //     readied_at: row.get("readied_at"),
-            //     readied_by: row.get("readied_by"),
-            //     message_headers: row.get("message_headers"),
-            //     message_key: row.get("message_key"),
-            //     message_value: row.get("message_value"),
-            // };
-
-            // println!("logging failed to fire messages {}", updated_row.id);
             id_list.push(row.get("id"));
         }
-        let pg_client = self.get_client().await?;
+        let pg_client = self.get_client().await.expect("Unable to get client");
+        let mut pg_access = PgAccess { client: pg_client };
+        let pg_txn: PgTxn = pg_access.get_txn().await;
 
         // let reset_query = format!(
         //     "UPDATE hanger SET readied_at=null , readied_by=null WHERE id IN ({})",
         // );
         let values_as_slice: Vec<_> = id_list.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
 
         let mut query: String = "UPDATE hanger SET readied_at=null, readied_by=null WHERE id IN (".to_string();
         for i in 0..id_list.len() {
             query += &format!("${},", i + 1);
         }
         query = query.strip_suffix(',').unwrap().to_string();
         query += ")";
         // println!("reset query {}", query);
-        let stmt = pg_client.prepare(query.as_str()).await?;
+        let stmt = pg_txn.txn.prepare(query.as_str()).await.expect("Unable to prepare query");
 
-        pg_client.execute(&stmt, &values_as_slice[..]).await.expect("reset to init query failed");
-        let time_elapsed = query_execute_instant.elapsed();
-        if time_elapsed > Duration::from_millis(100) {
-            println!(" reset_to_init query_execute_instant: {:?} ", time_elapsed);
-        }
-        Ok(id_list)
+        let response = pg_txn.txn.execute(&stmt, &values_as_slice[..]).await;
+
+        match response {
+            Ok(resp) => {
+                let cmt_rdy = pg_txn.txn.commit().await;
+                if let Err(e) = cmt_rdy {
+                    error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp);
+                    return Err(format!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp));
+                }
+                let time_elapsed = query_execute_instant.elapsed();
+                if time_elapsed > Duration::from_millis(100) {
+                    error!(" reset_to_init query_execute_instant: {:?} ", time_elapsed);
+                }
+                Ok(id_list)
+            }
+            Err(e) => {
+                if let Some(db_err) = e.code() {
+                    if db_err == &SqlState::T_R_SERIALIZATION_FAILURE {
+                        error!("reset_to_init: Unable to execute txn due to : {}", e);
+                        return Err(format!("reset_to_init: Unable to execute txn due to : {}", e));
+                    }
+                }
+                Err(format!("reset_to_init: Unknown exception {:?}", e))
+            }
+        }
     }
 }
diff --git a/chronos_bin/src/utils/config.rs b/chronos_bin/src/utils/config.rs
index e113256..58d599b 100644
--- a/chronos_bin/src/utils/config.rs
+++ b/chronos_bin/src/utils/config.rs
@@ -1,8 +1,9 @@
 #[derive(Debug, Clone)]
 pub struct ChronosConfig {
     // pub random_delay: u64,
-    pub db_poll_interval: u64,
-    pub time_advance: i64,
+    pub monitor_db_poll: u64,
+    pub processor_db_poll: u64,
+    pub time_advance: u64,
     pub fail_detect_interval: u64,
 }
 
@@ -10,10 +11,14 @@ impl ChronosConfig {
     pub fn from_env() -> ChronosConfig {
         ChronosConfig {
             // random_delay: env_var!("RANDOMNESS_DELAY").parse().unwrap(),
-            db_poll_interval: std::env::var("DB_POLL_INTERVAL")
+            monitor_db_poll: std::env::var("MONITOR_DB_POLL")
                 .unwrap_or_else(|_| "5".to_string())
                 .parse()
-                .expect("Failed to parse DB_POLL_INTERVAL!!"),
+                .expect("Failed to parse MONITOR_DB_POLL!!"),
+            processor_db_poll: std::env::var("PROCESSOR_DB_POLL")
+                .unwrap_or_else(|_| "5".to_string())
+                .parse()
+                .expect("Failed to parse PROCESSOR_DB_POLL!!"),
             time_advance: std::env::var("TIMING_ADVANCE")
                 .unwrap_or_else(|_| "0".to_string())
                 .parse()
diff --git a/docker-compose.yml b/docker-compose.yml
index 48a62ea..374eb12 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -98,7 +98,7 @@ services:
       # App config (optional)
       # DELAY_TIME: 0
       # RANDOMNESS_DELAY: 100
-      # DB_POLL_INTERVAL: 5
+      # MONITOR_DB_POLL: 5
       # TIMING_ADVANCE: 0
       # FAIL_DETECT_INTERVAL: 500
     depends_on:
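The .env additions include MAX_RETRIES=3, which the processor currently mirrors with a hard-coded max_retry_count of 3. Distilled, the bounded retry it wraps around ready_to_fire looks like this (sketch only; do_txn is a hypothetical stand-in for the transactional call, which reports its errors as strings):

    async fn retry_on_conflict<F, Fut>(mut do_txn: F, max_retries: u32) -> Result<(), String>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<(), String>>,
    {
        let mut attempts = 0;
        loop {
            match do_txn().await {
                Ok(()) => return Ok(()),
                // Retry only serialization conflicts, up to the configured bound.
                Err(e) if e.contains("could not serialize access") && attempts < max_retries => {
                    attempts += 1;
                    log::warn!("serialization conflict, retrying ({}/{})", attempts, max_retries);
                }
                Err(e) => return Err(e),
            }
        }
    }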