-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stabilize adaptive rate limit by considering current rate #1027
Changes from 6 commits
f45324a
0016506
2809df2
615a949
f3da7af
2bf3e74
b111326
c6199b1
5105a41
956a34d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Queue; | ||
|
||
/** | ||
* Track the current request rate based on the past requests within ESTIMATE_RANGE_DURATION_MSEC | ||
* milliseconds period. | ||
*/ | ||
public class RequestRateMeter { | ||
private static final long ESTIMATE_RANGE_DURATION_MSEC = 3000; | ||
|
||
private static class DataPoint { | ||
long timestamp; | ||
long requestCount; | ||
public DataPoint(long timestamp, long requestCount) { | ||
this.timestamp = timestamp; | ||
this.requestCount = requestCount; | ||
} | ||
} | ||
|
||
private Queue<DataPoint> dataPoints = new LinkedList<>(); | ||
private long currentSum = 0; | ||
|
||
synchronized void addDataPoint(long timestamp, long requestCount) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, could we consider reduce the datapoints maintained by using bucket-based aggregation which have bounde memory usage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Number of data points tend to be small like around 1 ~ 20 since each batch contain around 5k requests (we put data point for each batch). And memory usage is almost ignorable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added removeOldDataPoints in addDataPoint. |
||
dataPoints.add(new DataPoint(timestamp, requestCount)); | ||
currentSum += requestCount; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could removeOldDataPoint during add to reduce memory usage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. Memory usage is almost ignorable. |
||
} | ||
|
||
synchronized void removeOldDataPoints() { | ||
long curr = System.currentTimeMillis(); | ||
while (!dataPoints.isEmpty() && dataPoints.peek().timestamp < curr - ESTIMATE_RANGE_DURATION_MSEC) { | ||
currentSum -= dataPoints.remove().requestCount; | ||
} | ||
} | ||
|
||
synchronized long getCurrentEstimatedRate() { | ||
removeOldDataPoints(); | ||
return currentSum * 1000 / ESTIMATE_RANGE_DURATION_MSEC; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import static org.junit.jupiter.api.Assertions.*; | ||
|
||
class RequestRateMeterTest { | ||
|
||
private RequestRateMeter requestRateMeter; | ||
|
||
@BeforeEach | ||
void setUp() { | ||
requestRateMeter = new RequestRateMeter(); | ||
} | ||
|
||
@Test | ||
void testAddDataPoint() { | ||
long timestamp = System.currentTimeMillis(); | ||
requestRateMeter.addDataPoint(timestamp, 30); | ||
assertEquals(10, requestRateMeter.getCurrentEstimatedRate()); | ||
} | ||
|
||
@Test | ||
void testRemoveOldDataPoints() { | ||
long currentTime = System.currentTimeMillis(); | ||
requestRateMeter.addDataPoint(currentTime - 4000, 30); | ||
requestRateMeter.addDataPoint(currentTime - 2000, 60); | ||
requestRateMeter.addDataPoint(currentTime, 90); | ||
|
||
assertEquals((60 + 90)/3, requestRateMeter.getCurrentEstimatedRate()); | ||
} | ||
|
||
@Test | ||
void testGetCurrentEstimatedRate() { | ||
long currentTime = System.currentTimeMillis(); | ||
requestRateMeter.addDataPoint(currentTime - 2500, 30); | ||
requestRateMeter.addDataPoint(currentTime - 1500, 60); | ||
requestRateMeter.addDataPoint(currentTime - 500, 90); | ||
|
||
assertEquals((30 + 60 + 90)/3, requestRateMeter.getCurrentEstimatedRate()); | ||
} | ||
|
||
@Test | ||
void testEmptyRateMeter() { | ||
assertEquals(0, requestRateMeter.getCurrentEstimatedRate()); | ||
} | ||
|
||
@Test | ||
void testSingleDataPoint() { | ||
requestRateMeter.addDataPoint(System.currentTimeMillis(), 30); | ||
assertEquals(30 / 3, requestRateMeter.getCurrentEstimatedRate()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason choose 4s as default value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was fixed value and I made it a configuration. The original intention is having higher initial backoff to quickly reduce the rate.