Skip to content

Commit

Permalink
perf(filter): add cache for pattern matching handler (#1038)
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf authored Jun 26, 2024
1 parent 72e9661 commit 4a2b7a5
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 65 deletions.
21 changes: 19 additions & 2 deletions cmd/filter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/moira-alert/moira/cmd"
"github.com/moira-alert/moira/filter"
)

type config struct {
Expand All @@ -28,10 +29,23 @@ type filterConfig struct {
PatternsUpdatePeriod string `yaml:"patterns_update_period"`
// DropMetricsTTL this is time window how older metric we can get from now.
DropMetricsTTL string `yaml:"drop_metrics_ttl"`
// Flags for compatibility with different graphite behaviours
// Flags for compatibility with different graphite behaviours.
Compatibility compatibility `yaml:"graphite_compatibility"`
// Time after which the batch of metrics is forced to be saved, default is 1s
// Time after which the batch of metrics is forced to be saved, default is 1s.
BatchForcedSaveTimeout string `yaml:"batch_forced_save_timeout"`
// PatternStorageCfg defines the configuration for pattern storage.
PatternStorageCfg patternStorageConfig `yaml:"pattern_storage"`
}

type patternStorageConfig struct {
// PatternMatchingCacheSize determines the size of the pattern matching cache.
PatternMatchingCacheSize int `yaml:"pattern_matching_cache_size"`
}

func (cfg patternStorageConfig) toFilterPatternStorageConfig() filter.PatternStorageConfig {
return filter.PatternStorageConfig{
PatternMatchingCacheSize: cfg.PatternMatchingCacheSize,
}
}

func getDefault() config {
Expand All @@ -58,6 +72,9 @@ func getDefault() config {
AllowRegexLooseStartMatch: false,
AllowRegexMatchEmpty: true,
},
PatternStorageCfg: patternStorageConfig{
PatternMatchingCacheSize: 100,
},
},
Telemetry: cmd.TelemetryConfig{
Listen: ":8094",
Expand Down
3 changes: 2 additions & 1 deletion cmd/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func main() {
Msg("Failed to initialize cache storage with given config")
}

patternStorage, err := filter.NewPatternStorage(database, filterMetrics, logger, compatibility)
filterPatternStorageCfg := config.Filter.PatternStorageCfg.toFilterPatternStorageConfig()
patternStorage, err := filter.NewPatternStorage(filterPatternStorageCfg, database, filterMetrics, logger, compatibility)
if err != nil {
logger.Fatal().
Error(err).
Expand Down
54 changes: 45 additions & 9 deletions filter/patterns_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,22 @@ import (

"github.com/moira-alert/moira/clock"

lrucache "github.com/hashicorp/golang-lru/v2"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
)

// PatternStorageConfig defines the configuration for pattern storage.
type PatternStorageConfig struct {
// PatternMatchingCacheSize determines the size of the pattern matching cache.
PatternMatchingCacheSize int
}

type patternMatchingCacheItem struct {
nameTagValue string
matchingHandler MatchingHandler
}

// PatternStorage contains pattern tree.
type PatternStorage struct {
database moira.Database
Expand All @@ -21,24 +33,36 @@ type PatternStorage struct {
PatternIndex atomic.Value
SeriesByTagPatternIndex atomic.Value
compatibility Compatibility
patternMatchingCache *lrucache.Cache[string, *patternMatchingCacheItem]
}

// NewPatternStorage creates new PatternStorage struct.
func NewPatternStorage(
cfg PatternStorageConfig,
database moira.Database,
metrics *metrics.FilterMetrics,
logger moira.Logger,
compatibility Compatibility,
) (*PatternStorage, error) {
patternMatchingCache, err := lrucache.New[string, *patternMatchingCacheItem](cfg.PatternMatchingCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create new lru pattern matching cache: %w", err)
}

storage := &PatternStorage{
database: database,
metrics: metrics,
logger: logger,
clock: clock.NewSystemClock(),
compatibility: compatibility,
database: database,
metrics: metrics,
logger: logger,
clock: clock.NewSystemClock(),
compatibility: compatibility,
patternMatchingCache: patternMatchingCache,
}

if err = storage.Refresh(); err != nil {
return nil, fmt.Errorf("failed to refresh pattern storage: %w", err)
}
err := storage.Refresh()
return storage, err

return storage, nil
}

// Refresh builds pattern's indexes from redis data.
Expand All @@ -59,8 +83,20 @@ func (storage *PatternStorage) Refresh() error {
}
}

storage.PatternIndex.Store(NewPatternIndex(storage.logger, patterns, storage.compatibility))
storage.SeriesByTagPatternIndex.Store(NewSeriesByTagPatternIndex(storage.logger, seriesByTagPatterns, storage.compatibility))
storage.PatternIndex.Store(NewPatternIndex(
storage.logger,
patterns,
storage.compatibility,
))

storage.SeriesByTagPatternIndex.Store(NewSeriesByTagPatternIndex(
storage.logger,
seriesByTagPatterns,
storage.compatibility,
storage.patternMatchingCache,
storage.metrics,
))

return nil
}

Expand Down
10 changes: 8 additions & 2 deletions filter/patterns_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@ func TestProcessIncomingMetric(t *testing.T) {
database := mock_moira_alert.NewMockDatabase(mockCtrl)
logger, _ := logging.ConfigureLog("stdout", "warn", "test", true)

patternStorageCfg := PatternStorageConfig{
PatternMatchingCacheSize: 100,
}

Convey("Create new pattern storage, GetPatterns returns error, should error", t, func() {
database.EXPECT().GetPatterns().Return(nil, fmt.Errorf("some error here"))
filterMetrics := metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry())
_, err := NewPatternStorage(database, filterMetrics, logger, Compatibility{AllowRegexLooseStartMatch: true})
So(err, ShouldBeError, fmt.Errorf("some error here"))
_, err := NewPatternStorage(patternStorageCfg, database, filterMetrics, logger, Compatibility{AllowRegexLooseStartMatch: true})
So(err, ShouldBeError, fmt.Errorf("failed to refresh pattern storage: some error here"))
})

database.EXPECT().GetPatterns().Return(testPatterns, nil)
patternsStorage, err := NewPatternStorage(
patternStorageCfg,
database,
metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry()),
logger,
Compatibility{AllowRegexLooseStartMatch: true},
)

systemClock := mock_clock.NewMockClock(mockCtrl)
systemClock.EXPECT().Now().Return(time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC)).AnyTimes()
patternsStorage.clock = systemClock
Expand Down
28 changes: 20 additions & 8 deletions filter/series_by_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func ParseSeriesByTag(input string) ([]TagSpec, error) {
type MatchingHandler func(string, map[string]string) bool

// CreateMatchingHandlerForPattern creates function for matching by tag list.
func CreateMatchingHandlerForPattern(tagSpecs []TagSpec, compatibility *Compatibility) (string, MatchingHandler, error) {
func CreateMatchingHandlerForPattern(
tagSpecs []TagSpec,
compatibility *Compatibility,
) (string, MatchingHandler, error) {
matchingHandlers := make([]MatchingHandler, 0)
var nameTagValue string

Expand Down Expand Up @@ -161,7 +164,10 @@ func CreateMatchingHandlerForPattern(tagSpecs []TagSpec, compatibility *Compatib
return nameTagValue, matchingHandler, nil
}

func createMatchingHandlerForOneTag(spec TagSpec, compatibility *Compatibility) (MatchingHandler, error) {
func createMatchingHandlerForOneTag(
spec TagSpec,
compatibility *Compatibility,
) (MatchingHandler, error) {
var matchingHandlerCondition func(string) bool
allowMatchEmpty := false

Expand Down Expand Up @@ -217,16 +223,22 @@ func createMatchingHandlerForOneTag(spec TagSpec, compatibility *Compatibility)
}, nil
}

func newMatchRegex(value string, compatibility *Compatibility) (*regexp.Regexp, error) {
if value == "*" {
value = ".*"
func newMatchRegex(
tagValue string,
compatibility *Compatibility,
) (*regexp.Regexp, error) {
if tagValue == "*" {
tagValue = ".*"
}

if !compatibility.AllowRegexLooseStartMatch {
value = "^" + value
tagValue = "^" + tagValue
}

matchRegex, err := regexp.Compile(value)
matchRegex, err := regexp.Compile(tagValue)
if err != nil {
return nil, fmt.Errorf("failed to compile regex: %w with tag value: %s", err, tagValue)
}

return matchRegex, err
return matchRegex, nil
}
42 changes: 32 additions & 10 deletions filter/series_by_tag_pattern_index.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package filter

import (
lrucache "github.com/hashicorp/golang-lru/v2"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
)

// SeriesByTagPatternIndex helps to index the seriesByTag patterns and allows to match them by metric.
Expand All @@ -19,27 +21,47 @@ func NewSeriesByTagPatternIndex(
logger moira.Logger,
tagSpecsByPattern map[string][]TagSpec,
compatibility Compatibility,
patternMatchingCache *lrucache.Cache[string, *patternMatchingCacheItem],
metrics *metrics.FilterMetrics,
) *SeriesByTagPatternIndex {
namesPrefixTree := &PrefixTree{Logger: logger, Root: &PatternNode{}}
withoutStrictNameTagPatternMatchers := make(map[string]MatchingHandler)

var patternMatchingEvicted int64

for pattern, tagSpecs := range tagSpecsByPattern {
nameTagValue, matchingHandler, err := CreateMatchingHandlerForPattern(tagSpecs, &compatibility)
if err != nil {
logger.Error().
Error(err).
String("pattern", pattern).
Msg("Failed to create MatchingHandler for pattern")
continue
var patternMatching *patternMatchingCacheItem

patternMatching, ok := patternMatchingCache.Get(pattern)
if !ok {
nameTagValue, matchingHandler, err := CreateMatchingHandlerForPattern(tagSpecs, &compatibility)
if err != nil {
logger.Error().
Error(err).
String("pattern", pattern).
Msg("Failed to create MatchingHandler for pattern")
continue
}

patternMatching = &patternMatchingCacheItem{
nameTagValue: nameTagValue,
matchingHandler: matchingHandler,
}

if evicted := patternMatchingCache.Add(pattern, patternMatching); evicted {
patternMatchingEvicted++
}
}

if nameTagValue == "" {
withoutStrictNameTagPatternMatchers[pattern] = matchingHandler
if patternMatching.nameTagValue == "" {
withoutStrictNameTagPatternMatchers[pattern] = patternMatching.matchingHandler
} else {
namesPrefixTree.AddWithPayload(nameTagValue, pattern, matchingHandler)
namesPrefixTree.AddWithPayload(patternMatching.nameTagValue, pattern, patternMatching.matchingHandler)
}
}

metrics.MarkPatternMatchingEvicted(patternMatchingEvicted)

return &SeriesByTagPatternIndex{
compatibility: compatibility,
namesPrefixTree: namesPrefixTree,
Expand Down
31 changes: 26 additions & 5 deletions filter/series_by_tag_pattern_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"sort"
"testing"

lrucache "github.com/hashicorp/golang-lru/v2"
logging "github.com/moira-alert/moira/logging/zerolog_adapter"
"github.com/moira-alert/moira/metrics"
. "github.com/smartystreets/goconvey/convey"
)

Expand Down Expand Up @@ -110,11 +112,17 @@ func TestParseSeriesByTag(t *testing.T) {

func TestSeriesByTagPatternIndex(t *testing.T) {
logger, _ := logging.GetLogger("SeriesByTag")
filterMetrics := metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry())

Convey("Given empty patterns with tagspecs, should build index and match patterns", t, func(c C) {
compatibility := Compatibility{
AllowRegexLooseStartMatch: true,
}
index := NewSeriesByTagPatternIndex(logger, map[string][]TagSpec{}, compatibility)

patternMatchingCache, err := lrucache.New[string, *patternMatchingCacheItem](100)
So(err, ShouldBeNil)

index := NewSeriesByTagPatternIndex(logger, map[string][]TagSpec{}, compatibility, patternMatchingCache, filterMetrics)
c.So(index.MatchPatterns("", nil), ShouldResemble, []string{})
})

Expand Down Expand Up @@ -162,7 +170,11 @@ func TestSeriesByTagPatternIndex(t *testing.T) {
AllowRegexMatchEmpty: false,
AllowRegexLooseStartMatch: true,
}
index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility)

patternMatchingCache, err := lrucache.New[string, *patternMatchingCacheItem](100)
So(err, ShouldBeNil)

index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility, patternMatchingCache, filterMetrics)
for _, testCase := range testCases {
patterns := index.MatchPatterns(testCase.Name, testCase.Labels)
sort.Strings(patterns)
Expand Down Expand Up @@ -335,7 +347,11 @@ func TestSeriesByTagPatternIndex(t *testing.T) {
AllowRegexLooseStartMatch: true,
AllowRegexMatchEmpty: false,
}
index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility)

patternMatchingCache, err := lrucache.New[string, *patternMatchingCacheItem](100)
So(err, ShouldBeNil)

index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility, patternMatchingCache, filterMetrics)
for _, testCase := range testCases {
patterns := index.MatchPatterns(testCase.Name, testCase.Labels)
sort.Strings(patterns)
Expand All @@ -344,8 +360,9 @@ func TestSeriesByTagPatternIndex(t *testing.T) {
})
}

func TestSeriesByTagPatternIndexCabonCompatibility(t *testing.T) {
func TestSeriesByTagPatternIndexCarbonCompatibility(t *testing.T) {
logger, _ := logging.GetLogger("SeriesByTag")
filterMetrics := metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry())

Convey("Given related patterns with tagspecs, should build index and match patterns", t, func(c C) {
tagSpecsByPattern := map[string][]TagSpec{
Expand Down Expand Up @@ -510,7 +527,11 @@ func TestSeriesByTagPatternIndexCabonCompatibility(t *testing.T) {
AllowRegexLooseStartMatch: false,
AllowRegexMatchEmpty: true,
}
index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility)

patternMatchingCache, err := lrucache.New[string, *patternMatchingCacheItem](100)
So(err, ShouldBeNil)

index := NewSeriesByTagPatternIndex(logger, tagSpecsByPattern, compatibility, patternMatchingCache, filterMetrics)
for _, testCase := range testCases {
patterns := index.MatchPatterns(testCase.Name, testCase.Labels)
sort.Strings(patterns)
Expand Down
Loading

0 comments on commit 4a2b7a5

Please sign in to comment.