Skip to content

Commit

Permalink
feat: optimize parameters in seed peer DownloadTask
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Dec 18, 2023
1 parent 9735bcc commit 2e7484e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
5 changes: 2 additions & 3 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/common"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
)
Expand All @@ -47,7 +46,7 @@ const (
type SeedPeer interface {
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
DownloadTask(context.Context, *Task, types.HostType) error
DownloadTask(context.Context, *commonv2.Download) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -88,7 +87,7 @@ func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager
// TODO Implement DownloadTask
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error {
func (s *seedPeer) DownloadTask(ctx context.Context, download *commonv2.Download) error {
// ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
// defer cancel()

Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if task.FSM.Is(resource.TaskStateFailed) || !task.HasAvailablePeer(blocklist) {
if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil {
if err := v.downloadTaskBySeedPeer(ctx, req.GetDownload(), peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Download, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
Expand All @@ -1312,7 +1312,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1325,7 +1325,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1338,7 +1338,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand Down

0 comments on commit 2e7484e

Please sign in to comment.