Skip to content

Commit

Permalink
feat: implement batch calculation of candidate parents scores (#2853)
Browse files Browse the repository at this point in the history
Signed-off-by: XZ <834756128@qq.com>
  • Loading branch information
fcgxz2003 authored Nov 16, 2023
1 parent 9608eff commit 22f4f9e
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 24 deletions.
4 changes: 2 additions & 2 deletions scheduler/scheduling/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
)

type Evaluator interface {
// Evaluate todo Normalization.
Evaluate(parent *resource.Peer, child *resource.Peer, taskPieceCount int32) float64
// EvaluateParents sort parents by evaluating multiple feature scores.
EvaluateParents(parents []*resource.Peer, child *resource.Peer, taskPieceCount int32) []*resource.Peer

// IsBadNode determine if peer is a failed node.
IsBadNode(peer *resource.Peer) bool
Expand Down
17 changes: 15 additions & 2 deletions scheduler/scheduling/evaluator/evaluator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package evaluator

import (
"math/big"
"sort"
"strings"

"github.com/montanaflynn/stats"
Expand Down Expand Up @@ -75,8 +76,20 @@ func NewEvaluatorBase() Evaluator {
return &evaluatorBase{}
}

// The larger the value after evaluation, the higher the priority.
func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 {
// EvaluateParents sort parents by evaluating multiple feature scores.
func (eb *evaluatorBase) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer {
sort.Slice(
parents,
func(i, j int) bool {
return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount)
},
)

return parents
}

// The larger the value, the higher the priority.
func evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 {
parentLocation := parent.Host.Network.Location
parentIDC := parent.Host.Network.IDC
childLocation := child.Host.Network.Location
Expand Down
181 changes: 177 additions & 4 deletions scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,171 @@ func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) {
}
}

func TestEvaluatorBase_Evaluate(t *testing.T) {
func TestEvaluatorBase_EvaluateParents(t *testing.T) {
tests := []struct {
name string
parents []*resource.Peer
child *resource.Peer
totalPieceCount int32
mock func(parent []*resource.Peer, child *resource.Peer)
expect func(t *testing.T, parents []*resource.Peer)
}{
{
name: "parents is empty",
parents: []*resource.Peer{},
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent []*resource.Peer, child *resource.Peer) {
},
expect: func(t *testing.T, parents []*resource.Peer) {
assert := assert.New(t)
assert.Equal(len(parents), 0)

},
},
{
name: "evaluate single parent",
parents: []*resource.Peer{
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
},
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent []*resource.Peer, child *resource.Peer) {
},
expect: func(t *testing.T, parents []*resource.Peer) {
assert := assert.New(t)
assert.Equal(len(parents), 1)
assert.Equal(parents[0].Task.ID, mockTaskID)
assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID)

},
},
{
name: "evaluate parents with free upload count",
parents: []*resource.Peer{
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
},
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parents []*resource.Peer, child *resource.Peer) {
parents[1].Host.ConcurrentUploadCount.Add(4)
parents[2].Host.ConcurrentUploadCount.Add(3)
parents[3].Host.ConcurrentUploadCount.Add(2)
parents[4].Host.ConcurrentUploadCount.Add(1)
},
expect: func(t *testing.T, parents []*resource.Peer) {
assert := assert.New(t)
assert.Equal(len(parents), 5)
assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID)
assert.Equal(parents[1].Host.ID, "bae")
assert.Equal(parents[2].Host.ID, "bac")
assert.Equal(parents[3].Host.ID, "baz")
assert.Equal(parents[4].Host.ID, "bar")
},
},
{
name: "evaluate parents with pieces",
parents: []*resource.Peer{
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
"bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
},
child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig,
resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parents []*resource.Peer, child *resource.Peer) {
parents[1].FinishedPieces.Set(0)
parents[2].FinishedPieces.Set(0).Set(1)
parents[3].FinishedPieces.Set(0).Set(1).Set(2)
parents[4].FinishedPieces.Set(0).Set(1).Set(2).Set(3)
},
expect: func(t *testing.T, parents []*resource.Peer) {
assert := assert.New(t)
assert.Equal(len(parents), 5)
assert.Equal(parents[0].Host.ID, "bae")
assert.Equal(parents[1].Host.ID, "bac")
assert.Equal(parents[2].Host.ID, "baz")
assert.Equal(parents[3].Host.ID, "bar")
assert.Equal(parents[4].Host.ID, mockRawSeedHost.ID)
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
eb := NewEvaluatorBase()
tc.mock(tc.parents, tc.child)
tc.expect(t, eb.EvaluateParents(tc.parents, tc.child, tc.totalPieceCount))
})
}
}

