Skip to content

Commit

Permalink
feat:add cache about networktopology
Browse files Browse the repository at this point in the history
Signed-off-by: huangmin <2107139596@qq.com>
  • Loading branch information
MinH-09 committed Dec 18, 2023
1 parent 5376255 commit aaa9ade
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 456 deletions.
38 changes: 0 additions & 38 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// AverageRTTs loads RTTs of source host id and destination hosts id.
AverageRTTs(string, []string) ([]time.Duration, error)

// Snapshot writes the current network topology to the storage.
Snapshot() error
}
Expand Down Expand Up @@ -296,41 +293,6 @@ func (nt *networkTopology) DeleteHost(hostID string) error {
return nil
}

// AverageRTTs loads RTTs of source host id and destination hosts id.
func (nt *networkTopology) AverageRTTs(srcHostID string, destHostIDs []string) ([]time.Duration, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

var averageRTTs []time.Duration
for _, id := range destHostIDs {
var err error
var networkTopology map[string]string
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, id)
any, ok := nt.cache.Get(networkTopologyKey)
if !ok {
if networkTopology, err = nt.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
averageRTTs = append(averageRTTs, time.Duration(0))
continue
}

if err := nt.cache.Add(networkTopologyKey, networkTopology, nt.config.TTL); err != nil {
logger.Error(err)
}
} else {
networkTopology = any.(map[string]string)
}

averageRTT, err := strconv.ParseInt(networkTopology["averageRTT"], 10, 64)
if err != nil {
averageRTTs = append(averageRTTs, time.Duration(0))
} else {
averageRTTs = append(averageRTTs, time.Duration(averageRTT))
}
}

return averageRTTs, nil
}

// Probes loads probes interface by source host id and destination host id.
func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes {
return NewProbes(nt.config.Probe, nt.rdb, nt.cache, srcHostID, destHostID)
Expand Down
104 changes: 0 additions & 104 deletions scheduler/networktopology/network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,110 +822,6 @@ func TestNetworkTopology_Probes(t *testing.T) {
}
}

func TestNetworkTopology_AverageRTTs(t *testing.T) {
tests := []struct {
name string
mock func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder)
expect func(t *testing.T, networkTopology NetworkTopology, err error)
}{
{
name: "get average RTTs with cache",
mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) {
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(mockNetworkTopology, true)
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(nil, false)
mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).SetVal(mockNetworkTopology)
mockCache.Add(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), mockNetworkTopology, gomock.Any()).Return(nil)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID, mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, []time.Duration{30 * time.Millisecond, 30 * time.Millisecond})
},
},
{
name: "get average RTTs",
mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) {
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(nil, false)
mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).SetVal(mockNetworkTopology)
mockCache.Add(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), mockNetworkTopology, gomock.Any()).Return(nil)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, mockAverageRTT)
},
},
{
name: "get average RTTs error",
mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) {
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(nil, false)
mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).SetErr(errors.New("get average RTTs error"))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, []time.Duration{time.Duration(0)})
},
},
{
name: "add cache error",
mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) {
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(nil, false)
mockRDBClient.ExpectHGetAll(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).SetVal(mockNetworkTopology)
mockCache.Add(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), mockNetworkTopology, gomock.Any()).Return(errors.New("add cache error"))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, mockAverageRTT)
},
},
{
name: "parseInt error",
mock: func(mockRDBClient redismock.ClientMock, mockCache *cache.MockCacheMockRecorder) {
mockCache.Get(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID)).Return(map[string]string{"averageRTT": "foo"}, true)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, []time.Duration{0})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()

rdb, mockRDBClient := redismock.NewClientMock()
res := resource.NewMockResource(ctl)
storage := storagemocks.NewMockStorage(ctl)
cache := cache.NewMockCache(ctl)
tc.mock(mockRDBClient, cache.EXPECT())

networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage)
tc.expect(t, networkTopology, err)
mockRDBClient.ClearExpect()
})
}
}

func TestNetworkTopology_ProbedCount(t *testing.T) {
tests := []struct {
name string
Expand Down
11 changes: 1 addition & 10 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/scheduling/evaluator"
"d7y.io/dragonfly/v2/scheduler/storage"
)

Expand Down Expand Up @@ -258,25 +257,17 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
}

