Skip to content

Commit

Permalink
feat:add cache about networktopology
Browse files Browse the repository at this point in the history
Signed-off-by: huangmin <2107139596@qq.com>
  • Loading branch information
MinH-09 committed Dec 26, 2023
1 parent 343d9b2 commit 505031b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 61 deletions.
91 changes: 49 additions & 42 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,18 @@ func (nt *networkTopology) Has(srcHostID string, destHostID string) bool {
return true
}

if networkTopology, err := nt.rdb.HGetAll(ctx, networkTopologyKey).Result(); err == nil && len(networkTopology) != 0 {
if err := nt.cache.Add(networkTopologyKey, networkTopology, nt.config.Cache.TTL); err != nil {
logger.Error(err)
}
networkTopology, err := nt.rdb.HGetAll(ctx, networkTopologyKey).Result()
if err != nil {
logger.Errorf("get %s error because of %s", networkTopologyKey, err.Error())
return false
}

if len(networkTopology) == 0 {
return false
}

if err := nt.cache.Add(networkTopologyKey, networkTopology, nt.config.Cache.TTL); err != nil {
logger.Errorf("add %s cache error because of %s", networkTopologyKey, err.Error())
return true
}

Expand Down Expand Up @@ -181,74 +188,74 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err

blocklist := set.NewSafeSet[string]()
blocklist.Add(hostID)
candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist)
if len(candidateHosts) == 0 {
hosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist)
if len(hosts) == 0 {
return nil, errors.New("probed hosts not found")
}

if len(candidateHosts) <= nt.config.Probe.Count {
return candidateHosts, nil
if len(hosts) <= nt.config.Probe.Count {
return hosts, nil
}

var probedCounts []uint64
var probedCountKeys []string
var hosts []*resource.Host
var filterHosts []*resource.Host
for _, candidateHost := range candidateHosts {
probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID)
any, ok := nt.cache.Get(probedCountKey)
var (
probedCounts []uint64
probedCoundKeys []string
cacheMissHosts []*resource.Host
candidateHosts []*resource.Host
)
for _, host := range hosts {
probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(host.ID)
cache, ok := nt.cache.Get(probedCountKey)
if !ok {
filterHosts = append(filterHosts, candidateHost)
probedCountKeys = append(probedCountKeys, probedCountKey)
cacheMissHosts = append(cacheMissHosts, host)
probedCoundKeys = append(probedCoundKeys, probedCountKey)
} else {
hosts = append(hosts, candidateHost)
probedCounts = append(probedCounts, any.(uint64))
candidateHosts = append(candidateHosts, host)
probedCounts = append(probedCounts, cache.(uint64))
}
}

rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result()
rawProbedCounts, err := nt.rdb.MGet(ctx, probedCoundKeys...).Result()
if err != nil {
return nil, err
}

// Filter invalid probed count. If probed key not exist, the probed count is nil.
for i, rawProbedCount := range rawProbedCounts {
probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(candidateHosts[i].ID)

// Initialize the probedCount value of host in redis when the host is first selected as the candidate probe target.
if rawProbedCount == nil {
if err := nt.rdb.Set(ctx, probedCountKey, 0, 0).Err(); err != nil {
if err := nt.rdb.Set(ctx, probedCoundKeys[i], 0, 0).Err(); err != nil {
return nil, err
}

probedCounts = append(probedCounts, 0)
} else {
value, ok := rawProbedCount.(string)
if !ok {
return nil, errors.New("invalid value type")
}
continue
}

probedCount, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, errors.New("invalid probed count")
}
value, ok := rawProbedCount.(string)
if !ok {
return nil, errors.New("invalid value type")
}

if err := nt.cache.Add(probedCountKey, probedCount, nt.config.Cache.TTL); err != nil {
logger.Error(err)
}
probedCount, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, errors.New("invalid probed count")
}

probedCounts = append(probedCounts, probedCount)
if err := nt.cache.Add(probedCoundKeys[i], probedCount, nt.config.Cache.TTL); err != nil {
logger.Errorf("add %s cache error because of %s", probedCoundKeys[i], err.Error())
}

hosts = append(hosts, filterHosts[i])
probedCounts = append(probedCounts, probedCount)
}
candidateHosts = append(candidateHosts, cacheMissHosts...)