func TestEvaluatorBase_evaluate(t *testing.T) {
tests := []struct {
name string
parent *resource.Peer
Expand Down Expand Up @@ -231,9 +395,8 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
eb := NewEvaluatorBase()
tc.mock(tc.parent, tc.child)
tc.expect(t, eb.Evaluate(tc.parent, tc.child, tc.totalPieceCount))
tc.expect(t, evaluate(tc.parent, tc.child, tc.totalPieceCount))
})
}
}
Expand Down Expand Up @@ -435,6 +598,16 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) {
assert.Equal(score, float64(1))
},
},
{
name: "freeUploadCount is empty",
mock: func(host *resource.Host, mockPeer *resource.Peer) {
mockPeer.Host.ConcurrentUploadCount.Add(host.ConcurrentUploadLimit.Load())
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(0))
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -685,7 +858,7 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) {
{
name: "dst and src both exceeds maximum length",
dst: "foo|bar|baz|bac|bae|baf",
src: "foo|bar|baz|bac|bae|baf",
src: "foo|bar|baz|bac|bae|bai",
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(1))
Expand Down
3 changes: 2 additions & 1 deletion scheduler/scheduling/evaluator/testdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func main() {
os.Exit(1)
}

if score := e.Evaluate(&resource.Peer{}, &resource.Peer{}, int32(0)); score != float64(1) {
candidateParents := e.EvaluateParents([]*resource.Peer{&resource.Peer{}}, &resource.Peer{}, int32(0))
if len(candidateParents) != 1 {
fmt.Println("Evaluate failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduling/evaluator/testdata/plugin/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import "d7y.io/dragonfly/v2/scheduler/resource"

type evaluator struct{}

func (e *evaluator) Evaluate(parent *resource.Peer, child *resource.Peer, taskPieceCount int32) float64 {
return float64(1)
func (e *evaluator) EvaluateParents(parents []*resource.Peer, child *resource.Peer, taskPieceCount int32) []*resource.Peer {
return []*resource.Peer{&resource.Peer{}}
}

func (e *evaluator) IsBadNode(peer *resource.Peer) bool {
Expand Down
15 changes: 2 additions & 13 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package scheduling
import (
"context"
"fmt"
"sort"
"time"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -407,12 +406,7 @@ func (s *scheduling) FindCandidateParents(ctx context.Context, peer *resource.Pe

// Sort candidate parents by evaluation score.
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
sort.Slice(
candidateParents,
func(i, j int) bool {
return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount)
},
)
candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, taskTotalPieceCount)

// Get the parents with candidateParentLimit.
candidateParentLimit := config.DefaultSchedulerCandidateParentLimit
Expand Down Expand Up @@ -461,12 +455,7 @@ func (s *scheduling) FindSuccessParent(ctx context.Context, peer *resource.Peer,

// Sort candidate parents by evaluation score.
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
sort.Slice(
successParents,
func(i, j int) bool {
return s.evaluator.Evaluate(successParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(successParents[j], peer, taskTotalPieceCount)
},
)
successParents = s.evaluator.EvaluateParents(successParents, peer, taskTotalPieceCount)

peer.Log.Infof("scheduling success parent is %s", successParents[0].ID)
return successParents[0], true
Expand Down
51 changes: 51 additions & 0 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,17 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
mock func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool)
}{
{
name: "task peers state is failed",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateFailed)
},
expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) {
assert := assert.New(t)
assert.Equal(len(parents), 0)
assert.False(ok)
},
},
{
name: "task peers is empty",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
Expand Down Expand Up @@ -1013,6 +1024,35 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID, peer.ID}, parents[0].ID)
},
},
{
name: "candidateParents is longer than candidateParentLimit",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
peer.Task.BackToSourcePeers.Add(mockPeers[0].ID)
peer.Task.BackToSourcePeers.Add(mockPeers[1].ID)
mockPeers[0].FSM.SetState(resource.PeerStateBackToSource)
mockPeers[1].FSM.SetState(resource.PeerStateBackToSource)
mockPeers[0].FinishedPieces.Set(0)
mockPeers[1].FinishedPieces.Set(0)
mockPeers[1].FinishedPieces.Set(1)
mockPeers[1].FinishedPieces.Set(2)

md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{
CandidateParentLimit: 1,
}, nil).Times(2)
},
expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) {
assert := assert.New(t)
assert.True(ok)
assert.Equal(len(parents), 1)
assert.Equal(parents[0].ID, mockPeers[1].ID)
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -1050,6 +1090,17 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
mock func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool)
}{
{
name: "task peers state is failed",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateFailed)
},
expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
assert := assert.New(t)
assert.Nil(parent)
assert.False(ok)
},
},
{
name: "task peers is empty",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
Expand Down

0 comments on commit 22f4f9e

Please sign in to comment.