From ebe9ddbc52cf190af8cdf43398c35237be571eed Mon Sep 17 00:00:00 2001 From: huangmin <2107139596@qq.com> Date: Wed, 22 Nov 2023 15:45:08 +0800 Subject: [PATCH] feat: evalutor adds calculateNetworkTopologyScore Signed-off-by: huangmin <2107139596@qq.com> --- scheduler/announcer/announcer.go | 2 +- .../mocks/network_topology_mock.go | 34 ++-- scheduler/networktopology/network_topology.go | 29 +++ .../networktopology/network_topology_test.go | 68 ++++++++ scheduler/networktopology/probes_test.go | 1 + scheduler/scheduler.go | 11 +- scheduler/scheduling/evaluator/evaluator.go | 6 +- .../scheduling/evaluator/evaluator_base.go | 76 +++++++- .../evaluator/evaluator_base_test.go | 165 ++++++++++++++++-- .../scheduling/evaluator/evaluator_test.go | 21 +++ scheduler/scheduling/scheduling.go | 25 ++- scheduler/scheduling/scheduling_test.go | 16 ++ 12 files changed, 412 insertions(+), 42 deletions(-) diff --git a/scheduler/announcer/announcer.go b/scheduler/announcer/announcer.go index 73078b68f64..3dfae2ad197 100644 --- a/scheduler/announcer/announcer.go +++ b/scheduler/announcer/announcer.go @@ -66,7 +66,7 @@ func WithTrainerClient(client trainerclient.V1) Option { } // Option is a functional option for configuring the announcer. -type Option func(s *announcer) +type Option func(a *announcer) // New returns a new Announcer interface. func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Storage, options ...Option) (Announcer, error) { diff --git a/scheduler/networktopology/mocks/network_topology_mock.go b/scheduler/networktopology/mocks/network_topology_mock.go index 4fafe111617..b38ba731596 100644 --- a/scheduler/networktopology/mocks/network_topology_mock.go +++ b/scheduler/networktopology/mocks/network_topology_mock.go @@ -1,15 +1,12 @@ // Code generated by MockGen. DO NOT EDIT. // Source: network_topology.go -// -// Generated by this command: -// -// mockgen -destination mocks/network_topology_mock.go -source network_topology.go -package mocks -// + // Package mocks is a generated GoMock package. package mocks import ( reflect "reflect" + time "time" networktopology "d7y.io/dragonfly/v2/scheduler/networktopology" resource "d7y.io/dragonfly/v2/scheduler/resource" @@ -39,6 +36,21 @@ func (m *MockNetworkTopology) EXPECT() *MockNetworkTopologyMockRecorder { return m.recorder } +// AverageRTTs mocks base method. +func (m *MockNetworkTopology) AverageRTTs(arg0 string, arg1 []string) ([]time.Duration, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AverageRTTs", arg0, arg1) + ret0, _ := ret[0].([]time.Duration) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AverageRTTs indicates an expected call of AverageRTTs. +func (mr *MockNetworkTopologyMockRecorder) AverageRTTs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AverageRTTs", reflect.TypeOf((*MockNetworkTopology)(nil).AverageRTTs), arg0, arg1) +} + // DeleteHost mocks base method. func (m *MockNetworkTopology) DeleteHost(arg0 string) error { m.ctrl.T.Helper() @@ -48,7 +60,7 @@ func (m *MockNetworkTopology) DeleteHost(arg0 string) error { } // DeleteHost indicates an expected call of DeleteHost. -func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0) } @@ -63,7 +75,7 @@ func (m *MockNetworkTopology) FindProbedHosts(arg0 string) ([]*resource.Host, er } // FindProbedHosts indicates an expected call of FindProbedHosts. -func (mr *MockNetworkTopologyMockRecorder) FindProbedHosts(arg0 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) FindProbedHosts(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHosts", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHosts), arg0) } @@ -77,7 +89,7 @@ func (m *MockNetworkTopology) Has(arg0, arg1 string) bool { } // Has indicates an expected call of Has. -func (mr *MockNetworkTopologyMockRecorder) Has(arg0, arg1 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) Has(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Has", reflect.TypeOf((*MockNetworkTopology)(nil).Has), arg0, arg1) } @@ -92,7 +104,7 @@ func (m *MockNetworkTopology) ProbedCount(arg0 string) (uint64, error) { } // ProbedCount indicates an expected call of ProbedCount. -func (mr *MockNetworkTopologyMockRecorder) ProbedCount(arg0 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) ProbedCount(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProbedCount", reflect.TypeOf((*MockNetworkTopology)(nil).ProbedCount), arg0) } @@ -106,7 +118,7 @@ func (m *MockNetworkTopology) Probes(arg0, arg1 string) networktopology.Probes { } // Probes indicates an expected call of Probes. -func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Probes", reflect.TypeOf((*MockNetworkTopology)(nil).Probes), arg0, arg1) } @@ -158,7 +170,7 @@ func (m *MockNetworkTopology) Store(arg0, arg1 string) error { } // Store indicates an expected call of Store. -func (mr *MockNetworkTopologyMockRecorder) Store(arg0, arg1 any) *gomock.Call { +func (mr *MockNetworkTopologyMockRecorder) Store(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Store", reflect.TypeOf((*MockNetworkTopology)(nil).Store), arg0, arg1) } diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index 2c8a313957d..786a8c6ba10 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -72,6 +72,9 @@ type NetworkTopology interface { // Probes loads probes interface by source host id and destination host id. Probes(string, string) Probes + // AverageRTTs loads RTTs of source host id and destination hosts id. + AverageRTTs(string, []string) ([]time.Duration, error) + // ProbedCount is the number of times the host has been probed. ProbedCount(string) (uint64, error) @@ -264,6 +267,32 @@ func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes { return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID) } +// AverageRTTs loads RTTs of source host id and destination hosts id. +func (nt *networkTopology) AverageRTTs(srcHostID string, destHostIDs []string) ([]time.Duration, error) { + pipeline := nt.rdb.Pipeline() + var cmds []*redis.StringCmd + for _, id := range destHostIDs { + cmds = append(cmds, pipeline.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, id), "averageRTT")) + } + + _, err := pipeline.Exec(context.Background()) + if err != nil { + return nil, err + } + + var averageRTTs []time.Duration + for i := range destHostIDs { + averageRTT, err := strconv.ParseInt(cmds[i].Val(), 10, 64) + if err != nil { + averageRTTs = append(averageRTTs, time.Duration(0)) + } else { + averageRTTs = append(averageRTTs, time.Duration(averageRTT)) + } + } + + return averageRTTs, nil +} + // ProbedCount is the number of times the host has been probed. func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go index 1f4a3bd37c2..1ce7ae13cb9 100644 --- a/scheduler/networktopology/network_topology_test.go +++ b/scheduler/networktopology/network_topology_test.go @@ -715,6 +715,74 @@ func TestNetworkTopology_Probes(t *testing.T) { } } +func TestNetworkTopology_AverageRTTs(t *testing.T) { + tests := []struct { + name string + mock func(mockRDBClient redismock.ClientMock) + expect func(t *testing.T, networkTopology NetworkTopology, err error) + }{ + { + name: "get average RTTs", + mock: func(mockRDBClient redismock.ClientMock) { + mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetVal( + strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + assert := assert.New(t) + assert.NoError(err) + + averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID}) + assert.NoError(err) + assert.EqualValues(averageRTTs, mockAverageRTT) + }, + }, + { + name: "get average RTTs error", + mock: func(mockRDBClient redismock.ClientMock) { + mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetErr(errors.New("get average RTTs error")) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + assert := assert.New(t) + assert.NoError(err) + + _, err = networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID}) + assert.EqualError(err, "get average RTTs error") + }, + }, + { + name: "parseInt error", + mock: func(mockRDBClient redismock.ClientMock) { + mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetVal( + "foo") + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error) { + assert := assert.New(t) + assert.NoError(err) + + averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID}) + assert.NoError(err) + assert.EqualValues(averageRTTs, []time.Duration{0}) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + rdb, mockRDBClient := redismock.NewClientMock() + res := resource.NewMockResource(ctl) + storage := storagemocks.NewMockStorage(ctl) + tc.mock(mockRDBClient) + + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + tc.expect(t, networkTopology, err) + mockRDBClient.ClearExpect() + }) + } +} + func TestNetworkTopology_ProbedCount(t *testing.T) { tests := []struct { name string diff --git a/scheduler/networktopology/probes_test.go b/scheduler/networktopology/probes_test.go index a5ffa074763..99b2062bfbe 100644 --- a/scheduler/networktopology/probes_test.go +++ b/scheduler/networktopology/probes_test.go @@ -158,6 +158,7 @@ var ( }, } + mockAverageRTT = []time.Duration{30 * time.Millisecond} mockProbesCreatedAt = time.Now() mockProbedCount = 10 ) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ca364b1c639..1db622f24e2 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -54,6 +54,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/rpcserver" "d7y.io/dragonfly/v2/scheduler/scheduling" + "d7y.io/dragonfly/v2/scheduler/scheduling/evaluator" "d7y.io/dragonfly/v2/scheduler/storage" ) @@ -257,16 +258,24 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } } + // Initialize dial options of scheduling. + schedulingOptions := []scheduling.Option{} // Initialize network topology service. if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage) if err != nil { return nil, err } + + // Initialize dial options of evaluator. + evaluatorOptions := []evaluator.EvaluatorOptions{} + evaluatorOptions = append(evaluatorOptions, evaluator.WithNetworkTopology(s.networkTopology)) + + schedulingOptions = append(schedulingOptions, scheduling.WithEvaluatorOptions(evaluatorOptions)) } // Initialize scheduling. - scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir()) + scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), schedulingOptions...) // Initialize server options of scheduler grpc server. schedulerServerOptions := []grpc.ServerOption{} diff --git a/scheduler/scheduling/evaluator/evaluator.go b/scheduler/scheduling/evaluator/evaluator.go index a6aad23a731..e3d091d4392 100644 --- a/scheduler/scheduling/evaluator/evaluator.go +++ b/scheduler/scheduling/evaluator/evaluator.go @@ -39,7 +39,7 @@ type Evaluator interface { IsBadNode(peer *resource.Peer) bool } -func New(algorithm string, pluginDir string) Evaluator { +func New(algorithm string, pluginDir string, options ...EvaluatorOptions) Evaluator { switch algorithm { case PluginAlgorithm: if plugin, err := LoadPlugin(pluginDir); err == nil { @@ -47,8 +47,8 @@ func New(algorithm string, pluginDir string) Evaluator { } // TODO Implement MLAlgorithm. case MLAlgorithm, DefaultAlgorithm: - return NewEvaluatorBase() + return NewEvaluatorBase(options...) } - return NewEvaluatorBase() + return NewEvaluatorBase(options...) } diff --git a/scheduler/scheduling/evaluator/evaluator_base.go b/scheduler/scheduling/evaluator/evaluator_base.go index d2a838896b1..9c27aa3b89c 100644 --- a/scheduler/scheduling/evaluator/evaluator_base.go +++ b/scheduler/scheduling/evaluator/evaluator_base.go @@ -20,12 +20,14 @@ import ( "math/big" "sort" "strings" + "time" "github.com/montanaflynn/stats" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/math" "d7y.io/dragonfly/v2/pkg/types" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -47,6 +49,9 @@ const ( // Location affinity weight. locationAffinityWeight = 0.15 + + // Network topology weight. + networkTopologyWeight = 0.05 ) const ( @@ -70,20 +75,51 @@ const ( maxElementLen = 5 ) -type evaluatorBase struct{} +type evaluatorBase struct { + networktopology networktopology.NetworkTopology +} + +type EvaluatorOptions func(eb *evaluatorBase) + +// WithNetworkTopology sets the networkTopology. +func WithNetworkTopology(networktopology networktopology.NetworkTopology) EvaluatorOptions { + return func(eb *evaluatorBase) { + eb.networktopology = networktopology + } +} + +func NewEvaluatorBase(options ...EvaluatorOptions) Evaluator { + eb := &evaluatorBase{} -func NewEvaluatorBase() Evaluator { - return &evaluatorBase{} + for _, opt := range options { + opt(eb) + } + return eb } // EvaluateParents sort parents by evaluating multiple feature scores. func (eb *evaluatorBase) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer { - sort.Slice( - parents, - func(i, j int) bool { - return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount) - }, - ) + if eb.networktopology == nil { + sort.Slice( + parents, + func(i, j int) bool { + return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount) + }, + ) + } else { + var parentIDs []string + for _, parent := range parents { + parentIDs = append(parentIDs, parent.ID) + } + scoces := eb.calculateNetworkTopologyScore(child.ID, parentIDs) + + sort.Slice( + parents, + func(i, j int) bool { + return (evaluate(parents[i], child, totalPieceCount) + networkTopologyWeight*scoces[i]) > (evaluate(parents[j], child, totalPieceCount) + networkTopologyWeight*scoces[j]) + }, + ) + } return parents } @@ -208,6 +244,28 @@ func calculateMultiElementAffinityScore(dst, src string) float64 { return float64(score) / float64(maxElementLen) } +// calculateNetworkTopologyScore 0.0~1.0 larger and better. +func (eb *evaluatorBase) calculateNetworkTopologyScore(src string, dst []string) []float64 { + AverageRTTs, err := eb.networktopology.AverageRTTs(src, dst) + if err != nil { + return []float64{} + } + + var MaxRTT time.Duration + for _, RTT := range AverageRTTs { + if MaxRTT < RTT { + MaxRTT = RTT + } + } + + var scoces []float64 + for _, RTT := range AverageRTTs { + scoces = append(scoces, float64((MaxRTT-RTT))/float64(MaxRTT)) + } + + return scoces +} + func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool { if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) || peer.FSM.Is(resource.PeerStateReceivedTiny) || peer.FSM.Is(resource.PeerStateReceivedSmall) || diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 3a9cbe9dfa8..686f39c4e2f 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/atomic" + "go.uber.org/mock/gomock" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" @@ -30,6 +31,7 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -157,12 +159,25 @@ var ( ) func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl) tests := []struct { name string + option []EvaluatorOptions expect func(t *testing.T, e any) }{ { - name: "new evaluator commonv1", + name: "new evaluator commonv1", + option: []EvaluatorOptions{}, + expect: func(t *testing.T, e any) { + assert := assert.New(t) + assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") + }, + }, + { + name: "new evaluator commonv1 with options", + option: []EvaluatorOptions{WithNetworkTopology(mockNetworkTopology)}, expect: func(t *testing.T, e any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") @@ -172,22 +187,27 @@ func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, NewEvaluatorBase()) + tc.expect(t, NewEvaluatorBase(tc.option...)) }) } } func TestEvaluatorBase_EvaluateParents(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl) tests := []struct { name string + option []EvaluatorOptions parents []*resource.Peer child *resource.Peer totalPieceCount int32 - mock func(parent []*resource.Peer, child *resource.Peer) + mock func(parent []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) expect func(t *testing.T, parents []*resource.Peer) }{ { name: "parents is empty", + option: []EvaluatorOptions{}, parents: []*resource.Peer{}, child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), @@ -195,16 +215,16 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, - mock: func(parent []*resource.Peer, child *resource.Peer) { + mock: func(parent []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { }, expect: func(t *testing.T, parents []*resource.Peer) { assert := assert.New(t) assert.Equal(len(parents), 0) - }, }, { - name: "evaluate single parent", + name: "evaluate single parent", + option: []EvaluatorOptions{}, parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), @@ -218,18 +238,18 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, - mock: func(parent []*resource.Peer, child *resource.Peer) { + mock: func(parent []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { }, expect: func(t *testing.T, parents []*resource.Peer) { assert := assert.New(t) assert.Equal(len(parents), 1) assert.Equal(parents[0].Task.ID, mockTaskID) assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID) - }, }, { - name: "evaluate parents with free upload count", + name: "evaluate parents with free upload count", + option: []EvaluatorOptions{}, parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), @@ -263,7 +283,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, - mock: func(parents []*resource.Peer, child *resource.Peer) { + mock: func(parents []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { parents[1].Host.ConcurrentUploadCount.Add(4) parents[2].Host.ConcurrentUploadCount.Add(3) parents[3].Host.ConcurrentUploadCount.Add(2) @@ -280,7 +300,8 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { }, }, { - name: "evaluate parents with pieces", + name: "evaluate single parent with free upload count and networktopology", + option: []EvaluatorOptions{WithNetworkTopology(mockNetworkTopology)}, parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), @@ -314,7 +335,60 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, - mock: func(parents []*resource.Peer, child *resource.Peer) { + mock: func(parents []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { + parents[1].Host.ConcurrentUploadCount.Add(4) + parents[2].Host.ConcurrentUploadCount.Add(3) + parents[3].Host.ConcurrentUploadCount.Add(2) + parents[4].Host.ConcurrentUploadCount.Add(1) + mn.AverageRTTs(child.ID, []string{parents[0].ID, parents[1].ID, parents[2].ID, parents[3].ID, parents[4].ID}).Return([]time.Duration{1 * time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond, 4 * time.Millisecond, 5 * time.Millisecond}, nil) + }, + expect: func(t *testing.T, parents []*resource.Peer) { + assert := assert.New(t) + assert.Equal(len(parents), 5) + assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID) + assert.Equal(parents[1].Host.ID, "bae") + assert.Equal(parents[2].Host.ID, "bac") + assert.Equal(parents[3].Host.ID, "baz") + assert.Equal(parents[4].Host.ID, "bar") + }, + }, + { + name: "evaluate parents with pieces", + option: []EvaluatorOptions{}, + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + }, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + totalPieceCount: 1, + mock: func(parents []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { parents[1].FinishedPieces.Set(0) parents[2].FinishedPieces.Set(0).Set(1) parents[3].FinishedPieces.Set(0).Set(1).Set(2) @@ -334,8 +408,8 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - eb := NewEvaluatorBase() - tc.mock(tc.parents, tc.child) + eb := NewEvaluatorBase(tc.option...) + tc.mock(tc.parents, tc.child, mockNetworkTopology.EXPECT()) tc.expect(t, eb.EvaluateParents(tc.parents, tc.child, tc.totalPieceCount)) }) } @@ -873,6 +947,69 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) { } } +func TestEvaluatorBase_calculateNetworkTopologyScore(t *testing.T) { + tests := []struct { + name string + parents []*resource.Peer + child *resource.Peer + mock func(parent []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) + expect func(t *testing.T, parent []*resource.Peer, child *resource.Peer, eb *evaluatorBase) + }{ + { + name: "calculate single parent", + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost(mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type))}, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost(mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + mock: func(parents []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { + mn.AverageRTTs(child.ID, []string{parents[0].ID}).Return([]time.Duration{1 * time.Millisecond}, nil) + }, + expect: func(t *testing.T, parent []*resource.Peer, child *resource.Peer, eb *evaluatorBase) { + assert := assert.New(t) + assert.Equal(eb.calculateNetworkTopologyScore(child.ID, []string{parent[0].ID}), []float64{0}) + }, + }, + { + name: "calculate parents", + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost(mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost(mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type))}, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost(mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + mock: func(parents []*resource.Peer, child *resource.Peer, mn *networktopologymocks.MockNetworkTopologyMockRecorder) { + mn.AverageRTTs(child.ID, []string{parents[0].ID, parents[1].ID}).Return([]time.Duration{1 * time.Millisecond, 2 * time.Millisecond}, nil) + }, + expect: func(t *testing.T, parent []*resource.Peer, child *resource.Peer, eb *evaluatorBase) { + assert := assert.New(t) + assert.Equal(eb.calculateNetworkTopologyScore(child.ID, []string{parent[0].ID, parent[1].ID}), []float64{0.5, 0}) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + tc.mock(tc.parents, tc.child, mockNetworkTopology.EXPECT()) + tc.expect(t, tc.parents, tc.child, NewEvaluatorBase(WithNetworkTopology(mockNetworkTopology)).(*evaluatorBase)) + }) + } +} + func TestEvaluatorBase_IsBadNode(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, diff --git a/scheduler/scheduling/evaluator/evaluator_test.go b/scheduler/scheduling/evaluator/evaluator_test.go index dececc6a197..0762ecb651c 100644 --- a/scheduler/scheduling/evaluator/evaluator_test.go +++ b/scheduler/scheduling/evaluator/evaluator_test.go @@ -21,18 +21,26 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" ) func TestEvaluator_New(t *testing.T) { pluginDir := "." + ctl := gomock.NewController(t) + defer ctl.Finish() + mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl) tests := []struct { name string algorithm string + option []EvaluatorOptions expect func(t *testing.T, e any) }{ { name: "new evaluator with default algorithm", algorithm: "default", + option: []EvaluatorOptions{}, expect: func(t *testing.T, e any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") @@ -41,6 +49,7 @@ func TestEvaluator_New(t *testing.T) { { name: "new evaluator with machine learning algorithm", algorithm: "ml", + option: []EvaluatorOptions{}, expect: func(t *testing.T, e any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") @@ -49,6 +58,7 @@ func TestEvaluator_New(t *testing.T) { { name: "new evaluator with plugin", algorithm: "plugin", + option: []EvaluatorOptions{}, expect: func(t *testing.T, e any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") @@ -57,6 +67,16 @@ func TestEvaluator_New(t *testing.T) { { name: "new evaluator with empty string", algorithm: "", + option: []EvaluatorOptions{}, + expect: func(t *testing.T, e any) { + assert := assert.New(t) + assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") + }, + }, + { + name: "new evaluator with default algorithm and networkTopology", + algorithm: "default", + option: []EvaluatorOptions{WithNetworkTopology(mockNetworkTopology)}, expect: func(t *testing.T, e any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase") @@ -67,6 +87,7 @@ func TestEvaluator_New(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { tc.expect(t, New(tc.algorithm, pluginDir)) + tc.expect(t, New(tc.algorithm, pluginDir, tc.option...)) }) } } diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 4d9390d139d..c2396abf409 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -65,14 +65,33 @@ type scheduling struct { // Scheduler dynamic configuration. dynconfig config.DynconfigInterface + + // evaluatorOptions is options of evalutor. + evaluatorOptions []evaluator.EvaluatorOptions +} + +// Option is a functional option for configuring the scheduling. +type Option func(s *scheduling) + +// WithEvaluatorOptions set the options of evaluator. +func WithEvaluatorOptions(opts []evaluator.EvaluatorOptions) Option { + return func(s *scheduling) { + s.evaluatorOptions = opts + } } -func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string) Scheduling { - return &scheduling{ - evaluator: evaluator.New(cfg.Algorithm, pluginDir), +func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string, opts ...Option) Scheduling { + s := &scheduling{ config: cfg, dynconfig: dynconfig, } + + for _, opt := range opts { + opt(s) + } + s.evaluator = evaluator.New(cfg.Algorithm, pluginDir, s.evaluatorOptions...) + + return s } // ScheduleCandidateParents schedules candidate parents to the normal peer. diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index d54ac1853e1..d072bc66307 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -48,6 +48,7 @@ import ( pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling/evaluator" ) @@ -197,14 +198,19 @@ var ( ) func TestScheduling_New(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl) tests := []struct { name string pluginDir string + option []Option expect func(t *testing.T, s any) }{ { name: "new scheduling", pluginDir: "bar", + option: []Option{}, expect: func(t *testing.T, s any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(s).Elem().Name(), "scheduling") @@ -213,6 +219,16 @@ func TestScheduling_New(t *testing.T) { { name: "new scheduling with empty pluginDir", pluginDir: "", + option: []Option{}, + expect: func(t *testing.T, s any) { + assert := assert.New(t) + assert.Equal(reflect.TypeOf(s).Elem().Name(), "scheduling") + }, + }, + { + name: "new scheduling with evaluatorOptions", + pluginDir: "", + option: []Option{WithEvaluatorOptions([]evaluator.EvaluatorOptions{evaluator.WithNetworkTopology(mockNetworkTopology)})}, expect: func(t *testing.T, s any) { assert := assert.New(t) assert.Equal(reflect.TypeOf(s).Elem().Name(), "scheduling")