Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replicate the persistent cache task when delete host #3787

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.18
d7y.io/api/v2 v2.1.23
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.18 h1:5fpA94N7CihRdQxWPzUFx3qO7ScY76d0R2oxBtOt9PE=
d7y.io/api/v2 v2.1.18/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.23 h1:adUXI1QHNxWG38iwtefZN3j7DysGkHIc3/UmOYFJBgw=
d7y.io/api/v2 v2.1.23/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
12 changes: 12 additions & 0 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type V2 interface {
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
DownloadPersistentCacheTask(context.Context, *dfdaemonv2.DownloadPersistentCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error)

// UpdatePersistentCacheTask updates persistent cache task information.
UpdatePersistentCacheTask(context.Context, *dfdaemonv2.UpdatePersistentCacheTaskRequest, ...grpc.CallOption) error

// StatPersistentCacheTask stats persistent cache task information.
StatPersistentCacheTask(context.Context, *dfdaemonv2.StatPersistentCacheTaskRequest, ...grpc.CallOption) (*commonv2.PersistentCacheTask, error)

Expand Down Expand Up @@ -212,6 +215,15 @@ func (v *v2) DownloadPersistentCacheTask(ctx context.Context, req *dfdaemonv2.Do
return v.DfdaemonUploadClient.DownloadPersistentCacheTask(ctx, req, opts...)
}

// UpdatePersistentCacheTask updates persistent cache task information.
func (v *v2) UpdatePersistentCacheTask(ctx context.Context, req *dfdaemonv2.UpdatePersistentCacheTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.UpdatePersistentCacheTask(ctx, req, opts...)
return err
}

// StatPersistentCacheTask stats persistent cache task information.
func (v *v2) StatPersistentCacheTask(ctx context.Context, req *dfdaemonv2.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.PersistentCacheTask, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand Down
19 changes: 19 additions & 0 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file added q
Empty file.
1 change: 1 addition & 0 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Persistent: persistent,
FinishedPieces: finishedPieces,
Task: task,
Host: host,
Expand Down
27 changes: 13 additions & 14 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -226,7 +226,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -238,6 +238,11 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}

return nil
}); err != nil {
peer.Log.Errorf("store peer failed: %v", err)
Expand All @@ -251,8 +256,8 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
func (p *peerManager) Delete(ctx context.Context, peerID string) error {
log := logger.WithPeerID(peerID)
if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
if err != nil {
peer, found := p.Load(ctx, peerID)
if !found {
return errors.New("getting peer failed from redis")
}

Expand All @@ -261,24 +266,18 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error {
return err
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from task joint-set failed: %v", err)
return err
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return err
}

if persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if peer.Persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete persistent peer id from task joint-set failed: %v", err)
}
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from host joint-set failed: %v", err)
return err
}
Expand Down
30 changes: 30 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions scheduler/resource/standard/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions scheduler/scheduling/mocks/scheduling_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading