From 22f4f9e13bb0b40ea7a006e586aa9f5907661a8f Mon Sep 17 00:00:00 2001 From: dlut_xz <52518280+fcgxz2003@users.noreply.github.com> Date: Thu, 16 Nov 2023 14:01:26 +0800 Subject: [PATCH] feat: implement batch calculation of candidate parents scores (#2853) Signed-off-by: XZ <834756128@qq.com> --- scheduler/scheduling/evaluator/evaluator.go | 4 +- .../scheduling/evaluator/evaluator_base.go | 17 +- .../evaluator/evaluator_base_test.go | 181 +++++++++++++++++- .../scheduling/evaluator/testdata/main.go | 3 +- .../evaluator/testdata/plugin/evaluator.go | 4 +- scheduler/scheduling/scheduling.go | 15 +- scheduler/scheduling/scheduling_test.go | 51 +++++ 7 files changed, 251 insertions(+), 24 deletions(-) diff --git a/scheduler/scheduling/evaluator/evaluator.go b/scheduler/scheduling/evaluator/evaluator.go index eb7aec128b5..a6aad23a731 100644 --- a/scheduler/scheduling/evaluator/evaluator.go +++ b/scheduler/scheduling/evaluator/evaluator.go @@ -32,8 +32,8 @@ const ( ) type Evaluator interface { - // Evaluate todo Normalization. - Evaluate(parent *resource.Peer, child *resource.Peer, taskPieceCount int32) float64 + // EvaluateParents sort parents by evaluating multiple feature scores. + EvaluateParents(parents []*resource.Peer, child *resource.Peer, taskPieceCount int32) []*resource.Peer // IsBadNode determine if peer is a failed node. IsBadNode(peer *resource.Peer) bool diff --git a/scheduler/scheduling/evaluator/evaluator_base.go b/scheduler/scheduling/evaluator/evaluator_base.go index 9016cdb4900..d2a838896b1 100644 --- a/scheduler/scheduling/evaluator/evaluator_base.go +++ b/scheduler/scheduling/evaluator/evaluator_base.go @@ -18,6 +18,7 @@ package evaluator import ( "math/big" + "sort" "strings" "github.com/montanaflynn/stats" @@ -75,8 +76,20 @@ func NewEvaluatorBase() Evaluator { return &evaluatorBase{} } -// The larger the value after evaluation, the higher the priority. -func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 { +// EvaluateParents sort parents by evaluating multiple feature scores. +func (eb *evaluatorBase) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer { + sort.Slice( + parents, + func(i, j int) bool { + return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount) + }, + ) + + return parents +} + +// The larger the value, the higher the priority. +func evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 { parentLocation := parent.Host.Network.Location parentIDC := parent.Host.Network.IDC childLocation := child.Host.Network.Location diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 4dc3c90daed..3a9cbe9dfa8 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -177,7 +177,171 @@ func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) { } } -func TestEvaluatorBase_Evaluate(t *testing.T) { +func TestEvaluatorBase_EvaluateParents(t *testing.T) { + tests := []struct { + name string + parents []*resource.Peer + child *resource.Peer + totalPieceCount int32 + mock func(parent []*resource.Peer, child *resource.Peer) + expect func(t *testing.T, parents []*resource.Peer) + }{ + { + name: "parents is empty", + parents: []*resource.Peer{}, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + totalPieceCount: 1, + mock: func(parent []*resource.Peer, child *resource.Peer) { + }, + expect: func(t *testing.T, parents []*resource.Peer) { + assert := assert.New(t) + assert.Equal(len(parents), 0) + + }, + }, + { + name: "evaluate single parent", + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + }, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + totalPieceCount: 1, + mock: func(parent []*resource.Peer, child *resource.Peer) { + }, + expect: func(t *testing.T, parents []*resource.Peer) { + assert := assert.New(t) + assert.Equal(len(parents), 1) + assert.Equal(parents[0].Task.ID, mockTaskID) + assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID) + + }, + }, + { + name: "evaluate parents with free upload count", + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + }, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + totalPieceCount: 1, + mock: func(parents []*resource.Peer, child *resource.Peer) { + parents[1].Host.ConcurrentUploadCount.Add(4) + parents[2].Host.ConcurrentUploadCount.Add(3) + parents[3].Host.ConcurrentUploadCount.Add(2) + parents[4].Host.ConcurrentUploadCount.Add(1) + }, + expect: func(t *testing.T, parents []*resource.Peer) { + assert := assert.New(t) + assert.Equal(len(parents), 5) + assert.Equal(parents[0].Host.ID, mockRawSeedHost.ID) + assert.Equal(parents[1].Host.ID, "bae") + assert.Equal(parents[2].Host.ID, "bac") + assert.Equal(parents[3].Host.ID, "baz") + assert.Equal(parents[4].Host.ID, "bar") + }, + }, + { + name: "evaluate parents with pieces", + parents: []*resource.Peer{ + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), + }, + child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), + totalPieceCount: 1, + mock: func(parents []*resource.Peer, child *resource.Peer) { + parents[1].FinishedPieces.Set(0) + parents[2].FinishedPieces.Set(0).Set(1) + parents[3].FinishedPieces.Set(0).Set(1).Set(2) + parents[4].FinishedPieces.Set(0).Set(1).Set(2).Set(3) + }, + expect: func(t *testing.T, parents []*resource.Peer) { + assert := assert.New(t) + assert.Equal(len(parents), 5) + assert.Equal(parents[0].Host.ID, "bae") + assert.Equal(parents[1].Host.ID, "bac") + assert.Equal(parents[2].Host.ID, "baz") + assert.Equal(parents[3].Host.ID, "bar") + assert.Equal(parents[4].Host.ID, mockRawSeedHost.ID) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + eb := NewEvaluatorBase() + tc.mock(tc.parents, tc.child) + tc.expect(t, eb.EvaluateParents(tc.parents, tc.child, tc.totalPieceCount)) + }) + } +} + +func TestEvaluatorBase_evaluate(t *testing.T) { tests := []struct { name string parent *resource.Peer @@ -231,9 +395,8 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - eb := NewEvaluatorBase() tc.mock(tc.parent, tc.child) - tc.expect(t, eb.Evaluate(tc.parent, tc.child, tc.totalPieceCount)) + tc.expect(t, evaluate(tc.parent, tc.child, tc.totalPieceCount)) }) } } @@ -435,6 +598,16 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) { assert.Equal(score, float64(1)) }, }, + { + name: "freeUploadCount is empty", + mock: func(host *resource.Host, mockPeer *resource.Peer) { + mockPeer.Host.ConcurrentUploadCount.Add(host.ConcurrentUploadLimit.Load()) + }, + expect: func(t *testing.T, score float64) { + assert := assert.New(t) + assert.Equal(score, float64(0)) + }, + }, } for _, tc := range tests { @@ -685,7 +858,7 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) { { name: "dst and src both exceeds maximum length", dst: "foo|bar|baz|bac|bae|baf", - src: "foo|bar|baz|bac|bae|baf", + src: "foo|bar|baz|bac|bae|bai", expect: func(t *testing.T, score float64) { assert := assert.New(t) assert.Equal(score, float64(1)) diff --git a/scheduler/scheduling/evaluator/testdata/main.go b/scheduler/scheduling/evaluator/testdata/main.go index 536eb1fab84..52a56786506 100644 --- a/scheduler/scheduling/evaluator/testdata/main.go +++ b/scheduler/scheduling/evaluator/testdata/main.go @@ -31,7 +31,8 @@ func main() { os.Exit(1) } - if score := e.Evaluate(&resource.Peer{}, &resource.Peer{}, int32(0)); score != float64(1) { + candidateParents := e.EvaluateParents([]*resource.Peer{&resource.Peer{}}, &resource.Peer{}, int32(0)) + if len(candidateParents) != 1 { fmt.Println("Evaluate failed") os.Exit(1) } diff --git a/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go b/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go index ca1bd587cbc..4172759b9f0 100644 --- a/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go +++ b/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go @@ -20,8 +20,8 @@ import "d7y.io/dragonfly/v2/scheduler/resource" type evaluator struct{} -func (e *evaluator) Evaluate(parent *resource.Peer, child *resource.Peer, taskPieceCount int32) float64 { - return float64(1) +func (e *evaluator) EvaluateParents(parents []*resource.Peer, child *resource.Peer, taskPieceCount int32) []*resource.Peer { + return []*resource.Peer{&resource.Peer{}} } func (e *evaluator) IsBadNode(peer *resource.Peer) bool { diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index cab36f8f931..531eebea834 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -21,7 +21,6 @@ package scheduling import ( "context" "fmt" - "sort" "time" "google.golang.org/grpc/codes" @@ -407,12 +406,7 @@ func (s *scheduling) FindCandidateParents(ctx context.Context, peer *resource.Pe // Sort candidate parents by evaluation score. taskTotalPieceCount := peer.Task.TotalPieceCount.Load() - sort.Slice( - candidateParents, - func(i, j int) bool { - return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount) - }, - ) + candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, taskTotalPieceCount) // Get the parents with candidateParentLimit. candidateParentLimit := config.DefaultSchedulerCandidateParentLimit @@ -461,12 +455,7 @@ func (s *scheduling) FindSuccessParent(ctx context.Context, peer *resource.Peer, // Sort candidate parents by evaluation score. taskTotalPieceCount := peer.Task.TotalPieceCount.Load() - sort.Slice( - successParents, - func(i, j int) bool { - return s.evaluator.Evaluate(successParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(successParents[j], peer, taskTotalPieceCount) - }, - ) + successParents = s.evaluator.EvaluateParents(successParents, peer, taskTotalPieceCount) peer.Log.Infof("scheduling success parent is %s", successParents[0].ID) return successParents[0], true diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index f905efa45ce..8e7aa1aca39 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -773,6 +773,17 @@ func TestScheduling_FindCandidateParents(t *testing.T) { mock func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) }{ + { + name: "task peers state is failed", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateFailed) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) { + assert := assert.New(t) + assert.Equal(len(parents), 0) + assert.False(ok) + }, + }, { name: "task peers is empty", mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { @@ -1013,6 +1024,35 @@ func TestScheduling_FindCandidateParents(t *testing.T) { assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID, peer.ID}, parents[0].ID) }, }, + { + name: "candidateParents is longer than candidateParentLimit", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateRunning) + mockPeers[0].FSM.SetState(resource.PeerStateRunning) + mockPeers[1].FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + peer.Task.StorePeer(mockPeers[1]) + peer.Task.BackToSourcePeers.Add(mockPeers[0].ID) + peer.Task.BackToSourcePeers.Add(mockPeers[1].ID) + mockPeers[0].FSM.SetState(resource.PeerStateBackToSource) + mockPeers[1].FSM.SetState(resource.PeerStateBackToSource) + mockPeers[0].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(0) + mockPeers[1].FinishedPieces.Set(1) + mockPeers[1].FinishedPieces.Set(2) + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{ + CandidateParentLimit: 1, + }, nil).Times(2) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) { + assert := assert.New(t) + assert.True(ok) + assert.Equal(len(parents), 1) + assert.Equal(parents[0].ID, mockPeers[1].ID) + }, + }, } for _, tc := range tests { @@ -1050,6 +1090,17 @@ func TestScheduling_FindSuccessParent(t *testing.T) { mock func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) }{ + { + name: "task peers state is failed", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateFailed) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) { + assert := assert.New(t) + assert.Nil(parent) + assert.False(ok) + }, + }, { name: "task peers is empty", mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {