Skip to content

Commit

Permalink
feat: delete persistent cache task from peer (#3786)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 26, 2025
1 parent 7c0f0ab commit a1cb0a4
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.16
d7y.io/api/v2 v2.1.18
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.16 h1:ql4PaC17eG0NSteu+4cijrQ7vA/P/Xki4we66Q7FbQw=
d7y.io/api/v2 v2.1.16/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.18 h1:5fpA94N7CihRdQxWPzUFx3qO7ScY76d0R2oxBtOt9PE=
d7y.io/api/v2 v2.1.18/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
8 changes: 4 additions & 4 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
port = seedPeer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
addr := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
Expand Down Expand Up @@ -416,13 +416,13 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
port = peer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
addr := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(peer.ID, hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
Expand Down
66 changes: 48 additions & 18 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type PeerManager interface {
// LoadAllByTaskID returns all peers by task id.
LoadAllByTaskID(context.Context, string) ([]*Peer, error)

// LoadAllIDsByTaskID returns all peer ids by task id.
LoadAllIDsByTaskID(context.Context, string) ([]string, error)

// LoadPersistentAllByTaskID returns all persistent peers by task id.
LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error)

Expand All @@ -60,6 +63,9 @@ type PeerManager interface {
// LoadAllByHostID returns all peers by host id.
LoadAllByHostID(context.Context, string) ([]*Peer, error)

// LoadAllIDsByHostID returns all peer ids by host id.
LoadAllIDsByHostID(context.Context, string) ([]string, error)

// DeleteAllByHostID deletes all peers by host id.
DeleteAllByHostID(context.Context, string) error
}
Expand Down Expand Up @@ -301,7 +307,7 @@ func (p *peerManager) LoadAll(ctx context.Context) ([]*Peer, error) {

peerKeys, cursor, err = p.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCachePeersInScheduler(p.config.Manager.SchedulerClusterID), 10).Result()
if err != nil {
logger.Error("scan tasks failed")
logger.Errorf("scan tasks failed: %v", err)
return nil, err
}

Expand All @@ -328,15 +334,15 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -346,20 +352,32 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
return peers, nil
}

// LoadAllIDsByTaskID returns all peer ids by task id.
func (p *peerManager) LoadAllIDsByTaskID(ctx context.Context, taskID string) ([]string, error) {
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

return peerIDs, nil
}

// LoadPersistentAllByTaskID returns all persistent cache peers by task id.
func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -372,15 +390,15 @@ func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID stri
// DeleteAllByTaskID deletes all persistent cache peers by task id.
func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
log := logger.WithTaskID(taskID)
peers, err := p.LoadAllByTaskID(ctx, taskID)
ids, err := p.LoadAllIDsByTaskID(ctx, taskID)
if err != nil {
log.Error("load peers failed")
log.Errorf("load peers failed: %v", err)
return err
}

for _, peer := range peers {
if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
for _, id := range ids {
if err := p.Delete(ctx, id); err != nil {
log.Errorf("delete peer %s failed: %v", id, err)
continue
}
}
Expand All @@ -393,15 +411,15 @@ func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Pe
log := logger.WithHostID(hostID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -411,18 +429,30 @@ func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Pe
return peers, nil
}

// LoadAllIDsByHostID returns all persistent cache peers by host id.
func (p *peerManager) LoadAllIDsByHostID(ctx context.Context, hostID string) ([]string, error) {
log := logger.WithHostID(hostID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

return peerIDs, nil
}

// DeleteAllByHostID deletes all persistent cache peers by host id.
func (p *peerManager) DeleteAllByHostID(ctx context.Context, hostID string) error {
log := logger.WithTaskID(hostID)
peers, err := p.LoadAllByHostID(ctx, hostID)
ids, err := p.LoadAllIDsByHostID(ctx, hostID)
if err != nil {
log.Error("load peers failed")
log.Errorf("load peers failed: %v", err)
return err
}

for _, peer := range peers {
if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
for _, id := range ids {
if err := p.Delete(ctx, id); err != nil {
log.Errorf("delete peer %s failed: %v", id, err)
continue
}
}
Expand Down
52 changes: 45 additions & 7 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,25 +2322,38 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("delete persistent cache peer")

task, founded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !founded {
log.Errorf("persistent cache task %s not found", req.GetTaskId())
return status.Errorf(codes.NotFound, "persistent cache task %s not found", req.GetTaskId())
peer, found := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !found {
log.Errorf("persistent cache peer %s not found", req.GetPeerId())
return status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}

if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
return status.Error(codes.Internal, err.Error())
}

// Delete the persistent cache task from the peer, if delete failed, skip it.
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.DownloadPort)
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
peer.Log.Errorf("get dfdaemon client failed %s", err)
return err
}

if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
}

// Select the remote peer to copy the replica and trigger the download task with asynchronous.
blocklist := set.NewSafeSet[string]()
blocklist.Add(req.GetHostId())
go func(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) {
if err := v.replicatePersistentCacheTask(ctx, task, blocklist); err != nil {
log.Errorf("replicate persistent cache task failed %s", err)
}
}(context.Background(), task, blocklist)
}(context.Background(), peer.Task, blocklist)

return nil
}
Expand Down Expand Up @@ -2630,10 +2643,35 @@ func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.Del
log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
log.Info("delete persistent cache task")

if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
// Delete the persistent cache peers in the redis and peer.
peers, err := v.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, req.GetTaskId())
if err != nil {
log.Errorf("load persistent cache peers by task %s error %s", req.GetTaskId(), err)
return status.Error(codes.Internal, err.Error())
}

for _, peer := range peers {
if err := v.persistentCacheResource.PeerManager().Delete(ctx, peer.ID); err != nil {
log.Errorf("delete persistent cache peer %s error %s", peer.ID, err)
continue
}

// Delete the persistent cache task from the peer, if delete failed, skip it.
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.DownloadPort)
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
peer.Log.Errorf("get dfdaemon client failed %s", err)
continue
}

if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
continue
}
}

// Delete the persistent cache task in the redis.
if err := v.persistentCacheResource.TaskManager().Delete(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache task %s error %s", req.GetTaskId(), err)
return status.Error(codes.Internal, err.Error())
Expand Down

0 comments on commit a1cb0a4

Please sign in to comment.