// Initialize dial options of scheduling.
schedulingOptions := []scheduling.Option{}
// Initialize network topology service.
if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
cache := pkgcache.New(cfg.NetworkTopology.Expire, pkgcache.NoCleanup)
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, cache, resource, s.storage)
if err != nil {
return nil, err
}

// Initialize dial options of evaluator.
evaluatorOptions := []evaluator.Option{}
evaluatorOptions = append(evaluatorOptions, evaluator.WithNetworkTopology(s.networkTopology))

schedulingOptions = append(schedulingOptions, scheduling.WithEvaluatorOptions(evaluatorOptions))
}

// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), schedulingOptions...)
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir())

// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
Expand Down
6 changes: 3 additions & 3 deletions scheduler/scheduling/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ type Evaluator interface {
IsBadNode(peer *resource.Peer) bool
}

func New(algorithm string, pluginDir string, options ...Option) Evaluator {
func New(algorithm string, pluginDir string) Evaluator {
switch algorithm {
case PluginAlgorithm:
if plugin, err := LoadPlugin(pluginDir); err == nil {
return plugin
}
// TODO Implement MLAlgorithm.
case MLAlgorithm, DefaultAlgorithm:
return NewEvaluatorBase(options...)
return NewEvaluatorBase()
}

return NewEvaluatorBase(options...)
return NewEvaluatorBase()
}
80 changes: 9 additions & 71 deletions scheduler/scheduling/evaluator/evaluator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import (
"math/big"
"sort"
"strings"
"time"

"github.com/montanaflynn/stats"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/math"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand All @@ -49,9 +47,6 @@ const (

// Location affinity weight.
locationAffinityWeight = 0.15

// Network topology weight.
networkTopologyWeight = 0.05
)

const (
Expand All @@ -75,51 +70,20 @@ const (
maxElementLen = 5
)

type evaluatorBase struct {
networktopology networktopology.NetworkTopology
}

type Option func(eb *evaluatorBase)

// WithNetworkTopology sets the networkTopology.
func WithNetworkTopology(networktopology networktopology.NetworkTopology) Option {
return func(eb *evaluatorBase) {
eb.networktopology = networktopology
}
}

func NewEvaluatorBase(options ...Option) Evaluator {
eb := &evaluatorBase{}
type evaluatorBase struct{}

for _, opt := range options {
opt(eb)
}
return eb
func NewEvaluatorBase() Evaluator {
return &evaluatorBase{}
}

// EvaluateParents sort parents by evaluating multiple feature scores.
func (eb *evaluatorBase) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer {
if eb.networktopology == nil {
sort.Slice(
parents,
func(i, j int) bool {
return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount)
},
)
} else {
var parentIDs []string
for _, parent := range parents {
parentIDs = append(parentIDs, parent.ID)
}
scoces := eb.calculateNetworkTopologyScore(child.ID, parentIDs)

sort.Slice(
parents,
func(i, j int) bool {
return (evaluate(parents[i], child, totalPieceCount) + networkTopologyWeight*scoces[i]) > (evaluate(parents[j], child, totalPieceCount) + networkTopologyWeight*scoces[j])
},
)
}
sort.Slice(
parents,
func(i, j int) bool {
return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount)
},
)

return parents
}
Expand Down Expand Up @@ -244,32 +208,6 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
return float64(score) / float64(maxElementLen)
}

// calculateNetworkTopologyScore 0.0~1.0 larger and better.
func (eb *evaluatorBase) calculateNetworkTopologyScore(src string, dst []string) []float64 {
averageRTTs, err := eb.networktopology.AverageRTTs(src, dst)
if err != nil {
return []float64{}
}

var MaxRTT time.Duration
for _, RTT := range averageRTTs {
if MaxRTT < RTT {
MaxRTT = RTT
}
}

var scoces []float64
for _, RTT := range averageRTTs {
if RTT == 0 {
scoces = append(scoces, minScore)
} else {
scoces = append(scoces, float64((MaxRTT-RTT))/float64(MaxRTT))
}
}

return scoces
}

func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) ||
peer.FSM.Is(resource.PeerStateReceivedTiny) || peer.FSM.Is(resource.PeerStateReceivedSmall) ||
Expand Down
Loading

0 comments on commit aaa9ade

Please sign in to comment.