Skip to content

Commit

Permalink
feat: trigger download by task id (#2970)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Dec 25, 2023
1 parent 86ae8ce commit 6b93ef8
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 56 deletions.
10 changes: 5 additions & 5 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type V2 interface {
DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error)

// TriggerDownloadTask triggers download task from the other peer.
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) error
TriggerDownloadTask(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
Expand All @@ -106,7 +106,7 @@ func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.SyncPiecesRequest,
defer cancel()

return v.DfdaemonUploadClient.SyncPieces(
ctx,
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId),
req,
opts...,
)
Expand All @@ -118,19 +118,19 @@ func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceReq
defer cancel()

return v.DfdaemonUploadClient.DownloadPiece(
ctx,
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId),
req,
opts...,
)
}

// TriggerDownloadTask triggers download task from the other peer.
func (v *v2) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) error {
func (v *v2) TriggerDownloadTask(ctx context.Context, taskID string, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.TriggerDownloadTask(
ctx,
context.WithValue(ctx, pkgbalancer.ContextKey, taskID),
req,
opts...,
)
Expand Down
10 changes: 5 additions & 5 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
type SeedPeer interface {
// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) error
TriggerDownloadTask(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -89,11 +89,11 @@ func newSeedPeer(cfg *config.Config, client SeedPeerClient, peerManager PeerMana

// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
func (s *seedPeer) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest) error {
ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
func (s *seedPeer) TriggerDownloadTask(ctx context.Context, taskID string, req *dfdaemonv2.TriggerDownloadTaskRequest) error {
ctx, cancel := context.WithCancel(trace.ContextWithSpan(ctx, trace.SpanFromContext(ctx)))
defer cancel()

return s.client.TriggerDownloadTask(ctx, req)
return s.client.TriggerDownloadTask(ctx, taskID, req)
}

// TriggerTask triggers the seed peer to download task.
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions scheduler/resource/seed_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
{
name: "trigger download task failed",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1)
mc.TriggerDownloadTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
Expand All @@ -76,7 +76,7 @@ func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
{
name: "trigger download task scuccess",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(nil).Times(1)
mc.TriggerDownloadTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
Expand All @@ -95,7 +95,7 @@ func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
tc.mock(client.EXPECT())

seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager)
tc.expect(t, seedPeer.TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{}))
tc.expect(t, seedPeer.TriggerDownloadTask(context.Background(), mockTaskID, &dfdaemonv2.TriggerDownloadTaskRequest{}))
})
}
}
Expand Down
25 changes: 14 additions & 11 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if task.FSM.Is(resource.TaskStateFailed) || !task.HasAvailablePeer(blocklist) {
if err := v.downloadTaskBySeedPeer(ctx, req.GetDownload(), peer); err != nil {
if err := v.downloadTaskBySeedPeer(ctx, taskID, req.GetDownload(), peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
Expand Down Expand Up @@ -1305,7 +1305,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Download, peer *resource.Peer) error {
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download *commonv2.Download, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
Expand All @@ -1314,14 +1314,15 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)
}(ctx, taskID, download, types.HostTypeSuperSeed)

break
}
Expand All @@ -1330,14 +1331,15 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)
}(ctx, taskID, download, types.HostTypeSuperSeed)

break
}
Expand All @@ -1346,14 +1348,15 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)
}(ctx, taskID, download, types.HostTypeSuperSeed)

break
}
Expand Down
Loading

0 comments on commit 6b93ef8

Please sign in to comment.