diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 7f419bc1dae..dad5799bb32 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -373,8 +373,7 @@ func (t *Task) HasAvailablePeer(blocklist set.SafeSet[string]) bool { continue } - if peer.FSM.Is(PeerStatePending) || - peer.FSM.Is(PeerStateRunning) || + if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateSucceeded) || peer.FSM.Is(PeerStateBackToSource) { hasAvailablePeer = true diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index ec0862fe848..064905c98d3 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -1003,17 +1003,6 @@ func TestTask_HasAvailablePeer(t *testing.T) { assert.Equal(task.HasAvailablePeer(blocklist), false) }, }, - { - name: "peer state is PeerStatePending", - expect: func(t *testing.T, task *Task, mockPeer *Peer) { - assert := assert.New(t) - task.StorePeer(mockPeer) - mockPeer.ID = idgen.PeerIDV2() - mockPeer.FSM.SetState(PeerStatePending) - task.StorePeer(mockPeer) - assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true) - }, - }, { name: "peer state is PeerStateSucceeded", expect: func(t *testing.T, task *Task, mockPeer *Peer) { diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index cb7d4bd58dc..30eecd18cf0 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -504,6 +504,7 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S filterParentLimit = int(config.FilterParentLimit) } } + peer.Log.Debugf("filter parent limit is %d", filterParentLimit) var ( candidateParents []*resource.Peer diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index a1deba165aa..315731560cf 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -817,7 +817,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { // Handle resource included host, task, and peer. - host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload()) + _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload()) if err != nil { return err } @@ -827,29 +827,30 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() - // When there are no available peers for a task, the scheduler needs to trigger - // the first task download in the p2p cluster. blocklist := set.NewSafeSet[string]() blocklist.Add(peer.ID) - if task.FSM.Is(resource.TaskStatePending) || + download := proto.Clone(req.Download).(*commonv2.Download) + switch { + // If scheduler trigger seed peer download back-to-source, + // the needBackToSource flag should be true. + case download.GetNeedBackToSource(): + peer.Log.Infof("peer need back to source") + peer.NeedBackToSource.Store(true) + // If task is pending, failed, leave, or succeeded and has no available peer, + // scheduler trigger seed peer download back-to-source. + case task.FSM.Is(resource.TaskStatePending) || task.FSM.Is(resource.TaskStateFailed) || task.FSM.Is(resource.TaskStateLeave) || task.FSM.Is(resource.TaskStateSucceeded) && - !task.HasAvailablePeer(blocklist) { - download := proto.Clone(req.Download).(*commonv2.Download) - if download.GetNeedBackToSource() || host.Type != types.HostTypeNormal { - peer.Log.Infof("peer need back to source") - peer.NeedBackToSource.Store(true) - } else { - // If trigger the seed peer download back-to-source, - // the need back-to-source flag should be true. - download.NeedBackToSource = true - if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { - // Collect RegisterPeerFailureCount metrics. - metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() - return err - } + !task.HasAvailablePeer(blocklist): + // If trigger the seed peer download back-to-source, + // the need back-to-source flag should be true. + download.NeedBackToSource = true + if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { + // Collect RegisterPeerFailureCount metrics. + metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return err } }