Skip to content

Commit

Permalink
return RatelimitInternal/ExternalPerWorkflowID
Browse files Browse the repository at this point in the history
  • Loading branch information
arzonus committed Dec 18, 2024
1 parent 1481dcc commit b18e42b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
59 changes: 39 additions & 20 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -47,13 +48,15 @@ type WFCache interface {
}

type wfCache struct {
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
timeSource clock.TimeSource
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
timeSource clock.TimeSource
ratelimitExternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter

// we use functions to get cache items, and the current time, so we can mock it in unit tests
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
Expand All @@ -73,13 +76,15 @@ type cacheValue struct {

// Params is the parameters for a new WFCache
type Params struct {
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
RatelimitExternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
RatelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
}

// New creates a new WFCache
Expand All @@ -91,12 +96,14 @@ func New(params Params) WFCache {
MaxCount: params.MaxCount,
ActivelyEvict: true,
}),
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
timeSource: clock.NewRealTimeSource(),
logger: params.Logger,
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
timeSource: clock.NewRealTimeSource(),
logger: params.Logger,
ratelimitExternalPerWorkflowID: params.RatelimitExternalPerWorkflowID,
ratelimitInternalPerWorkflowID: params.RatelimitInternalPerWorkflowID,
}
// We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests
cache.getCacheItemFn = cache.getCacheItem
Expand Down Expand Up @@ -141,6 +148,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
domainName,
"external",
metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter,
c.ratelimitExternalPerWorkflowID,
)
return false
}
Expand All @@ -154,6 +162,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
domainName,
"internal",
metrics.WorkflowIDCacheRequestsInternalRatelimitedCounter,
c.ratelimitInternalPerWorkflowID,
)
return false
}
Expand All @@ -171,17 +180,27 @@ func (c *wfCache) emitRateLimitMetrics(
domainName string,
callType string,
metric int,
enabled dynamicconfig.BoolPropertyFnWithDomainFilter,
) {
var mode string
if enabled(domainName) {
mode = "enabled"
} else {
mode = "shadow"
}

c.metricsClient.Scope(
metrics.HistoryClientWfIDCacheScope,
metrics.DomainTag(domainName),
metrics.ModeTag(mode),
).IncCounter(metric)
c.logger.Info(
"Rate limiting workflowID",
tag.RequestType(callType),
tag.WorkflowDomainID(domainID),
tag.WorkflowDomainName(domainName),
tag.WorkflowID(workflowID),
tag.Mode(mode),
)
}

Expand Down
14 changes: 0 additions & 14 deletions service/history/workflowcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -122,8 +120,6 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -165,8 +161,6 @@ func TestWfCache_AllowError(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -214,8 +208,6 @@ func TestWfCache_AllowDomainCacheError(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -257,8 +249,6 @@ func TestWfCache_CacheExternalDisabled(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return false },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -300,8 +290,6 @@ func TestWfCache_CacheInternalDisabled(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return false },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down Expand Up @@ -349,8 +337,6 @@ func TestWfCache_RejectLog(t *testing.T) {
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
Expand Down

0 comments on commit b18e42b

Please sign in to comment.