Skip to content

Commit

Permalink
feat: handle the error for scheduling persistent cache task (#3781)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 21, 2025
1 parent c4f4abb commit 9320a7f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.10
d7y.io/api/v2 v2.1.12
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.10 h1:Dgc7a6OXIMPb7K7RGiQStv//Lpm0YUAp19ITE6Gkouo=
d7y.io/api/v2 v2.1.10/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.12 h1:jFo4TA6sRVSbcjPlFrig8S+7P37pww4bFbeTxcGhd54=
d7y.io/api/v2 v2.1.12/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
41 changes: 27 additions & 14 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,43 +126,51 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
return err
}
case *schedulerv2.AnnouncePeerRequest_ReschedulePeerRequest:
rescheduleRequest := announcePeerRequest.ReschedulePeerRequest

log.Infof("receive RescheduleRequest description: %s", rescheduleRequest.GetDescription())
if err := v.handleRescheduleRequest(ctx, req.GetPeerId(), rescheduleRequest.GetCandidateParents()); err != nil {
reschedulePeerRequest := announcePeerRequest.ReschedulePeerRequest
log.Infof("receive ReschedulePeerRequestescription: %s", reschedulePeerRequest.GetDescription())
if err := v.handleReschedulePeerRequest(ctx, req.GetPeerId(), reschedulePeerRequest.GetCandidateParents()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
if err := v.handleDownloadPeerFinishedRequest(context.Background(), req.GetPeerId()); err != nil {
if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
log.Error(err)
return err
}

// If the task is succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
if err := v.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
log.Error(err)
return err
}

// If the task is back-to-source succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
if err := v.handleDownloadPeerFailedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
}

// If the task is failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
if err := v.handleDownloadPeerBackToSourceFailedRequest(context.Background(), req.GetPeerId()); err != nil {
if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.GetPeerId()); err != nil {
log.Error(err)
return err
}

// If the task is back-to-source failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
piece := announcePeerRequest.DownloadPieceFinishedRequest.Piece
log.Infof("receive DownloadPieceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
Expand Down Expand Up @@ -1157,8 +1165,8 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
return nil
}

// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
func (v *V2) handleRescheduleRequest(_ context.Context, peerID string, candidateParents []*commonv2.Peer) error {
// handleReschedulePeerRequest handles ReschedulePeerRequest of AnnouncePeerRequest.
func (v *V2) handleReschedulePeerRequest(_ context.Context, peerID string, candidateParents []*commonv2.Peer) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
Expand Down Expand Up @@ -1619,7 +1627,6 @@ func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePe
}
case *schedulerv2.AnnouncePersistentCachePeerRequest_ReschedulePersistentCachePeerRequest:
reschedulePersistentCachePeerRequest := announcePersistentCachePeerRequest.ReschedulePersistentCachePeerRequest

log.Info("receive ReschedulePersistentCachePeerRequest")
if err := v.handleReschedulePersistentCachePeerRequest(ctx, stream, req.GetTaskId(), req.GetPeerId(), reschedulePersistentCachePeerRequest); err != nil {
log.Error(err)
Expand All @@ -1631,12 +1638,18 @@ func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePe
log.Error(err)
return err
}

// If the task is succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePersistentCachePeerRequest_DownloadPersistentCachePeerFailedRequest:
log.Info("receive DownloadPersistentCachePeerFailedRequest")
if err := v.handleDownloadPersistentCachePeerFailedRequest(ctx, req.GetPeerId()); err != nil {
log.Error(err)
return err
}

// If the task is failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePersistentCachePeerRequest_DownloadPieceFinishedRequest:
downloadPieceFinishedRequest := announcePersistentCachePeerRequest.DownloadPieceFinishedRequest

Expand Down Expand Up @@ -2335,7 +2348,7 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule
task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if loaded && !task.FSM.Can(persistentcache.TaskEventUpload) {
log.Errorf("persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
return status.Errorf(codes.FailedPrecondition, "persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
return status.Errorf(codes.AlreadyExists, "persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
}

task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(),
Expand Down
6 changes: 3 additions & 3 deletions scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) {
)

assert := assert.New(t)
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
assert.ErrorIs(svc.handleReschedulePeerRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
},
},
{
Expand All @@ -2136,7 +2136,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) {
)

assert := assert.New(t)
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Error(codes.FailedPrecondition, "foo"))
assert.ErrorIs(svc.handleReschedulePeerRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Error(codes.FailedPrecondition, "foo"))
},
},
{
Expand All @@ -2150,7 +2150,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) {
)

assert := assert.New(t)
assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}))
assert.NoError(svc.handleReschedulePeerRequest(context.Background(), peer.ID, []*commonv2.Peer{}))
},
},
}
Expand Down

0 comments on commit 9320a7f

Please sign in to comment.