Skip to content

Commit

Permalink
Merge branch 'feature/piece' of github.com:dragonflyoss/dragonfly int…
Browse files Browse the repository at this point in the history
…o feature/persistent

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jan 21, 2025
2 parents 37d606d + e163f9f commit c57077d
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.49.0
d7y.io/api/v2 v2.1.8
d7y.io/api/v2 v2.1.11
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down Expand Up @@ -93,8 +93,8 @@ require (
golang.org/x/sys v0.28.0
golang.org/x/time v0.9.0
google.golang.org/api v0.216.0
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.2
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.4.7
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CI
cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.8 h1:iM6qA7SyxSaRpqVVZ4Pwj0pkWMd1NX84FRg33eBHMlU=
d7y.io/api/v2 v2.1.8/go.mod h1:/N5t2H+b2XBwfvD7RmZS0ae96861a9ibVXkeF26ir6A=
d7y.io/api/v2 v2.1.11 h1:VZ3SU/2/41xUOtANSvWEUOYjAhC+j/NpbVyJUZw3sa0=
d7y.io/api/v2 v2.1.11/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down Expand Up @@ -2246,8 +2246,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand All @@ -2262,8 +2262,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
1 change: 1 addition & 0 deletions scheduler/resource/persistentcache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewTask(id, tag, application, state string, persistentReplicaCount uint64,
PersistentReplicaCount: persistentReplicaCount,
Tag: tag,
Application: application,
PieceLength: pieceLength,
ContentLength: contentLength,
TotalPieceCount: totalPieceCount,
TTL: ttl,
Expand Down
96 changes: 66 additions & 30 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
return err
}
case *schedulerv2.AnnouncePeerRequest_ReschedulePeerRequest:
rescheduleRequest := announcePeerRequest.ReschedulePeerRequest

log.Infof("receive RescheduleRequest description: %s", rescheduleRequest.GetDescription())
if err := v.handleRescheduleRequest(ctx, req.GetPeerId(), rescheduleRequest.GetCandidateParents()); err != nil {
reschedulePeerRequest := announcePeerRequest.ReschedulePeerRequest
log.Infof("receive ReschedulePeerRequestescription: %s", reschedulePeerRequest.GetDescription())
if err := v.handleReschedulePeerRequest(ctx, req.GetPeerId(), reschedulePeerRequest.GetCandidateParents()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_ReschedulePeerFailedRequest:
reschedulePeerFailedRequest := announcePeerRequest.ReschedulePeerFailedRequest
log.Infof("receive ReschedulePeerFailedRequest description: %s", reschedulePeerFailedRequest.GetDescription())

// If the task is reschedule failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
Expand All @@ -141,6 +146,9 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
log.Error(err)
return err
}

// If the task is succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
Expand All @@ -149,6 +157,9 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
log.Error(err)
return err
}

// If the task is back-to-source succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
Expand All @@ -163,6 +174,9 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
log.Error(err)
return err
}

// If the task is back-to-source failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
piece := announcePeerRequest.DownloadPieceFinishedRequest.Piece
log.Infof("receive DownloadPieceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
Expand Down Expand Up @@ -1068,7 +1082,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
}

if err := peer.FSM.Event(ctx, standard.PeerEventRegisterEmpty); err != nil {
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Expand Down Expand Up @@ -1157,8 +1171,8 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
return nil
}

// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
func (v *V2) handleRescheduleRequest(_ context.Context, peerID string, candidateParents []*commonv2.Peer) error {
// handleReschedulePeerRequest handles ReschedulePeerRequest of AnnouncePeerRequest.
func (v *V2) handleReschedulePeerRequest(_ context.Context, peerID string, candidateParents []*commonv2.Peer) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
Expand Down Expand Up @@ -1304,7 +1318,7 @@ func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2.
if len(req.Piece.GetDigest()) > 0 {
d, err := digest.Parse(req.Piece.GetDigest())
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
return status.Error(codes.InvalidArgument, err.Error())
}

piece.Digest = d
Expand Down Expand Up @@ -1367,7 +1381,7 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(_ context.Context, p
if len(req.Piece.GetDigest()) > 0 {
d, err := digest.Parse(req.Piece.GetDigest())
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
return status.Error(codes.InvalidArgument, err.Error())
}

piece.Digest = d
Expand Down Expand Up @@ -1619,18 +1633,26 @@ func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePe
}
case *schedulerv2.AnnouncePersistentCachePeerRequest_ReschedulePersistentCachePeerRequest:
reschedulePersistentCachePeerRequest := announcePersistentCachePeerRequest.ReschedulePersistentCachePeerRequest

log.Info("receive ReschedulePersistentCachePeerRequest")
if err := v.handleReschedulePersistentCachePeerRequest(ctx, stream, req.GetTaskId(), req.GetPeerId(), reschedulePersistentCachePeerRequest); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePersistentCachePeerRequest_ReschedulePersistentCachePeerFailedRequest:
reschedulePersistentCachePeerFailedRequest := announcePersistentCachePeerRequest.ReschedulePersistentCachePeerFailedRequest
log.Infof("receive ReschedulePeerFailedRequest description: %s", reschedulePersistentCachePeerFailedRequest.GetDescription())

// If the task is reschedule failed, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePersistentCachePeerRequest_DownloadPersistentCachePeerFinishedRequest:
log.Info("receive DownloadPersistentCachePeerFinishedRequest")
if err := v.handleDownloadPersistentCachePeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
log.Error(err)
return err
}

// If the task is succeeded, return nil directly and close the stream.
return nil
case *schedulerv2.AnnouncePersistentCachePeerRequest_DownloadPersistentCachePeerFailedRequest:
log.Info("receive DownloadPersistentCachePeerFailedRequest")
if err := v.handleDownloadPersistentCachePeerFailedRequest(ctx, req.GetPeerId()); err != nil {
Expand Down Expand Up @@ -2183,6 +2205,8 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP
}

log := logger.WithPeer(req.HostId, req.TaskId, req.PeerId)
log.Info("stat persistent cache peer")

peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
log.Errorf("persistent cache peer %s not found", req.GetPeerId())
Expand All @@ -2192,13 +2216,13 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP
currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

return &commonv2.PersistentCachePeer{
Expand Down Expand Up @@ -2302,9 +2326,11 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("delete persistent cache peer")

if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

// TODO(gaius) Implement copy replica to the other peers.
Expand All @@ -2319,6 +2345,8 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("upload persistent cache task started")

host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId())
if !loaded {
log.Error("host not found")
Expand All @@ -2337,12 +2365,12 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule

if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

if err := v.persistentCacheResource.TaskManager().Store(ctx, task); err != nil {
log.Errorf("store persistent cache task %s error %s", task.ID, err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

// Handle peer with task started request, new peer and store it.
Expand All @@ -2355,12 +2383,12 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule

if err := peer.FSM.Event(ctx, persistentcache.PeerEventUpload); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

return nil
Expand All @@ -2373,6 +2401,8 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("upload persistent cache task finished")

// Handle peer with task finished request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
Expand All @@ -2383,38 +2413,38 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul
peer.FinishedPieces.SetAll()
if err := peer.FSM.Event(ctx, persistentcache.PeerEventSucceeded); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
peer.Cost = time.Since(peer.CreatedAt)
peer.UpdatedAt = time.Now()

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

// Handle task with peer finished request, load task and update it.
if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
peer.Task.UpdatedAt = time.Now()

if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil {
log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

// TODO(gaius) Implement copy multiple replicas to the other peers.
Expand Down Expand Up @@ -2442,6 +2472,8 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("upload persistent cache task failed")

// Handle peer with task failed request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
Expand All @@ -2451,25 +2483,25 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler

if err := peer.FSM.Event(ctx, persistentcache.PeerEventFailed); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}
peer.UpdatedAt = time.Now()

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

// Handle task with peer failed request, load task and update it.
if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}
peer.Task.UpdatedAt = time.Now()

if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil {
log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

return nil
Expand All @@ -2482,6 +2514,8 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
log.Info("stat persistent cache task")

task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !loaded {
log.Errorf("persistent cache task %s not found", req.GetTaskId())
Expand All @@ -2491,13 +2525,13 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP
currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

return &commonv2.PersistentCacheTask{
Expand All @@ -2523,13 +2557,15 @@ func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.Del
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
log.Info("delete persistent cache task")

if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
}

if err := v.persistentCacheResource.TaskManager().Delete(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache task %s error %s", req.GetTaskId(), err)
return status.Errorf(codes.Internal, err.Error())
return status.Error(codes.Internal, err.Error())
}

return nil
Expand Down

0 comments on commit c57077d

Please sign in to comment.