From e163f9fae58649e875a13fc9744e69170e702612 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 20 Jan 2025 12:12:12 +0800 Subject: [PATCH] feat: add pieceLength for creating persistent cache task Signed-off-by: Gaius --- go.mod | 6 +-- go.sum | 12 ++--- scheduler/resource/persistentcache/task.go | 1 + scheduler/service/service_v2.go | 60 +++++++++++++--------- 4 files changed, 47 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 7852dd9c1bc..20437fcf162 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( cloud.google.com/go/storage v1.49.0 - d7y.io/api/v2 v2.1.8 + d7y.io/api/v2 v2.1.10 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 @@ -93,8 +93,8 @@ require ( golang.org/x/sys v0.28.0 golang.org/x/time v0.9.0 google.golang.org/api v0.216.0 - google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.2 + google.golang.org/grpc v1.69.4 + google.golang.org/protobuf v1.36.3 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.4.7 diff --git a/go.sum b/go.sum index ea2203d7f86..20c4337ab33 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CI cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU= cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI= cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io= -d7y.io/api/v2 v2.1.8 h1:iM6qA7SyxSaRpqVVZ4Pwj0pkWMd1NX84FRg33eBHMlU= -d7y.io/api/v2 v2.1.8/go.mod h1:/N5t2H+b2XBwfvD7RmZS0ae96861a9ibVXkeF26ir6A= +d7y.io/api/v2 v2.1.10 h1:Dgc7a6OXIMPb7K7RGiQStv//Lpm0YUAp19ITE6Gkouo= +d7y.io/api/v2 v2.1.10/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= @@ -2246,8 +2246,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= +google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -2262,8 +2262,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= -google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/scheduler/resource/persistentcache/task.go b/scheduler/resource/persistentcache/task.go index 53cf9ba152b..a5579d2c953 100644 --- a/scheduler/resource/persistentcache/task.go +++ b/scheduler/resource/persistentcache/task.go @@ -110,6 +110,7 @@ func NewTask(id, tag, application, state string, persistentReplicaCount uint64, PersistentReplicaCount: persistentReplicaCount, Tag: tag, Application: application, + PieceLength: pieceLength, ContentLength: contentLength, TotalPieceCount: totalPieceCount, TTL: ttl, diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 5c31cc6e73b..3d39878e8ec 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -1068,7 +1068,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S } if err := peer.FSM.Event(ctx, standard.PeerEventRegisterEmpty); err != nil { - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ @@ -1304,7 +1304,7 @@ func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2. if len(req.Piece.GetDigest()) > 0 { d, err := digest.Parse(req.Piece.GetDigest()) if err != nil { - return status.Errorf(codes.InvalidArgument, err.Error()) + return status.Error(codes.InvalidArgument, err.Error()) } piece.Digest = d @@ -1367,7 +1367,7 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(_ context.Context, p if len(req.Piece.GetDigest()) > 0 { d, err := digest.Parse(req.Piece.GetDigest()) if err != nil { - return status.Errorf(codes.InvalidArgument, err.Error()) + return status.Error(codes.InvalidArgument, err.Error()) } piece.Digest = d @@ -2183,6 +2183,8 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP } log := logger.WithPeer(req.HostId, req.TaskId, req.PeerId) + log.Info("stat persistent cache peer") + peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId()) if !loaded { log.Errorf("persistent cache peer %s not found", req.GetPeerId()) @@ -2192,13 +2194,13 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID) if err != nil { log.Errorf("load current persistent replica count failed %s", err.Error()) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, peer.Task.ID) if err != nil { log.Errorf("load current replica count failed %s", err.Error()) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &commonv2.PersistentCachePeer{ @@ -2302,9 +2304,11 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del } log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) + log.Info("delete persistent cache peer") + if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil { log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } // TODO(gaius) Implement copy replica to the other peers. @@ -2319,6 +2323,8 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule } log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) + log.Info("upload persistent cache task started") + host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId()) if !loaded { log.Error("host not found") @@ -2337,12 +2343,12 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil { log.Errorf("task fsm event failed: %s", err.Error()) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := v.persistentCacheResource.TaskManager().Store(ctx, task); err != nil { log.Errorf("store persistent cache task %s error %s", task.ID, err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } // Handle peer with task started request, new peer and store it. @@ -2355,12 +2361,12 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule if err := peer.FSM.Event(ctx, persistentcache.PeerEventUpload); err != nil { log.Errorf("peer fsm event failed: %s", err.Error()) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil { log.Errorf("store persistent cache peer %s error %s", peer.ID, err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } return nil @@ -2373,6 +2379,8 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul } log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) + log.Info("upload persistent cache task finished") + // Handle peer with task finished request, load peer and update it. peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId()) if !loaded { @@ -2383,38 +2391,38 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul peer.FinishedPieces.SetAll() if err := peer.FSM.Event(ctx, persistentcache.PeerEventSucceeded); err != nil { log.Errorf("peer fsm event failed: %s", err.Error()) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } peer.Cost = time.Since(peer.CreatedAt) peer.UpdatedAt = time.Now() if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil { log.Errorf("store persistent cache peer %s error %s", peer.ID, err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Handle task with peer finished request, load task and update it. if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil { log.Errorf("task fsm event failed: %s", err.Error()) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } peer.Task.UpdatedAt = time.Now() if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil { log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID) if err != nil { log.Errorf("load current persistent replica count failed %s", err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, peer.Task.ID) if err != nil { log.Errorf("load current replica count failed %s", err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // TODO(gaius) Implement copy multiple replicas to the other peers. @@ -2442,6 +2450,8 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler } log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) + log.Info("upload persistent cache task failed") + // Handle peer with task failed request, load peer and update it. peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId()) if !loaded { @@ -2451,25 +2461,25 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler if err := peer.FSM.Event(ctx, persistentcache.PeerEventFailed); err != nil { log.Errorf("peer fsm event failed: %s", err.Error()) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } peer.UpdatedAt = time.Now() if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil { log.Errorf("store persistent cache peer %s error %s", peer.ID, err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } // Handle task with peer failed request, load task and update it. if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil { log.Errorf("task fsm event failed: %s", err.Error()) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } peer.Task.UpdatedAt = time.Now() if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil { log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } return nil @@ -2482,6 +2492,8 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP } log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId()) + log.Info("stat persistent cache task") + task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId()) if !loaded { log.Errorf("persistent cache task %s not found", req.GetTaskId()) @@ -2491,13 +2503,13 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID) if err != nil { log.Errorf("load current persistent replica count failed %s", err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCorrentReplicaCount(ctx, task.ID) if err != nil { log.Errorf("load current replica count failed %s", err) - return nil, status.Errorf(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &commonv2.PersistentCacheTask{ @@ -2523,13 +2535,15 @@ func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.Del } log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId()) + log.Info("delete persistent cache task") + if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil { log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err) } if err := v.persistentCacheResource.TaskManager().Delete(ctx, req.GetTaskId()); err != nil { log.Errorf("delete persistent cache task %s error %s", req.GetTaskId(), err) - return status.Errorf(codes.Internal, err.Error()) + return status.Error(codes.Internal, err.Error()) } return nil