Skip to content

Commit

Permalink
Fix limiter connections (#46)
Browse files Browse the repository at this point in the history
* fix(proxy): fixed update active connections

* chore(examples): incresed max connection to tier 1
  • Loading branch information
paulobressan authored May 30, 2024
1 parent 584b9cc commit 1de351a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
2 changes: 1 addition & 1 deletion examples/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ data:
[[tiers]]
name = "1"
max_connections = 1
max_connections = 2
[[tiers.rates]]
interval = "1s"
limit = 1024
Expand Down
19 changes: 14 additions & 5 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,30 @@ pub struct Consumer {
active_connections: usize,
}
impl Consumer {
pub async fn inc_connections(&mut self, state: Arc<State>) {
self.active_connections += 1;
pub async fn inc_connections(&self, state: Arc<State>) {
state
.consumers
.write()
.await
.insert(self.key.clone(), self.clone());
.entry(self.key.clone())
.and_modify(|consumer| consumer.active_connections += 1);
}
pub async fn dec_connections(&mut self, state: Arc<State>) {
self.active_connections -= 1;
state
.consumers
.write()
.await
.insert(self.key.clone(), self.clone());
.entry(self.key.clone())
.and_modify(|consumer| consumer.active_connections -= 1);
}
pub async fn get_active_connections(&self, state: Arc<State>) -> usize {
state
.consumers
.read()
.await
.get(&self.key)
.map(|consumer| consumer.active_connections)
.unwrap_or_default()
}
}
impl Display for Consumer {
Expand Down
13 changes: 7 additions & 6 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,19 @@ impl ProxyApp {

match event {
DuplexEvent::ClientRead(0) | DuplexEvent::InstanceRead(0) => {
info!(
consumer = ctx.consumer.to_string(),
active_connections = ctx.consumer.active_connections,
"client disconnected"
);

ctx.consumer.dec_connections(self.state.clone()).await;
state.metrics.dec_total_connections(
&ctx.consumer,
&ctx.namespace,
&ctx.instance,
);

let active_connections =
ctx.consumer.get_active_connections(state.clone()).await;
info!(
consumer = ctx.consumer.to_string(),
active_connections, "client disconnected"
);
return Ok(());
}
DuplexEvent::ClientRead(bytes) => {
Expand Down

0 comments on commit 1de351a

Please sign in to comment.