From b9c25f8c3702af2a453a55b0e0ff97007b9f1fd0 Mon Sep 17 00:00:00 2001 From: Min <76146890+MinH-09@users.noreply.github.com> Date: Fri, 29 Dec 2023 15:24:15 +0800 Subject: [PATCH] feat:add cache about networktopology (#2924) Signed-off-by: huangmin <2107139596@qq.com> --- pkg/cache/cache.go | 1 + pkg/cache/cache_mock.go | 255 ++++++++++ pkg/slices/slices.go | 18 + pkg/slices/slices_test.go | 31 ++ scheduler/config/config.go | 23 + scheduler/config/config_test.go | 32 ++ scheduler/config/constants.go | 6 + scheduler/config/testdata/scheduler.yaml | 3 + scheduler/networktopology/network_topology.go | 91 +++- .../networktopology/network_topology_test.go | 395 ++++++++++------ scheduler/networktopology/probes.go | 170 ++++++- scheduler/networktopology/probes_test.go | 440 ++++++++++++++---- scheduler/scheduler.go | 3 +- 13 files changed, 1201 insertions(+), 267 deletions(-) create mode 100644 pkg/cache/cache_mock.go diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index bbbb4fb61b5..59de840e632 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -14,6 +14,7 @@ * limitations under the License. */ +//go:generate mockgen -destination cache_mock.go -source cache.go -package cache package cache import ( diff --git a/pkg/cache/cache_mock.go b/pkg/cache/cache_mock.go new file mode 100644 index 00000000000..625e37e98f5 --- /dev/null +++ b/pkg/cache/cache_mock.go @@ -0,0 +1,255 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: cache.go +// +// Generated by this command: +// +// mockgen -destination cache_mock.go -source cache.go -package cache +// +// Package cache is a generated GoMock package. +package cache + +import ( + io "io" + reflect "reflect" + time "time" + + gomock "go.uber.org/mock/gomock" +) + +// MockCache is a mock of Cache interface. +type MockCache struct { + ctrl *gomock.Controller + recorder *MockCacheMockRecorder +} + +// MockCacheMockRecorder is the mock recorder for MockCache. +type MockCacheMockRecorder struct { + mock *MockCache +} + +// NewMockCache creates a new mock instance. +func NewMockCache(ctrl *gomock.Controller) *MockCache { + mock := &MockCache{ctrl: ctrl} + mock.recorder = &MockCacheMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCache) EXPECT() *MockCacheMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockCache) Add(k string, x any, d time.Duration) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", k, x, d) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockCacheMockRecorder) Add(k, x, d any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockCache)(nil).Add), k, x, d) +} + +// Delete mocks base method. +func (m *MockCache) Delete(k string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", k) +} + +// Delete indicates an expected call of Delete. +func (mr *MockCacheMockRecorder) Delete(k any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockCache)(nil).Delete), k) +} + +// DeleteExpired mocks base method. +func (m *MockCache) DeleteExpired() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "DeleteExpired") +} + +// DeleteExpired indicates an expected call of DeleteExpired. +func (mr *MockCacheMockRecorder) DeleteExpired() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExpired", reflect.TypeOf((*MockCache)(nil).DeleteExpired)) +} + +// Flush mocks base method. +func (m *MockCache) Flush() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Flush") +} + +// Flush indicates an expected call of Flush. +func (mr *MockCacheMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockCache)(nil).Flush)) +} + +// Get mocks base method. +func (m *MockCache) Get(k string) (any, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", k) + ret0, _ := ret[0].(any) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockCacheMockRecorder) Get(k any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockCache)(nil).Get), k) +} + +// GetWithExpiration mocks base method. +func (m *MockCache) GetWithExpiration(k string) (any, time.Time, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetWithExpiration", k) + ret0, _ := ret[0].(any) + ret1, _ := ret[1].(time.Time) + ret2, _ := ret[2].(bool) + return ret0, ret1, ret2 +} + +// GetWithExpiration indicates an expected call of GetWithExpiration. +func (mr *MockCacheMockRecorder) GetWithExpiration(k any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWithExpiration", reflect.TypeOf((*MockCache)(nil).GetWithExpiration), k) +} + +// ItemCount mocks base method. +func (m *MockCache) ItemCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ItemCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// ItemCount indicates an expected call of ItemCount. +func (mr *MockCacheMockRecorder) ItemCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ItemCount", reflect.TypeOf((*MockCache)(nil).ItemCount)) +} + +// Items mocks base method. +func (m *MockCache) Items() map[string]Item { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Items") + ret0, _ := ret[0].(map[string]Item) + return ret0 +} + +// Items indicates an expected call of Items. +func (mr *MockCacheMockRecorder) Items() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Items", reflect.TypeOf((*MockCache)(nil).Items)) +} + +// Keys mocks base method. +func (m *MockCache) Keys() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Keys") + ret0, _ := ret[0].([]string) + return ret0 +} + +// Keys indicates an expected call of Keys. +func (mr *MockCacheMockRecorder) Keys() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keys", reflect.TypeOf((*MockCache)(nil).Keys)) +} + +// Load mocks base method. +func (m *MockCache) Load(r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Load", r) + ret0, _ := ret[0].(error) + return ret0 +} + +// Load indicates an expected call of Load. +func (mr *MockCacheMockRecorder) Load(r any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockCache)(nil).Load), r) +} + +// LoadFile mocks base method. +func (m *MockCache) LoadFile(fname string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadFile", fname) + ret0, _ := ret[0].(error) + return ret0 +} + +// LoadFile indicates an expected call of LoadFile. +func (mr *MockCacheMockRecorder) LoadFile(fname any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadFile", reflect.TypeOf((*MockCache)(nil).LoadFile), fname) +} + +// OnEvicted mocks base method. +func (m *MockCache) OnEvicted(f func(string, any)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnEvicted", f) +} + +// OnEvicted indicates an expected call of OnEvicted. +func (mr *MockCacheMockRecorder) OnEvicted(f any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnEvicted", reflect.TypeOf((*MockCache)(nil).OnEvicted), f) +} + +// Save mocks base method. +func (m *MockCache) Save(w io.Writer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Save", w) + ret0, _ := ret[0].(error) + return ret0 +} + +// Save indicates an expected call of Save. +func (mr *MockCacheMockRecorder) Save(w any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Save", reflect.TypeOf((*MockCache)(nil).Save), w) +} + +// SaveFile mocks base method. +func (m *MockCache) SaveFile(fname string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveFile", fname) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveFile indicates an expected call of SaveFile. +func (mr *MockCacheMockRecorder) SaveFile(fname any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveFile", reflect.TypeOf((*MockCache)(nil).SaveFile), fname) +} + +// Set mocks base method. +func (m *MockCache) Set(k string, x any, d time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Set", k, x, d) +} + +// Set indicates an expected call of Set. +func (mr *MockCacheMockRecorder) Set(k, x, d any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockCache)(nil).Set), k, x, d) +} + +// SetDefault mocks base method. +func (m *MockCache) SetDefault(k string, x any) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetDefault", k, x) +} + +// SetDefault indicates an expected call of SetDefault. +func (mr *MockCacheMockRecorder) SetDefault(k, x any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetDefault", reflect.TypeOf((*MockCache)(nil).SetDefault), k, x) +} diff --git a/pkg/slices/slices.go b/pkg/slices/slices.go index bb886a69da3..67749c4ff2e 100644 --- a/pkg/slices/slices.go +++ b/pkg/slices/slices.go @@ -67,3 +67,21 @@ func Reverse[S ~[]T, T any](s S) { s[i], s[j] = s[j], s[i] } } + +// Complement removes duplicate elements of collections in first collection. +func Complement[T comparable](a, b []T) []T { + var result []T + + visited := make(map[T]struct{}) + for _, v := range b { + visited[v] = struct{}{} + } + + for _, v := range a { + if _, exists := visited[v]; !exists { + result = append(result, v) + } + } + + return result +} diff --git a/pkg/slices/slices_test.go b/pkg/slices/slices_test.go index 32cdffab53d..f8bc68e2e36 100644 --- a/pkg/slices/slices_test.go +++ b/pkg/slices/slices_test.go @@ -178,3 +178,34 @@ func TestReverse(t *testing.T) { }) } } + +func TestComplement(t *testing.T) { + tests := []struct { + name string + source []int + exclude []int + expected []int + }{ + { + name: "slices with duplicates", + source: []int{1, 2, 3}, + exclude: []int{1, 2, 4}, + expected: []int{3}, + }, + { + name: "slices with no duplicates", + source: []int{1, 2, 3}, + exclude: []int{4, 5, 6}, + expected: []int{1, 2, 3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := Complement(tt.source, tt.exclude) + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("expected %v, but got %v", tt.expected, result) + } + }) + } +} diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 1641fe1ef98..7ebff7b853c 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -342,6 +342,9 @@ type NetworkTopologyConfig struct { // Probe is the configuration of probe. Probe ProbeConfig `yaml:"probe" mapstructure:"probe"` + + // Cache is the configuration of cache. + Cache CacheConfig `yaml:"cache" mapstructure:"cache"` } type ProbeConfig struct { @@ -352,6 +355,14 @@ type ProbeConfig struct { Count int `mapstructure:"count" yaml:"count"` } +type CacheConfig struct { + // Interval is cache cleanup interval. + Interval time.Duration `yaml:"interval" mapstructure:"interval"` + + // TTL is networkTopology cache items TLL. + TTL time.Duration `yaml:"ttl" mapstructure:"tll"` +} + type TrainerConfig struct { // Enable trainer service. Enable bool `yaml:"enable" mapstructure:"enable"` @@ -458,6 +469,10 @@ func New() *Config { QueueLength: DefaultProbeQueueLength, Count: DefaultProbeCount, }, + Cache: CacheConfig{ + Interval: DefaultNetworkTopologyCacheInterval, + TTL: DefaultNetworkTopologyCacheTLL, + }, }, Trainer: TrainerConfig{ Enable: false, @@ -644,6 +659,14 @@ func (cfg *Config) Validate() error { return errors.New("probe requires parameter count") } + if cfg.NetworkTopology.Cache.Interval <= 0 { + return errors.New("networkTopology requires parameter interval") + } + + if cfg.NetworkTopology.Cache.TTL <= 0 { + return errors.New("networkTopology requires parameter ttl") + } + if cfg.Trainer.Enable { if cfg.Trainer.Addr == "" { return errors.New("trainer requires parameter addr") diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index 87044bc3e8c..52e49214929 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -180,6 +180,10 @@ func TestConfig_Load(t *testing.T) { QueueLength: 5, Count: 10, }, + Cache: CacheConfig{ + Interval: 5 * time.Minute, + TTL: 5 * time.Minute, + }, }, Trainer: TrainerConfig{ Enable: false, @@ -763,6 +767,34 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "networkTopology requires parameter collectInterval") }, }, + { + name: "networkTopology requires parameter interval", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job = mockJobConfig + cfg.NetworkTopology.Cache.Interval = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "networkTopology requires parameter interval") + }, + }, + { + name: "networkTopology requires parameter ttl", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job = mockJobConfig + cfg.NetworkTopology.Cache.TTL = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "networkTopology requires parameter ttl") + }, + }, { name: "probe requires parameter queueLength", config: New(), diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index 653c0e11bf7..99ee4fce017 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -179,6 +179,12 @@ const ( // DefaultNetworkTopologyCollectInterval is the default interval of collecting network topology. DefaultNetworkTopologyCollectInterval = 2 * time.Hour + // DefaultNetworkTopologyCacheInterval is the default cache cleanup interval. + DefaultNetworkTopologyCacheInterval = 5 * time.Minute + + // DefaultNetworkTopologyCacheTLL is the default ttl of networkTopology cache. + DefaultNetworkTopologyCacheTLL = 5 * time.Minute + // DefaultProbeQueueLength is the default length of probe queue. DefaultProbeQueueLength = 5 diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml index d18f2c026d4..05bb85db2f4 100644 --- a/scheduler/config/testdata/scheduler.yaml +++ b/scheduler/config/testdata/scheduler.yaml @@ -98,6 +98,9 @@ networkTopology: probe: queueLength: 5 count: 10 + cache: + interval: 5m + ttl: 5m trainer: enable: false diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index 2c8a313957d..13ce323104d 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -30,8 +30,10 @@ import ( "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/container/set" pkgredis "d7y.io/dragonfly/v2/pkg/redis" + "d7y.io/dragonfly/v2/pkg/slices" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/storage" @@ -87,6 +89,9 @@ type networkTopology struct { // rdb is Redis universal client interface. rdb redis.UniversalClient + // Cache instance. + cache cache.Cache + // resource is resource interface. resource resource.Resource @@ -98,10 +103,11 @@ type networkTopology struct { } // New network topology interface. -func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) { +func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalClient, cache cache.Cache, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) { return &networkTopology{ config: cfg, rdb: rdb, + cache: cache, resource: resource, storage: storage, done: make(chan struct{}), @@ -135,13 +141,25 @@ func (nt *networkTopology) Has(srcHostID string, destHostID string) bool { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - networkTopologyCount, err := nt.rdb.Exists(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID)).Result() + networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID) + if _, _, ok := nt.cache.GetWithExpiration(networkTopologyKey); ok { + return true + } + + networkTopology, err := nt.rdb.HGetAll(ctx, networkTopologyKey).Result() if err != nil { - logger.Errorf("failed to check whether network topology exists: %s", err.Error()) + logger.Errorf("get networkTopology failed: %s", err.Error()) + return false + } + + if len(networkTopology) == 0 { return false } - return networkTopologyCount == 1 + // Add cache data. + nt.cache.Set(networkTopologyKey, networkTopology, nt.config.Cache.TTL) + + return true } // Store stores source host and destination host. @@ -169,19 +187,37 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err blocklist := set.NewSafeSet[string]() blocklist.Add(hostID) - candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist) - if len(candidateHosts) == 0 { + hosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist) + if len(hosts) == 0 { return nil, errors.New("probed hosts not found") } - if len(candidateHosts) <= nt.config.Probe.Count { - return candidateHosts, nil + if len(hosts) <= nt.config.Probe.Count { + return hosts, nil } - var probedCountKeys []string - for _, candidateHost := range candidateHosts { - probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID)) + var ( + probedCounts []uint64 + probedCountKeys []string + probedCountHosts []*resource.Host + ) + for _, host := range hosts { + probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(host.ID) + cache, _, ok := nt.cache.GetWithExpiration(probedCountKey) + if !ok { + probedCountHosts = append(probedCountHosts, host) + probedCountKeys = append(probedCountKeys, probedCountKey) + continue + } else { + probedCount, ok := cache.(uint64) + if ok { + probedCounts = append(probedCounts, probedCount) + } else { + probedCounts = append(probedCounts, uint64(0)) + } + } } + candidateHosts := slices.Complement(hosts, probedCountHosts) rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result() if err != nil { @@ -189,11 +225,10 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err } // Filter invalid probed count. If probed key not exist, the probed count is nil. - var probedCounts []uint64 for i, rawProbedCount := range rawProbedCounts { // Initialize the probedCount value of host in redis when the host is first selected as the candidate probe target. if rawProbedCount == nil { - if err := nt.rdb.Set(ctx, pkgredis.MakeProbedCountKeyInScheduler(candidateHosts[i].ID), 0, 0).Err(); err != nil { + if err := nt.rdb.Set(ctx, probedCountKeys[i], 0, 0).Err(); err != nil { return nil, err } @@ -211,8 +246,12 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err return nil, errors.New("invalid probed count") } + // Add cache data. + nt.cache.Set(probedCountKeys[i], probedCount, nt.config.Cache.TTL) + probedCounts = append(probedCounts, probedCount) } + candidateHosts = append(candidateHosts, probedCountHosts...) // Sort candidate hosts by probed count. sort.Slice(candidateHosts, func(i, j int) bool { @@ -256,12 +295,16 @@ func (nt *networkTopology) DeleteHost(hostID string) error { return err } + for _, deleteKey := range deleteKeys { + nt.cache.Delete(deleteKey) + } + return nil } // Probes loads probes interface by source host id and destination host id. func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes { - return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID) + return NewProbes(nt.config, nt.rdb, nt.cache, srcHostID, destHostID) } // ProbedCount is the number of times the host has been probed. @@ -269,7 +312,25 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - return nt.rdb.Get(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID)).Uint64() + probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(hostID) + if cache, _, ok := nt.cache.GetWithExpiration(probedCountKey); ok { + probedCount, ok := cache.(uint64) + if ok { + return probedCount, nil + } + + return uint64(0), errors.New("get probedCount failed") + } + + probedCount, err := nt.rdb.Get(ctx, probedCountKey).Uint64() + if err != nil { + return uint64(0), err + } + + // Add cache data. + nt.cache.Set(probedCountKey, probedCount, nt.config.Cache.TTL) + + return probedCount, nil } // Snapshot writes the current network topology to the storage. diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go index 1f4a3bd37c2..1ed88c2c99b 100644 --- a/scheduler/networktopology/network_topology_test.go +++ b/scheduler/networktopology/network_topology_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/container/set" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/scheduler/resource" @@ -56,9 +57,10 @@ func Test_NewNetworkTopology(t *testing.T) { rdb, _ := redismock.NewClientMock() res := resource.NewMockResource(ctl) + cache := cache.NewMockCache(ctl) storage := storagemocks.NewMockStorage(ctl) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err) }) } @@ -69,7 +71,7 @@ func TestNetworkTopology_Serve(t *testing.T) { name string sleep func() mock func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) expect func(t *testing.T, networkTopology NetworkTopology, err error) }{ { @@ -78,21 +80,19 @@ func TestNetworkTopology_Serve(t *testing.T) { time.Sleep(3 * time.Second) }, mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal( - mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true).Times(2), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -110,7 +110,7 @@ func TestNetworkTopology_Serve(t *testing.T) { time.Sleep(5 * time.Second) }, mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetErr( errors.New("get probed count keys error")) @@ -118,15 +118,13 @@ func TestNetworkTopology_Serve(t *testing.T) { []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal( - mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true).Times(2), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -149,10 +147,11 @@ func TestNetworkTopology_Serve(t *testing.T) { res := resource.NewMockResource(ctl) hostManager := resource.NewMockHostManager(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(res.EXPECT(), hostManager, hostManager.EXPECT(), storage.EXPECT(), mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(res.EXPECT(), hostManager, hostManager.EXPECT(), storage.EXPECT(), mockRDBClient, cache.EXPECT()) mockNetworkTopologyConfig.CollectInterval = 2 * time.Second - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err) tc.sleep() networkTopology.Stop() @@ -162,42 +161,45 @@ func TestNetworkTopology_Serve(t *testing.T) { func TestNetworkTopology_Has(t *testing.T) { tests := []struct { - name string - mock func(mockRDBClient redismock.ClientMock) - expect func(t *testing.T, networkTopology NetworkTopology, err error) + name string + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) + run func(t *testing.T, networkTopology NetworkTopology, err error) }{ { - name: "network topology between src host and destination host exists", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + name: "network topology cache between src host and destination host exists", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, true) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.True(networkTopology.Has(mockSeedHost.ID, mockHost.ID)) }, }, { - name: "network topology between src host and destination host does not exist", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + name: "check network topology between src host and destination host exist error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr( + errors.New("check network topology between src host and destination host exist error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.False(networkTopology.Has(mockSeedHost.ID, mockHost.ID)) }, }, { - name: "check network topology between src host and destination host exist error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr( - errors.New("check network topology between src host and destination host exist error")) + name: "network topology between src host and destination host exist", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockCache.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) - assert.False(networkTopology.Has(mockSeedHost.ID, mockHost.ID)) + assert.True(networkTopology.Has(mockSeedHost.ID, mockHost.ID)) }, }, } @@ -210,10 +212,11 @@ func TestNetworkTopology_Has(t *testing.T) { rdb, mockRDBClient := redismock.NewClientMock() res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - tc.expect(t, networkTopology, err) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) + tc.run(t, networkTopology, err) mockRDBClient.ClearExpect() }) } @@ -221,16 +224,16 @@ func TestNetworkTopology_Has(t *testing.T) { func TestNetworkTopology_Store(t *testing.T) { tests := []struct { - name string - mock func(mockRDBClient redismock.ClientMock) - expect func(t *testing.T, networkTopology NetworkTopology, err error) + name string + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) + run func(t *testing.T, networkTopology NetworkTopology, err error) }{ { name: "network topology between src host and destination host exists", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, true) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.NoError(networkTopology.Store(mockSeedHost.ID, mockHost.ID)) @@ -238,12 +241,13 @@ func TestNetworkTopology_Store(t *testing.T) { }, { name: "network topology between src host and destination host does not exist", - mock: func(mockRDBClient redismock.ClientMock) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(map[string]string{}) mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.NoError(networkTopology.Store(mockSeedHost.ID, mockHost.ID)) @@ -251,12 +255,13 @@ func TestNetworkTopology_Store(t *testing.T) { }, { name: "set createdAt error when network topology between src host and destination host does not exist", - mock: func(mockRDBClient redismock.ClientMock) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(map[string]string{}) mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetErr(errors.New("set createdAt error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set createdAt error") @@ -272,10 +277,11 @@ func TestNetworkTopology_Store(t *testing.T) { rdb, mockRDBClient := redismock.NewClientMock() res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - tc.expect(t, networkTopology, err) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) + tc.run(t, networkTopology, err) mockRDBClient.ClearExpect() }) } @@ -286,22 +292,62 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { name string hosts []*resource.Host mock func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) expect func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) }{ + { + name: "find probed hosts with cache", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(uint64(6), mockCacheExpiration, true).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(5), + mc.Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(5), + ) + + var probedCountKeys []string + for i := 1; i < len(hosts); i++ { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(hosts[i].ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]any{"5", "4", "3", "2", "1"}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.NoError(err) + assert.Equal(len(probedHosts), 5) + assert.EqualValues(probedHosts[0].ID, "bac") + assert.EqualValues(probedHosts[1].ID, "bav") + assert.EqualValues(probedHosts[2].ID, "baz") + assert.EqualValues(probedHosts[3].ID, "bar") + assert.EqualValues(probedHosts[4].ID, "foo") + }, + }, { name: "find probed hosts", hosts: []*resource.Host{ mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), + mc.Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(6), ) var probedCountKeys []string @@ -328,7 +374,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { name: "find probed hosts when map is insufficient", hosts: []*resource.Host{mockHost}, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) @@ -359,13 +405,14 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), ) var probedCountKeys []string @@ -388,8 +435,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { name: "probed hosts not found", hosts: []*resource.Host{}, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { - mockRDBClient.MatchExpectationsInOrder(true) + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( @@ -412,13 +458,14 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), ) var probedCountKeys []string @@ -442,13 +489,14 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), ) var probedCountKeys []string @@ -466,19 +514,59 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { assert.EqualError(err, "invalid value type") }, }, + { + name: "type convert error", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(5), + mc.GetWithExpiration(gomock.Any()).Return("foo", mockCacheExpiration, true).Times(1), + mc.Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(5), + ) + + var probedCountKeys []string + for _, host := range hosts[:len(hosts)-1] { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]any{"6", "5", "4", "3", "2"}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.NoError(err) + assert.Equal(len(probedHosts), 5) + assert.EqualValues(probedHosts[0].ID, "bac") + assert.EqualValues(probedHosts[1].ID, "bav") + assert.EqualValues(probedHosts[2].ID, "baz") + assert.EqualValues(probedHosts[3].ID, "bar") + assert.EqualValues(probedHosts[4].ID, "foo") + }, + }, { name: "Initialize the probedCount value of host in redis", hosts: []*resource.Host{ mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), + mc.Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(5), ) var probedCountKeys []string @@ -500,7 +588,6 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { assert.EqualValues(probedHosts[2].ID, "bav") assert.EqualValues(probedHosts[3].ID, "baz") assert.EqualValues(probedHosts[4].ID, "bar") - }, }, { @@ -509,13 +596,14 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, }, mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mh *resource.MockHostManagerMockRecorder, mc *cache.MockCacheMockRecorder, hosts []*resource.Host) { mockRDBClient.MatchExpectationsInOrder(true) blocklist := set.NewSafeSet[string]() blocklist.Add(mockSeedHost.ID) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + mc.GetWithExpiration(gomock.Any()).Return(nil, mockCacheExpiration, false).Times(6), ) var probedCountKeys []string @@ -545,10 +633,11 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) - tc.mock(mockRDBClient, res.EXPECT(), hostManager, hostManager.EXPECT(), tc.hosts) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, res.EXPECT(), hostManager, hostManager.EXPECT(), cache.EXPECT(), tc.hosts) mockNetworkTopologyConfig.Probe.Count = 5 - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err, tc.hosts) mockRDBClient.ClearExpect() }) @@ -559,23 +648,26 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { tests := []struct { name string deleteKeys []string - mock func(mockRDBClient redismock.ClientMock, keys []string) - expect func(t *testing.T, networkTopology NetworkTopology, err error) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) + run func(t *testing.T, networkTopology NetworkTopology, err error) }{ { name: "delete host", deleteKeys: []string{pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[1]}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[2]}, 0) mockRDBClient.ExpectDel(keys...).SetVal(4) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.NoError(networkTopology.DeleteHost(mockHost.ID)) @@ -584,11 +676,11 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { { name: "get source network topology keys error", deleteKeys: []string{}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetErr( errors.New("get source network topology keys error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.DeleteHost(mockHost.ID), "get source network topology keys error") @@ -597,13 +689,13 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { { name: "get destination network topology keys error", deleteKeys: []string{}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetErr( errors.New("get destination network topology keys error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.DeleteHost(mockHost.ID), "get destination network topology keys error") @@ -612,14 +704,14 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { { name: "get source probes keys error", deleteKeys: []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[0]}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetErr( errors.New("get source probes keys error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.DeleteHost(mockHost.ID), "get source probes keys error") @@ -628,14 +720,14 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { { name: "get destination probes keys error", deleteKeys: []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[0]}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetErr( errors.New("get destination probes keys error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.DeleteHost(mockHost.ID), "get destination probes keys error") @@ -646,7 +738,7 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { deleteKeys: []string{pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, - mock: func(mockRDBClient redismock.ClientMock, keys []string) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, keys []string) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, "*"), math.MaxInt64).SetVal([]string{}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[1]}, 0) @@ -654,7 +746,7 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { mockRDBClient.ExpectScan(0, pkgredis.MakeProbesKeyInScheduler("*", mockHost.ID), math.MaxInt64).SetVal([]string{keys[2]}, 0) mockRDBClient.ExpectDel(keys...).SetErr(errors.New("delete network topology and probes error")) }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + run: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) assert.EqualError(networkTopology.DeleteHost(mockHost.ID), "delete network topology and probes error") @@ -670,10 +762,11 @@ func TestNetworkTopology_DeleteHost(t *testing.T) { rdb, mockRDBClient := redismock.NewClientMock() res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(mockRDBClient, tc.deleteKeys) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT(), tc.deleteKeys) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - tc.expect(t, networkTopology, err) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) + tc.run(t, networkTopology, err) mockRDBClient.ClearExpect() }) } @@ -692,7 +785,7 @@ func TestNetworkTopology_Probes(t *testing.T) { ps := networkTopology.Probes(mockSeedHost.ID, mockHost.ID) probes := ps.(*probes) - assert.Equal(probes.config.QueueLength, 5) + assert.Equal(probes.config.Probe.QueueLength, 5) assert.NotNil(probes.rdb) assert.Equal(probes.srcHostID, mockSeedHost.ID) assert.Equal(probes.destHostID, mockHost.ID) @@ -708,8 +801,9 @@ func TestNetworkTopology_Probes(t *testing.T) { rdb, _ := redismock.NewClientMock() res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) + cache := cache.NewMockCache(ctl) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err) }) } @@ -718,34 +812,65 @@ func TestNetworkTopology_Probes(t *testing.T) { func TestNetworkTopology_ProbedCount(t *testing.T) { tests := []struct { name string - mock func(mockRDBClient redismock.ClientMock) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) expect func(t *testing.T, networkTopology NetworkTopology, err error) }{ + { + name: "get probed count with cache", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).Return(uint64(mockProbedCount), mockCacheExpiration, true) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + assert := assert.New(t) + assert.NoError(err) + + probedCount, err := networkTopology.ProbedCount(mockHost.ID) + assert.EqualValues(probedCount, mockProbedCount) + assert.NoError(err) + }, + }, { name: "get probed count", - mock: func(mockRDBClient redismock.ClientMock) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).Return(nil, mockCacheExpiration, false) mockRDBClient.ExpectGet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(strconv.Itoa(mockProbedCount)) + mockCache.Set(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), uint64(mockProbedCount), gomock.Any()) }, expect: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) probedCount, err := networkTopology.ProbedCount(mockHost.ID) - assert.NoError(err) assert.EqualValues(probedCount, mockProbedCount) + assert.NoError(err) + }, + }, + { + name: "type convert error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + assert := assert.New(t) + assert.NoError(err) + + probedCount, err := networkTopology.ProbedCount(mockHost.ID) + assert.Equal(probedCount, uint64(0)) + assert.EqualError(err, "get probedCount failed") }, }, { name: "get probed count error", - mock: func(mockRDBClient redismock.ClientMock) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).Return(nil, mockCacheExpiration, false) mockRDBClient.ExpectGet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetErr(errors.New("get probed count error")) }, expect: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) - _, err = networkTopology.ProbedCount(mockHost.ID) - assert.EqualError(err, "get probed count error") + probedCount, _ := networkTopology.ProbedCount(mockHost.ID) + assert.Equal(probedCount, uint64(0)) }, }, } @@ -758,9 +883,10 @@ func TestNetworkTopology_ProbedCount(t *testing.T) { rdb, mockRDBClient := redismock.NewClientMock() res := resource.NewMockResource(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err) mockRDBClient.ClearExpect() }) @@ -771,27 +897,25 @@ func TestNetworkTopology_Snapshot(t *testing.T) { tests := []struct { name string mock func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) expect func(t *testing.T, networkTopology NetworkTopology, err error) }{ { name: "writes the current network topology to the storage", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal( - mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false).Times(1), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()).Times(1), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true).Times(2), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -806,7 +930,7 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "get probed count keys error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetErr( errors.New("get probed count keys error")) @@ -820,7 +944,7 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "parse probed count keys in scheduler error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{"foo"}, 0) }, @@ -833,7 +957,7 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "get network topology keys error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) @@ -849,7 +973,7 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "parse network topology keys in scheduler error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( @@ -869,7 +993,7 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "construct destination hosts for network topology error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) @@ -892,17 +1016,19 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "get averageRTT error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetErr( + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr( errors.New("get averageRTT error")) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false).Times(1), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -917,19 +1043,21 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "get createdAt error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetErr( + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr( errors.New("get createdAt error")) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -944,21 +1072,22 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "get updatedAt error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetErr( + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr( errors.New("get updatedAt error")) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(nil).Times(1), @@ -973,21 +1102,18 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "construct source hosts for network topology error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { - mockRDBClient.MatchExpectationsInOrder(true) + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal( - mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true).Times(2), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(nil, false), ) @@ -1001,21 +1127,19 @@ func TestNetworkTopology_Snapshot(t *testing.T) { { name: "inserts the network topology into csv file error", mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, - mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock) { + mh *resource.MockHostManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, mockRDBClient redismock.ClientMock, mc *cache.MockCacheMockRecorder) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectScan(0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).SetVal( []string{pkgredis.MakeProbedCountKeyInScheduler(mockSeedHost.ID)}, 0) mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal( []string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal( - strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal( - mockProbesCreatedAt.Format(time.RFC3339Nano)) - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal( - mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false), + mc.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()), + mc.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true).Times(2), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Eq(mockSeedHost.ID)).Return(mockSeedHost, true), ms.CreateNetworkTopology(gomock.Any()).Return(errors.New("inserts the network topology into csv file error")).Times(1), @@ -1037,9 +1161,10 @@ func TestNetworkTopology_Snapshot(t *testing.T) { res := resource.NewMockResource(ctl) hostManager := resource.NewMockHostManager(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.mock(res.EXPECT(), hostManager, hostManager.EXPECT(), storage.EXPECT(), mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(res.EXPECT(), hostManager, hostManager.EXPECT(), storage.EXPECT(), mockRDBClient, cache.EXPECT()) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage) tc.expect(t, networkTopology, err) mockRDBClient.ClearExpect() }) diff --git a/scheduler/networktopology/probes.go b/scheduler/networktopology/probes.go index 08e69c77d4d..75b36ced59b 100644 --- a/scheduler/networktopology/probes.go +++ b/scheduler/networktopology/probes.go @@ -21,10 +21,14 @@ package networktopology import ( "context" "encoding/json" + "errors" + "strconv" "time" "github.com/go-redis/redis/v8" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/cache" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" @@ -71,11 +75,14 @@ type Probes interface { // probes is the implementation of Probes. type probes struct { // config is the probe config. - config config.ProbeConfig + config config.NetworkTopologyConfig // rdb is redis universal client interface. rdb redis.UniversalClient + // Cache instance. + cache cache.Cache + // srcHostID is the source host id. srcHostID string @@ -84,10 +91,11 @@ type probes struct { } // NewProbes creates a probes interface. -func NewProbes(cfg config.ProbeConfig, rdb redis.UniversalClient, srcHostID string, destHostID string) Probes { +func NewProbes(cfg config.NetworkTopologyConfig, rdb redis.UniversalClient, cache cache.Cache, srcHostID string, destHostID string) Probes { return &probes{ config: cfg, rdb: rdb, + cache: cache, srcHostID: srcHostID, destHostID: destHostID, } @@ -98,17 +106,39 @@ func (p *probes) Peek() (*Probe, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - rawProbe, err := p.rdb.LIndex(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Bytes() + probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID) + if cache, _, ok := p.cache.GetWithExpiration(probesKey); ok { + probes, ok := cache.([]*Probe) + if !ok { + return nil, errors.New("get probes failed") + } + + if len(probes) == 0 { + return nil, errors.New("probes cache is empty") + } + + return probes[0], nil + } + + rawProbes, err := p.rdb.LRange(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0, -1).Result() if err != nil { + logger.Errorf("get probes failed: %s", err.Error()) return nil, err } - probe := &Probe{} - if err = json.Unmarshal(rawProbe, probe); err != nil { - return nil, err + var probes []*Probe + for _, rawProbe := range rawProbes { + probe := &Probe{} + if err = json.Unmarshal([]byte(rawProbe), probe); err != nil { + return nil, err + } + probes = append(probes, probe) } - return probe, nil + // Add cache data. + p.cache.Set(probesKey, probes, p.config.Cache.TTL) + + return probes[0], nil } // Enqueue enqueues probe into the queue. @@ -123,7 +153,7 @@ func (p *probes) Enqueue(probe *Probe) error { } // If the queue is full, remove the oldest probe. - if length >= int64(p.config.QueueLength) { + if length >= int64(p.config.Probe.QueueLength) { if _, err := p.dequeue(); err != nil { return err } @@ -135,16 +165,18 @@ func (p *probes) Enqueue(probe *Probe) error { return err } - if err := p.rdb.RPush(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), data).Err(); err != nil { + probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID) + if err := p.rdb.RPush(ctx, probesKey, data).Err(); err != nil { return err } + p.cache.Delete(probesKey) // Calculate the moving average round-trip time. var averageRTT time.Duration if length > 0 { // If the queue is not empty, calculate the // moving average round-trip time. - rawProbes, err := p.rdb.LRange(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0, -1).Result() + rawProbes, err := p.rdb.LRange(context.Background(), probesKey, 0, -1).Result() if err != nil { return err } @@ -170,17 +202,20 @@ func (p *probes) Enqueue(probe *Probe) error { } // Update the moving average round-trip time and updated time. - if err := p.rdb.HSet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT", averageRTT.Nanoseconds()).Err(); err != nil { + networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID) + if err := p.rdb.HSet(ctx, networkTopologyKey, "averageRTT", averageRTT.Nanoseconds()).Err(); err != nil { return err } - - if err := p.rdb.HSet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt", probe.CreatedAt.Format(time.RFC3339Nano)).Err(); err != nil { + if err := p.rdb.HSet(ctx, networkTopologyKey, "updatedAt", probe.CreatedAt.Format(time.RFC3339Nano)).Err(); err != nil { return err } + p.cache.Delete(networkTopologyKey) - if err := p.rdb.Incr(ctx, pkgredis.MakeProbedCountKeyInScheduler(p.destHostID)).Err(); err != nil { + probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(p.destHostID) + if err := p.rdb.Incr(ctx, probedCountKey).Err(); err != nil { return err } + p.cache.Delete(probedCountKey) return nil } @@ -190,7 +225,39 @@ func (p *probes) Len() (int64, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - return p.rdb.LLen(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result() + probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID) + if cache, _, ok := p.cache.GetWithExpiration(probesKey); ok { + probes, ok := cache.([]*Probe) + if !ok { + return int64(0), errors.New("get probes failed") + } + + return int64(len(probes)), nil + } + + rawProbes, err := p.rdb.LRange(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0, -1).Result() + if err != nil { + logger.Errorf("get probes failed: %s", err.Error()) + return int64(0), err + } + + if len(rawProbes) == 0 { + return int64(0), err + } + + var probes []*Probe + for _, rawProbe := range rawProbes { + probe := &Probe{} + if err = json.Unmarshal([]byte(rawProbe), probe); err != nil { + return int64(0), err + } + probes = append(probes, probe) + } + + // Add cache data. + p.cache.Set(probesKey, probes, p.config.Cache.TTL) + + return int64(len(probes)), nil } // CreatedAt is the creation time of probes. @@ -198,7 +265,29 @@ func (p *probes) CreatedAt() (time.Time, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Time() + var networkTopology map[string]string + networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID) + cache, _, ok := p.cache.GetWithExpiration(networkTopologyKey) + if ok { + if networkTopology, ok = cache.(map[string]string); !ok { + return time.Time{}, errors.New("get networkTopology failed") + } + } else { + var err error + if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil { + return time.Time{}, err + } + + // Add cache data. + p.cache.Set(networkTopologyKey, networkTopology, p.config.Cache.TTL) + } + + createdAt, err := time.Parse(time.RFC3339Nano, networkTopology["createdAt"]) + if err != nil { + return time.Time{}, err + } + + return createdAt, nil } // UpdatedAt is the updated time to store probe. @@ -206,7 +295,29 @@ func (p *probes) UpdatedAt() (time.Time, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Time() + var networkTopology map[string]string + networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID) + cache, _, ok := p.cache.GetWithExpiration(networkTopologyKey) + if ok { + if networkTopology, ok = cache.(map[string]string); !ok { + return time.Time{}, errors.New("get networkTopology failed") + } + } else { + var err error + if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil { + return time.Time{}, err + } + + // Add cache data. + p.cache.Set(networkTopologyKey, networkTopology, p.config.Cache.TTL) + } + + updatedAt, err := time.Parse(time.RFC3339Nano, networkTopology["updatedAt"]) + if err != nil { + return time.Time{}, err + } + + return updatedAt, nil } // AverageRTT is the moving average round-trip time of probes. @@ -214,9 +325,26 @@ func (p *probes) AverageRTT() (time.Duration, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - averageRTT, err := p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT").Int64() + var networkTopology map[string]string + networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID) + cache, _, ok := p.cache.GetWithExpiration(networkTopologyKey) + if ok { + if networkTopology, ok = cache.(map[string]string); !ok { + return time.Duration(0), errors.New("get networkTopology failed") + } + } else { + var err error + if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil { + return time.Duration(0), err + } + + // Add cache data. + p.cache.Set(networkTopologyKey, networkTopology, p.config.Cache.TTL) + } + + averageRTT, err := strconv.ParseInt(networkTopology["averageRTT"], 10, 64) if err != nil { - return 0, err + return time.Duration(0), err } return time.Duration(averageRTT), nil @@ -227,10 +355,12 @@ func (p *probes) dequeue() (*Probe, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() - rawProbe, err := p.rdb.LPop(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Bytes() + probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID) + rawProbe, err := p.rdb.LPop(ctx, probesKey).Bytes() if err != nil { return nil, err } + p.cache.Delete(probesKey) probe := &Probe{} if err = json.Unmarshal(rawProbe, probe); err != nil { diff --git a/scheduler/networktopology/probes_test.go b/scheduler/networktopology/probes_test.go index a5ffa074763..f56246f6a93 100644 --- a/scheduler/networktopology/probes_test.go +++ b/scheduler/networktopology/probes_test.go @@ -29,6 +29,7 @@ import ( "go.uber.org/atomic" "go.uber.org/mock/gomock" + "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/idgen" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/pkg/types" @@ -158,7 +159,13 @@ var ( }, } - mockProbesCreatedAt = time.Now() + mockNetworkTopology = map[string]string{ + "createdAt": time.Now().Format(time.RFC3339Nano), + "updatedAt": time.Now().Format(time.RFC3339Nano), + "averageRTT": strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10), + } + + mockCacheExpiration = time.Now() mockProbedCount = 10 ) @@ -172,7 +179,7 @@ func Test_NewProbes(t *testing.T) { expect: func(t *testing.T, ps Probes) { assert := assert.New(t) probes := ps.(*probes) - assert.Equal(probes.config.QueueLength, 5) + assert.Equal(probes.config.Probe.QueueLength, 5) assert.NotNil(probes.rdb) assert.Equal(probes.srcHostID, mockSeedHost.ID) assert.Equal(probes.destHostID, mockHost.ID) @@ -182,8 +189,12 @@ func Test_NewProbes(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() rdb, _ := redismock.NewClientMock() - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + cache := cache.NewMockCache(ctl) + + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) }) } } @@ -192,19 +203,26 @@ func TestProbes_Peek(t *testing.T) { tests := []struct { name string probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) expect func(t *testing.T, p Probes) }{ { name: "queue has one probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - data, err := json.Marshal(mockProbe) - if err != nil { - t.Fatal(err) + probes: []*Probe{mockProbe}, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + var rawProbes []string + for _, p := range ps { + data, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + + rawProbes = append(rawProbes, string(data)) } - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(string(data)) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -221,7 +239,6 @@ func TestProbes_Peek(t *testing.T) { assert.Equal(probe.Host.PlatformFamily, mockProbe.Host.PlatformFamily) assert.Equal(probe.Host.PlatformVersion, mockProbe.Host.PlatformVersion) assert.Equal(probe.Host.KernelVersion, mockProbe.Host.KernelVersion) - assert.Equal(probe.Host.ConcurrentUploadLimit, mockProbe.Host.ConcurrentUploadLimit) assert.Equal(probe.Host.ConcurrentUploadCount, mockProbe.Host.ConcurrentUploadCount) assert.Equal(probe.Host.UploadCount, mockProbe.Host.UploadCount) assert.Equal(probe.Host.UploadFailedCount, mockProbe.Host.UploadFailedCount) @@ -243,7 +260,7 @@ func TestProbes_Peek(t *testing.T) { {mockHost, 34 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -255,21 +272,27 @@ func TestProbes_Peek(t *testing.T) { } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(rawProbes[4]) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(ps, mockCacheExpiration, true) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(rawProbes[4]) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(6) - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(rawProbes[0]) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(ps, mockCacheExpiration, true) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) probe, err := ps.Peek() assert.NoError(err) - assert.Equal(probe.RTT, mockProbe.RTT) + assert.Equal(probe.RTT, 31*time.Millisecond) assert.NoError(ps.Enqueue(mockProbe)) probe, err = ps.Peek() @@ -280,8 +303,9 @@ func TestProbes_Peek(t *testing.T) { { name: "queue has no probe", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetErr(errors.New("no probe")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetErr(errors.New("no probe")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -289,11 +313,36 @@ func TestProbes_Peek(t *testing.T) { assert.EqualError(err, "no probe") }, }, + { + name: "type convert error", + probes: []*Probe{}, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.Peek() + assert.EqualError(err, "get probes failed") + }, + }, + { + name: "probes cache is empty", + probes: []*Probe{}, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return([]*Probe{}, mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.Peek() + assert.EqualError(err, "probes cache is empty") + }, + }, { name: "unmarshal probe error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal("foo") + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{"foo"}) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -309,9 +358,10 @@ func TestProbes_Peek(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT(), tc.probes) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } @@ -321,7 +371,7 @@ func TestProbes_Enqueue(t *testing.T) { tests := []struct { name string probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) expect func(t *testing.T, ps Probes) }{ { @@ -329,18 +379,22 @@ func TestProbes_Enqueue(t *testing.T) { probes: []*Probe{ mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(ps[0]) if err != nil { t.Fatal(err) } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", mockProbe.RTT.Nanoseconds()).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(1) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -353,7 +407,7 @@ func TestProbes_Enqueue(t *testing.T) { mockProbe, {mockHost, 31 * time.Millisecond, time.Now()}, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -365,12 +419,17 @@ func TestProbes_Enqueue(t *testing.T) { } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{rawProbes[0]}) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.All()) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[0])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{rawProbes[1], rawProbes[0]}) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30100000)).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(2) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -386,7 +445,7 @@ func TestProbes_Enqueue(t *testing.T) { {mockHost, 34 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -398,13 +457,19 @@ func TestProbes_Enqueue(t *testing.T) { } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(rawProbes[0]) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(6) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -414,8 +479,9 @@ func TestProbes_Enqueue(t *testing.T) { { name: "get the length of the queue error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get the length of the queue error")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetErr(errors.New("get the length of the queue error")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -423,10 +489,28 @@ func TestProbes_Enqueue(t *testing.T) { }, }, { - name: "remove the oldest probe error when the queue is full", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + name: "remove the oldest probe error when the queue is full", + probes: []*Probe{ + {mockHost, 31 * time.Millisecond, time.Now()}, + {mockHost, 32 * time.Millisecond, time.Now()}, + {mockHost, 33 * time.Millisecond, time.Now()}, + {mockHost, 34 * time.Millisecond, time.Now()}, + mockProbe, + }, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + var rawProbes []string + for _, p := range ps { + data, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + + rawProbes = append(rawProbes, string(data)) + } + + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("remove the oldest probe error when the queue is full")) }, expect: func(t *testing.T, ps Probes) { @@ -437,8 +521,9 @@ func TestProbes_Enqueue(t *testing.T) { { name: "marshal probe error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -456,13 +541,14 @@ func TestProbes_Enqueue(t *testing.T) { { name: "push probe in queue error", probes: []*Probe{mockProbe}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(ps[0]) if err != nil { t.Fatal(err) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetErr( errors.New("push probe in queue error")) }, @@ -477,7 +563,7 @@ func TestProbes_Enqueue(t *testing.T) { {mockHost, 31 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -488,8 +574,11 @@ func TestProbes_Enqueue(t *testing.T) { rawProbes = append(rawProbes, string(data)) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[1])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetErr( errors.New("get probes error")) }, @@ -504,7 +593,7 @@ func TestProbes_Enqueue(t *testing.T) { {mockHost, 31 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -515,8 +604,11 @@ func TestProbes_Enqueue(t *testing.T) { rawProbes = append(rawProbes, string(data)) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[1])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{"foo"}) }, expect: func(t *testing.T, ps Probes) { @@ -527,14 +619,16 @@ func TestProbes_Enqueue(t *testing.T) { { name: "update the moving average round-trip time error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(mockProbe) if err != nil { t.Fatal(err) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", mockProbe.RTT.Nanoseconds()).SetErr(errors.New("update the moving average round-trip time error")) }, @@ -546,14 +640,16 @@ func TestProbes_Enqueue(t *testing.T) { { name: "update the updated time error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(mockProbe) if err != nil { t.Fatal(err) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", mockProbe.RTT.Nanoseconds()).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), @@ -567,18 +663,21 @@ func TestProbes_Enqueue(t *testing.T) { { name: "update the number of times the host has been probed error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(mockProbe) if err != nil { t.Fatal(err) } - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", mockProbe.RTT.Nanoseconds()).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetErr(errors.New("update the number of times the host has been probed error")) }, expect: func(t *testing.T, ps Probes) { @@ -594,26 +693,40 @@ func TestProbes_Enqueue(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT(), tc.probes) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } } +// 1 func TestProbes_Len(t *testing.T) { tests := []struct { name string probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) expect func(t *testing.T, ps Probes) }{ { name: "queue has one probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) + probes: []*Probe{mockProbe}, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + var rawProbes []string + for _, p := range ps { + data, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + + rawProbes = append(rawProbes, string(data)) + } + + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -631,7 +744,7 @@ func TestProbes_Len(t *testing.T) { {mockHost, 34 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -643,15 +756,21 @@ func TestProbes_Len(t *testing.T) { } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(ps, mockCacheExpiration, true) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[4])) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(6) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(ps, mockCacheExpiration, true) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -668,8 +787,9 @@ func TestProbes_Len(t *testing.T) { { name: "queue has no probe", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(nil) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -681,8 +801,9 @@ func TestProbes_Len(t *testing.T) { { name: "get queue length error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get queue length error")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetErr(errors.New("get queue length error")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -690,6 +811,32 @@ func TestProbes_Len(t *testing.T) { assert.EqualError(err, "get queue length error") }, }, + { + name: "type convert error", + probes: []*Probe{}, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + assert.EqualError(ps.Enqueue(mockProbe), "get probes failed") + }, + }, + { + name: "unmarshal probe error", + probes: []*Probe{ + {mockHost, 31 * time.Millisecond, time.Now()}, + mockProbe, + }, + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{"foo"}) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + assert.EqualError(ps.Enqueue(mockProbe), "invalid character 'o' in literal false (expecting 'a')") + }, + }, } for _, tc := range tests { @@ -698,9 +845,10 @@ func TestProbes_Len(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT(), tc.probes) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } @@ -709,25 +857,28 @@ func TestProbes_Len(t *testing.T) { func TestProbes_CreatedAt(t *testing.T) { tests := []struct { name string - mock func(mockRDBClient redismock.ClientMock) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) expect func(t *testing.T, ps Probes) }{ { name: "get creation time of probes", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetVal(mockProbesCreatedAt.Format(time.RFC3339Nano)) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockCache.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) createdAt, err := ps.CreatedAt() assert.NoError(err) - assert.True(createdAt.Equal(mockProbesCreatedAt)) + assert.Equal(createdAt.Format(time.RFC3339Nano), mockNetworkTopology["createdAt"]) }, }, { name: "get creation time of probes error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt").SetErr(errors.New("get creation time of probes error")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get creation time of probes error")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -735,6 +886,28 @@ func TestProbes_CreatedAt(t *testing.T) { assert.EqualError(err, "get creation time of probes error") }, }, + { + name: "type convert error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.CreatedAt() + assert.EqualError(err, "get networkTopology failed") + }, + }, + { + name: "time parse error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(map[string]string{"createdAt": "foo"}, mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.CreatedAt() + assert.EqualError(err, "parsing time \"foo\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"foo\" as \"2006\"") + }, + }, } for _, tc := range tests { @@ -743,9 +916,10 @@ func TestProbes_CreatedAt(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } @@ -754,25 +928,28 @@ func TestProbes_CreatedAt(t *testing.T) { func TestProbes_UpdatedAt(t *testing.T) { tests := []struct { name string - mock func(mockRDBClient redismock.ClientMock) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) expect func(t *testing.T, ps Probes) }{ { name: "get update time of probes", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal(mockProbe.CreatedAt.Format(time.RFC3339Nano)) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockCache.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) updatedAt, err := ps.UpdatedAt() assert.NoError(err) - assert.True(updatedAt.Equal(mockProbe.CreatedAt)) + assert.Equal(updatedAt.Format(time.RFC3339Nano), mockNetworkTopology["updatedAt"]) }, }, { name: "get update time of probes error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetErr(errors.New("get update time of probes error")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get update time of probes error")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -780,6 +957,28 @@ func TestProbes_UpdatedAt(t *testing.T) { assert.EqualError(err, "get update time of probes error") }, }, + { + name: "type convert error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.UpdatedAt() + assert.EqualError(err, "get networkTopology failed") + }, + }, + { + name: "time parse error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(map[string]string{"updatedAt": "foo"}, mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.UpdatedAt() + assert.EqualError(err, "parsing time \"foo\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"foo\" as \"2006\"") + }, + }, } for _, tc := range tests { @@ -788,9 +987,10 @@ func TestProbes_UpdatedAt(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } @@ -799,13 +999,15 @@ func TestProbes_UpdatedAt(t *testing.T) { func TestProbes_AverageRTT(t *testing.T) { tests := []struct { name string - mock func(mockRDBClient redismock.ClientMock) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) expect func(t *testing.T, ps Probes) }{ { name: "get averageRTT of probes", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal(strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(mockNetworkTopology) + mockCache.Set(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), mockNetworkTopology, gomock.Any()) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -816,8 +1018,9 @@ func TestProbes_AverageRTT(t *testing.T) { }, { name: "get averageRTT of probes error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetErr(errors.New("get averageRTT of probes error")) + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get averageRTT of probes error")) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -825,6 +1028,40 @@ func TestProbes_AverageRTT(t *testing.T) { assert.EqualError(err, "get averageRTT of probes error") }, }, + { + name: "get averageRTT of probes with cache", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(mockNetworkTopology, mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + averageRTT, err := ps.AverageRTT() + assert.NoError(err) + assert.Equal(averageRTT, mockProbe.RTT) + }, + }, + { + name: "type convert error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return("foo", mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.AverageRTT() + assert.EqualError(err, "get networkTopology failed") + }, + }, + { + name: "parseInt error", + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) { + mockCache.GetWithExpiration(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(map[string]string{"averageRTT": "foo"}, mockCacheExpiration, true) + }, + expect: func(t *testing.T, ps Probes) { + assert := assert.New(t) + _, err := ps.AverageRTT() + assert.EqualError(err, "strconv.ParseInt: parsing \"foo\": invalid syntax") + }, + }, } for _, tc := range tests { @@ -833,9 +1070,10 @@ func TestProbes_AverageRTT(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT()) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } @@ -845,7 +1083,7 @@ func TestProbes_dequeue(t *testing.T) { tests := []struct { name string probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) + mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) expect func(t *testing.T, ps Probes) }{ { @@ -853,13 +1091,14 @@ func TestProbes_dequeue(t *testing.T) { probes: []*Probe{ mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { data, err := json.Marshal(ps[0]) if err != nil { t.Fatal(err) } mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(data)) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -898,7 +1137,7 @@ func TestProbes_dequeue(t *testing.T) { {mockHost, 34 * time.Millisecond, time.Now()}, mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { var rawProbes []string for _, p := range ps { data, err := json.Marshal(p) @@ -910,14 +1149,21 @@ func TestProbes_dequeue(t *testing.T) { } mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) + mockCache.GetWithExpiration(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).Return(nil, mockCacheExpiration, false) + mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) + mockCache.Set(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), gomock.Any(), gomock.Any()) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[4])) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) + mockCache.Delete(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)) mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(6) + mockCache.Delete(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)) mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[0])) + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -933,7 +1179,7 @@ func TestProbes_dequeue(t *testing.T) { probes: []*Probe{ mockProbe, }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).RedisNil() }, expect: func(t *testing.T, ps Probes) { @@ -945,8 +1191,9 @@ func TestProbes_dequeue(t *testing.T) { { name: "unmarshal probe error", probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { + mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder, ps []*Probe) { mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal("foo") + mockCache.Delete(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)) }, expect: func(t *testing.T, ps Probes) { assert := assert.New(t) @@ -961,9 +1208,10 @@ func TestProbes_dequeue(t *testing.T) { defer ctl.Finish() rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) + cache := cache.NewMockCache(ctl) + tc.mock(mockRDBClient, cache.EXPECT(), tc.probes) - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) + tc.expect(t, NewProbes(mockNetworkTopologyConfig, rdb, cache, mockSeedHost.ID, mockHost.ID)) mockRDBClient.ClearExpect() }) } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ca364b1c639..2c10459d5b5 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -259,7 +259,8 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err // Initialize network topology service. if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { - s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage) + cache := cache.New(cfg.NetworkTopology.Cache.TTL, cfg.NetworkTopology.Cache.Interval) + s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, cache, resource, s.storage) if err != nil { return nil, err }