// Sort candidate hosts by probed count.
sort.Slice(hosts, func(i, j int) bool {
sort.Slice(candidateHosts, func(i, j int) bool {
return probedCounts[i] < probedCounts[j]
})

return hosts[:nt.config.Probe.Count], nil
return candidateHosts[:nt.config.Probe.Count], nil
}

// DeleteHost deletes source host and all destination host connected to source host.
Expand Down Expand Up @@ -303,8 +310,8 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
defer cancel()

probedCountKey := pkgredis.MakeProbedCountKeyInScheduler(hostID)
if any, ok := nt.cache.Get(probedCountKey); ok {
return any.(uint64), nil
if cache, ok := nt.cache.Get(probedCountKey); ok {
return cache.(uint64), nil
}

probedCount, err := nt.rdb.Get(ctx, probedCountKey).Uint64()
Expand All @@ -313,7 +320,7 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
}

if err := nt.cache.Add(probedCountKey, probedCount, nt.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", probedCountKey, err.Error())
}

return probedCount, nil
Expand Down
2 changes: 1 addition & 1 deletion scheduler/networktopology/network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestNetworkTopology_Has(t *testing.T) {
run: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.True(networkTopology.Has(mockSeedHost.ID, mockHost.ID))
assert.False(networkTopology.Has(mockSeedHost.ID, mockHost.ID))
},
},
}
Expand Down
45 changes: 27 additions & 18 deletions scheduler/networktopology/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,16 @@ func (p *probes) Peek() (*Probe, error) {
defer cancel()

probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)
if any, ok := p.cache.Get(probesKey); ok {
return any.([]*Probe)[0], nil
if cache, ok := p.cache.Get(probesKey); ok {
return cache.([]*Probe)[0], nil
}

rawProbes, err := p.rdb.LRange(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0, -1).Result()
if err != nil || len(rawProbes) == 0 {
if err != nil {
return nil, err
}

if len(rawProbes) == 0 {
return nil, err
}

Expand All @@ -125,7 +129,7 @@ func (p *probes) Peek() (*Probe, error) {
}

if err := p.cache.Add(probesKey, probes, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", probesKey, err.Error())
}

return probes[0], nil
Expand Down Expand Up @@ -196,7 +200,7 @@ func (p *probes) Enqueue(probe *Probe) error {
}

if err := p.cache.Add(probesKey, probes, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", probesKey, err.Error())
}

// Update the moving average round-trip time and updated time.
Expand Down Expand Up @@ -224,12 +228,17 @@ func (p *probes) Len() (int64, error) {
defer cancel()

probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)
if any, ok := p.cache.Get(probesKey); ok {
return int64(len(any.([]*Probe))), nil
if cache, ok := p.cache.Get(probesKey); ok {
return int64(len(cache.([]*Probe))), nil
}

rawProbes, err := p.rdb.LRange(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0, -1).Result()
if err != nil || len(rawProbes) == 0 {
if err != nil {
logger.Errorf("get probes error because of %s", err.Error())
return int64(0), err
}

if len(rawProbes) == 0 {
return int64(0), err
}

Expand All @@ -243,7 +252,7 @@ func (p *probes) Len() (int64, error) {
}

if err := p.cache.Add(probesKey, probes, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", probesKey, err.Error())
}

return int64(len(probes)), nil
Expand All @@ -256,18 +265,18 @@ func (p *probes) CreatedAt() (time.Time, error) {

var networkTopology map[string]string
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
any, ok := p.cache.Get(networkTopologyKey)
cache, ok := p.cache.Get(networkTopologyKey)
if !ok {
var err error
if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
return time.Time{}, err
}

if err := p.cache.Add(networkTopologyKey, networkTopology, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", networkTopologyKey, err.Error())
}
} else {
networkTopology = any.(map[string]string)
networkTopology = cache.(map[string]string)
}

createdTime, err := time.Parse(time.RFC3339Nano, networkTopology["createdAt"])
Expand All @@ -285,18 +294,18 @@ func (p *probes) UpdatedAt() (time.Time, error) {

var networkTopology map[string]string
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
any, ok := p.cache.Get(networkTopologyKey)
cache, ok := p.cache.Get(networkTopologyKey)
if !ok {
var err error
if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
return time.Time{}, err
}

if err := p.cache.Add(networkTopologyKey, networkTopology, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", networkTopologyKey, err.Error())
}
} else {
networkTopology = any.(map[string]string)
networkTopology = cache.(map[string]string)
}

updatedTime, err := time.Parse(time.RFC3339Nano, networkTopology["updatedAt"])
Expand All @@ -314,18 +323,18 @@ func (p *probes) AverageRTT() (time.Duration, error) {

var networkTopology map[string]string
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
any, ok := p.cache.Get(networkTopologyKey)
cache, ok := p.cache.Get(networkTopologyKey)
if !ok {
var err error
if networkTopology, err = p.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
return time.Duration(0), err
}

if err := p.cache.Add(networkTopologyKey, networkTopology, p.config.Cache.TTL); err != nil {
logger.Error(err)
logger.Errorf("add %s cache error because of %s", networkTopologyKey, err.Error())
}
} else {
networkTopology = any.(map[string]string)
networkTopology = cache.(map[string]string)
}

averageRTT, err := strconv.ParseInt(networkTopology["averageRTT"], 10, 64)
Expand Down

0 comments on commit 505031b

Please sign in to comment.