From 0b1157bba85e4096df80b3541f5bce98e2872e62 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 24 Jan 2024 22:58:06 +0800 Subject: [PATCH] feat: replace filters with filteredQueryParams (#3049) Signed-off-by: Gaius --- go.mod | 4 +- go.sum | 4 +- internal/job/types.go | 14 +- manager/job/preheat.go | 16 +-- manager/service/preheat.go | 8 +- manager/types/job.go | 4 +- manager/types/preheat.go | 8 +- pkg/idgen/task_id.go | 20 +-- pkg/net/url/url.go | 8 +- pkg/net/url/url_test.go | 6 +- scheduler/job/job.go | 4 +- scheduler/resource/host_manager_test.go | 2 +- scheduler/resource/host_test.go | 10 +- scheduler/resource/peer_manager_test.go | 10 +- scheduler/resource/peer_test.go | 32 ++--- scheduler/resource/seed_peer.go | 2 +- scheduler/resource/seed_peer_test.go | 2 +- scheduler/resource/task.go | 42 +++--- scheduler/resource/task_manager_test.go | 10 +- scheduler/resource/task_test.go | 74 +++++------ .../evaluator/evaluator_base_test.go | 68 +++++----- scheduler/scheduling/scheduling.go | 30 ++--- scheduler/scheduling/scheduling_test.go | 58 ++++---- scheduler/service/service_v1.go | 8 +- scheduler/service/service_v1_test.go | 68 +++++----- scheduler/service/service_v2.go | 64 ++++----- scheduler/service/service_v2_test.go | 124 +++++++++--------- 27 files changed, 350 insertions(+), 350 deletions(-) diff --git a/go.mod b/go.mod index 66ddc1a5f3b..c8e7ff35081 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.84 + d7y.io/api/v2 v2.0.85 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 @@ -24,6 +24,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/gaius-qi/ping v1.0.0 github.com/gammazero/deque v0.2.1 + github.com/gin-contrib/gzip v0.0.6 github.com/gin-contrib/static v0.0.1 github.com/gin-contrib/zap v0.2.0 github.com/gin-gonic/gin v1.9.1 @@ -127,7 +128,6 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/gin-contrib/gzip v0.0.6 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/go.sum b/go.sum index ee5db20099b..4fa2ec9eafc 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -d7y.io/api/v2 v2.0.84 h1:V4lpcsMccllHH4/yUgWYcEdrl8XgZoijX92Tp2as0e0= -d7y.io/api/v2 v2.0.84/go.mod h1:nBK3pWGNkbZTI49Rt9oV4KeGtELO2m2puDgltVpIRg4= +d7y.io/api/v2 v2.0.85 h1:20phOlkxI/f0kgg+RC60N3F2AXmAzlDaZTs7aUrxgyg= +d7y.io/api/v2 v2.0.85/go.mod h1:nBK3pWGNkbZTI49Rt9oV4KeGtELO2m2puDgltVpIRg4= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= diff --git a/internal/job/types.go b/internal/job/types.go index 30ac2d667fe..2c845002139 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -17,13 +17,13 @@ package job type PreheatRequest struct { - URL string `json:"url" validate:"required,url"` - Tag string `json:"tag" validate:"omitempty"` - Digest string `json:"digest" validate:"omitempty"` - Filter string `json:"filter" validate:"omitempty"` - Headers map[string]string `json:"headers" validate:"omitempty"` - Application string `json:"application" validate:"omitempty"` - Priority int32 `json:"priority" validate:"omitempty"` + URL string `json:"url" validate:"required,url"` + Tag string `json:"tag" validate:"omitempty"` + Digest string `json:"digest" validate:"omitempty"` + FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"` + Headers map[string]string `json:"headers" validate:"omitempty"` + Application string `json:"application" validate:"omitempty"` + Priority int32 `json:"priority" validate:"omitempty"` } type PreheatResponse struct { diff --git a/manager/job/preheat.go b/manager/job/preheat.go index b283b2b2462..e5cf77dde44 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -106,10 +106,10 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul case PreheatFileType: files = []internaljob.PreheatRequest{ { - URL: json.URL, - Tag: json.Tag, - Filter: json.Filter, - Headers: json.Headers, + URL: json.URL, + Tag: json.Tag, + FilteredQueryParams: json.FilteredQueryParams, + Headers: json.Headers, }, } default: @@ -301,10 +301,10 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh for _, v := range m.References() { header.Set("Accept", v.MediaType) layer := internaljob.PreheatRequest{ - URL: image.blobsURL(v.Digest.String()), - Tag: args.Tag, - Filter: args.Filter, - Headers: nethttp.HeaderToMap(header), + URL: image.blobsURL(v.Digest.String()), + Tag: args.Tag, + FilteredQueryParams: args.FilteredQueryParams, + Headers: nethttp.HeaderToMap(header), } layers = append(layers, layer) diff --git a/manager/service/preheat.go b/manager/service/preheat.go index ef81a2df100..83f43ced3ae 100644 --- a/manager/service/preheat.go +++ b/manager/service/preheat.go @@ -48,10 +48,10 @@ func (s *service) CreateV1Preheat(ctx context.Context, json types.CreateV1Prehea job, err := s.CreatePreheatJob(ctx, types.CreatePreheatJobRequest{ Type: internaljob.PreheatJob, Args: types.PreheatArgs{ - Type: json.Type, - URL: json.URL, - Filter: json.Filter, - Headers: json.Headers, + Type: json.Type, + URL: json.URL, + FilteredQueryParams: json.FilteredQueryParams, + Headers: json.Headers, }, }) if err != nil { diff --git a/manager/types/job.go b/manager/types/job.go index 02b2697d3a9..056476e7a7f 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -62,8 +62,8 @@ type PreheatArgs struct { // Tag is the tag for preheating. Tag string `json:"tag" binding:"omitempty"` - // Filter is filter for preheating. - Filter string `json:"filter" binding:"omitempty"` + // FilteredQueryParams is the filtered query params for preheating. + FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"` // Headers is the http headers for authentication. Headers map[string]string `json:"headers" binding:"omitempty"` diff --git a/manager/types/preheat.go b/manager/types/preheat.go index 930de0c8adb..3b524ce0233 100644 --- a/manager/types/preheat.go +++ b/manager/types/preheat.go @@ -17,10 +17,10 @@ package types type CreateV1PreheatRequest struct { - Type string `json:"type" binding:"required,oneof=image file"` - URL string `json:"url" binding:"required"` - Filter string `json:"filter" binding:"omitempty"` - Headers map[string]string `json:"headers" binding:"omitempty"` + Type string `json:"type" binding:"required,oneof=image file"` + URL string `json:"url" binding:"required"` + FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"` + Headers map[string]string `json:"headers" binding:"omitempty"` } type CreateV1PreheatResponse struct { diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go index 7a288327036..2c03e371f83 100644 --- a/pkg/idgen/task_id.go +++ b/pkg/idgen/task_id.go @@ -28,8 +28,8 @@ import ( ) const ( - // URLFilterSeparator is filter separator for url. - URLFilterSeparator = "&" + // FilteredQueryParamsSeparator is the separator of filtered query params. + FilteredQueryParamsSeparator = "&" ) // TaskIDV1 generates v1 version of task id. @@ -51,13 +51,13 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string { return pkgdigest.SHA256FromStrings(url) } - filters := parseFilters(meta.Filter) + filteredQueryParams := parseFilteredQueryParams(meta.Filter) var ( u string err error ) - u, err = neturl.FilterQuery(url, filters) + u, err = neturl.FilterQueryParams(url, filteredQueryParams) if err != nil { u = "" } @@ -82,18 +82,18 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string { return pkgdigest.SHA256FromStrings(data...) } -// parseFilters parses a filter string to filter slice. -func parseFilters(rawFilters string) []string { - if pkgstrings.IsBlank(rawFilters) { +// parseFilteredQueryParams parses filtered query params. +func parseFilteredQueryParams(rawFilteredQueryParams string) []string { + if pkgstrings.IsBlank(rawFilteredQueryParams) { return nil } - return strings.Split(rawFilters, URLFilterSeparator) + return strings.Split(rawFilteredQueryParams, FilteredQueryParamsSeparator) } // TaskIDV2 generates v2 version of task id. -func TaskIDV2(url, digest, tag, application string, pieceLength int32, filters []string) string { - url, err := neturl.FilterQuery(url, filters) +func TaskIDV2(url, digest, tag, application string, pieceLength int32, filteredQueryParams []string) string { + url, err := neturl.FilterQueryParams(url, filteredQueryParams) if err != nil { url = "" } diff --git a/pkg/net/url/url.go b/pkg/net/url/url.go index 7b11b2f0523..88a59cc8bee 100644 --- a/pkg/net/url/url.go +++ b/pkg/net/url/url.go @@ -20,9 +20,9 @@ import ( "net/url" ) -// FilterQuery excludes query string in url with filters. -func FilterQuery(rawURL string, filters []string) (string, error) { - if len(filters) == 0 { +// FilterQueryParams filters the query params in the url. +func FilterQueryParams(rawURL string, filteredQueryParams []string) (string, error) { + if len(filteredQueryParams) == 0 { return rawURL, nil } @@ -32,7 +32,7 @@ func FilterQuery(rawURL string, filters []string) (string, error) { } hidden := make(map[string]struct{}) - for _, filter := range filters { + for _, filter := range filteredQueryParams { hidden[filter] = struct{}{} } diff --git a/pkg/net/url/url_test.go b/pkg/net/url/url_test.go index ad44a03e8ac..73c2eee923c 100644 --- a/pkg/net/url/url_test.go +++ b/pkg/net/url/url_test.go @@ -23,15 +23,15 @@ import ( ) func TestFilterQuery(t *testing.T) { - url, err := FilterQuery("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{"x", "m"}) + url, err := FilterQueryParams("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{"x", "m"}) assert.Nil(t, err) assert.Equal(t, "http://www.xx.yy/path?u=f#size", url) - url, err = FilterQuery("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{}) + url, err = FilterQueryParams("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{}) assert.Nil(t, err) assert.Equal(t, "http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", url) - url, err = FilterQuery(":error_url", []string{"x", "m"}) + url, err = FilterQueryParams(":error_url", []string{"x", "m"}) assert.NotNil(t, err) assert.Equal(t, "", url) } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 7f2071f5ae8..3331048a062 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -177,7 +177,7 @@ func (j *job) preheat(ctx context.Context, req string) error { urlMeta := &commonv1.UrlMeta{ Digest: preheat.Digest, Tag: preheat.Tag, - Filter: preheat.Filter, + Filter: preheat.FilteredQueryParams, Header: preheat.Headers, Application: preheat.Application, Priority: commonv1.Priority(preheat.Priority), @@ -192,7 +192,7 @@ func (j *job) preheat(ctx context.Context, req string) error { // Trigger seed peer download seeds. taskID := idgen.TaskIDV1(preheat.URL, urlMeta) log := logger.WithTask(taskID, preheat.URL) - log.Infof("preheat %s tag: %s, range: %s, filter: %s, digest: %s", + log.Infof("preheat %s tag: %s, range: %s, filtered query params: %s, digest: %s", preheat.URL, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest) log.Debugf("preheat %s headers: %#v", preheat.URL, urlMeta.Header) diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index 7a147ac83db..9766572055d 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -515,7 +515,7 @@ func TestHostManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index d95c1c8d67d..0f2923d4d9f 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -574,7 +574,7 @@ func TestHost_LoadPeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host) host.StorePeer(mockPeer) @@ -619,7 +619,7 @@ func TestHost_StorePeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(tc.peerID, mockResourceConfig, mockTask, host) host.StorePeer(mockPeer) @@ -665,7 +665,7 @@ func TestHost_DeletePeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host) host.StorePeer(mockPeer) @@ -717,7 +717,7 @@ func TestHost_LeavePeers(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host) tc.expect(t, host, mockPeer) @@ -769,7 +769,7 @@ func TestHost_FreeUploadCount(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host) tc.expect(t, host, mockTask, mockPeer) diff --git a/scheduler/resource/peer_manager_test.go b/scheduler/resource/peer_manager_test.go index 209691b1784..7437491b263 100644 --- a/scheduler/resource/peer_manager_test.go +++ b/scheduler/resource/peer_manager_test.go @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -549,7 +549,7 @@ func TestPeerManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peerManager, err := newPeerManager(tc.gcConfig, gc) if err != nil { diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index c46a961151e..05241b442b7 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -169,7 +169,7 @@ func TestPeer_NewPeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) tc.expect(t, NewPeer(tc.id, mockResourceConfig, mockTask, mockHost, tc.options...), mockTask, mockHost) }) } @@ -204,7 +204,7 @@ func TestPeer_AppendPieceCost(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer) @@ -241,7 +241,7 @@ func TestPeer_PieceCosts(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer) @@ -283,7 +283,7 @@ func TestPeer_LoadReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -316,7 +316,7 @@ func TestPeer_StoreReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -349,7 +349,7 @@ func TestPeer_DeleteReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -390,7 +390,7 @@ func TestPeer_LoadAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -423,7 +423,7 @@ func TestPeer_StoreAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -456,7 +456,7 @@ func TestPeer_DeleteAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -512,7 +512,7 @@ func TestPeer_LoadPiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -553,7 +553,7 @@ func TestPeer_StorePiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -598,7 +598,7 @@ func TestPeer_DeletePiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -646,7 +646,7 @@ func TestPeer_Parents(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, seedPeer, stream) @@ -692,7 +692,7 @@ func TestPeer_Children(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) tc.expect(t, peer, seedPeer, stream) @@ -774,7 +774,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) if tc.mockServer == nil { @@ -926,7 +926,7 @@ func TestPeer_CalculatePriority(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.mock(peer, dynconfig.EXPECT()) tc.expect(t, peer.CalculatePriority(dynconfig)) diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index d33a25eb804..ede0e1cef9d 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -101,7 +101,7 @@ func (s *seedPeer) TriggerDownloadTask(ctx context.Context, taskID string, req * func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) (*Peer, *schedulerv1.PeerResult, error) { urlMeta := &commonv1.UrlMeta{ Tag: task.Tag, - Filter: strings.Join(task.Filters, idgen.URLFilterSeparator), + Filter: strings.Join(task.FilteredQueryParams, idgen.FilteredQueryParamsSeparator), Header: task.Header, Application: task.Application, Priority: commonv1.Priority_LEVEL0, diff --git a/scheduler/resource/seed_peer_test.go b/scheduler/resource/seed_peer_test.go index 2980c2a1c2e..9bb496477a6 100644 --- a/scheduler/resource/seed_peer_test.go +++ b/scheduler/resource/seed_peer_test.go @@ -128,7 +128,7 @@ func TestSeedPeer_TriggerTask(t *testing.T) { tc.mock(client.EXPECT()) seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer, result, err := seedPeer.TriggerTask(context.Background(), nil, mockTask) tc.expect(t, peer, result, err) }) diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index dad5799bb32..0ceb9adfe34 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -121,8 +121,8 @@ type Task struct { // Application identifies different task for same url. Application string - // Filter url used to generate task id. - Filters []string + // FilteredQueryParams is filtered query params. + FilteredQueryParams []string // Task request headers. Header map[string]string @@ -169,27 +169,27 @@ type Task struct { } // New task instance. -func NewTask(id, url, tag, application string, typ commonv2.TaskType, filters []string, +func NewTask(id, url, tag, application string, typ commonv2.TaskType, filteredQueryParams []string, header map[string]string, backToSourceLimit int32, options ...TaskOption) *Task { t := &Task{ - ID: id, - Type: typ, - URL: url, - Tag: tag, - Application: application, - Filters: filters, - Header: header, - DirectPiece: []byte{}, - ContentLength: atomic.NewInt64(-1), - TotalPieceCount: atomic.NewInt32(0), - BackToSourceLimit: atomic.NewInt32(backToSourceLimit), - BackToSourcePeers: set.NewSafeSet[string](), - Pieces: &sync.Map{}, - DAG: dag.NewDAG[*Peer](), - PeerFailedCount: atomic.NewInt32(0), - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - Log: logger.WithTask(id, url), + ID: id, + Type: typ, + URL: url, + Tag: tag, + Application: application, + FilteredQueryParams: filteredQueryParams, + Header: header, + DirectPiece: []byte{}, + ContentLength: atomic.NewInt64(-1), + TotalPieceCount: atomic.NewInt32(0), + BackToSourceLimit: atomic.NewInt32(backToSourceLimit), + BackToSourcePeers: set.NewSafeSet[string](), + Pieces: &sync.Map{}, + DAG: dag.NewDAG[*Peer](), + PeerFailedCount: atomic.NewInt32(0), + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), + Log: logger.WithTask(id, url), } // Initialize state machine. diff --git a/scheduler/resource/task_manager_test.go b/scheduler/resource/task_manager_test.go index 6c41e6d9baf..00a769ad1da 100644 --- a/scheduler/resource/task_manager_test.go +++ b/scheduler/resource/task_manager_test.go @@ -131,7 +131,7 @@ func TestTaskManager_Load(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -184,7 +184,7 @@ func TestTaskManager_Store(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -235,7 +235,7 @@ func TestTaskManager_LoadOrStore(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -288,7 +288,7 @@ func TestTaskManager_Delete(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -349,7 +349,7 @@ func TestTaskManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index 064905c98d3..a597221cdd4 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -50,16 +50,16 @@ var ( CreatedAt: time.Now(), } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilters = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockConfig = &config.Config{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockTaskPieceLength int32 = 2048 + mockConfig = &config.Config{ Resource: *mockResourceConfig, } @@ -94,7 +94,7 @@ func TestTask_NewTask(t *testing.T) { assert.Nil(task.Digest) assert.Equal(task.Tag, mockTaskTag) assert.Equal(task.Application, mockTaskApplication) - assert.EqualValues(task.Filters, mockTaskFilters) + assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) @@ -121,7 +121,7 @@ func TestTask_NewTask(t *testing.T) { assert.Nil(task.Digest) assert.Equal(task.Tag, mockTaskTag) assert.Equal(task.Application, mockTaskApplication) - assert.EqualValues(task.Filters, mockTaskFilters) + assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) assert.Equal(task.PieceLength, mockTaskPieceLength) assert.Empty(task.DirectPiece) @@ -148,7 +148,7 @@ func TestTask_NewTask(t *testing.T) { assert.EqualValues(task.Digest, mockTaskDigest) assert.Equal(task.Tag, mockTaskTag) assert.Equal(task.Application, mockTaskApplication) - assert.EqualValues(task.Filters, mockTaskFilters) + assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) @@ -168,7 +168,7 @@ func TestTask_NewTask(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, tc.options...)) + tc.expect(t, NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, tc.options...)) }) } } @@ -211,7 +211,7 @@ func TestTask_LoadPeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) task.StorePeer(mockPeer) @@ -280,7 +280,7 @@ func TestTask_LoadRandomPeers(t *testing.T) { host := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, task, host) }) @@ -318,7 +318,7 @@ func TestTask_StorePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(tc.peerID, mockResourceConfig, task, mockHost) task.StorePeer(mockPeer) @@ -360,7 +360,7 @@ func TestTask_DeletePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) task.StorePeer(mockPeer) @@ -399,7 +399,7 @@ func TestTask_PeerCount(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) tc.expect(t, mockPeer, task) @@ -497,7 +497,7 @@ func TestTask_AddPeerEdge(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -601,7 +601,7 @@ func TestTask_DeletePeerInEdges(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -703,7 +703,7 @@ func TestTask_DeletePeerOutEdges(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -790,7 +790,7 @@ func TestTask_CanAddPeerEdge(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -853,7 +853,7 @@ func TestTask_PeerDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -916,7 +916,7 @@ func TestTask_PeerInDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -979,7 +979,7 @@ func TestTask_PeerOutDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -1050,7 +1050,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) tc.expect(t, task, mockPeer) @@ -1117,7 +1117,7 @@ func TestTask_LoadSeedPeer(t *testing.T) { mockSeedHost := NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, task, mockSeedHost) @@ -1184,7 +1184,7 @@ func TestTask_IsSeedPeerFailed(t *testing.T) { mockSeedHost := NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, task, mockSeedHost) @@ -1239,7 +1239,7 @@ func TestTask_LoadPiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) piece, loaded := task.LoadPiece(tc.pieceNumber) @@ -1276,7 +1276,7 @@ func TestTask_StorePiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) piece, loaded := task.LoadPiece(tc.pieceNumber) @@ -1317,7 +1317,7 @@ func TestTask_DeletePiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) task.DeletePiece(tc.pieceNumber) @@ -1397,7 +1397,7 @@ func TestTask_SizeScope(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.ContentLength.Store(tc.contentLength) task.TotalPieceCount.Store(tc.totalPieceCount) tc.expect(t, task) @@ -1449,7 +1449,7 @@ func TestTask_CanBackToSource(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, tc.backToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, tc.backToSourceLimit) tc.run(t, task) }) } @@ -1490,7 +1490,7 @@ func TestTask_CanReuseDirectPiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, task) }) } @@ -1571,7 +1571,7 @@ func TestTask_ReportPieceResultToPeers(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) task.StorePeer(mockPeer) tc.run(t, task, mockPeer, stream, stream.EXPECT()) @@ -1654,7 +1654,7 @@ func TestTask_AnnouncePeers(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) task.StorePeer(mockPeer) tc.run(t, task, mockPeer, stream, stream.EXPECT()) diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 3a9cbe9dfa8..d440a161e4b 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -129,16 +129,16 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilters = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockResourceConfig = &config.ResourceConfig{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockTaskPieceLength int32 = 2048 + mockResourceConfig = &config.ResourceConfig{ Task: config.TaskConfig{ DownloadTiny: config.DownloadTinyConfig{ Scheme: config.DefaultResourceTaskDownloadTinyScheme, @@ -190,7 +190,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "parents is empty", parents: []*resource.Peer{}, child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -207,13 +207,13 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate single parent", parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -232,33 +232,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with free upload count", parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -283,33 +283,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with pieces", parents: []*resource.Peer{ resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -353,12 +353,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent", parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -373,12 +373,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent with pieces", parent: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, - resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), + resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)), resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -405,7 +405,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string @@ -566,7 +566,7 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host) tc.mock(host) tc.expect(t, calculateParentHostUploadSuccessScore(mockPeer)) @@ -615,7 +615,7 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host) tc.mock(host, mockPeer) tc.expect(t, calculateFreeUploadScore(host)) @@ -666,7 +666,7 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.mock(peer) tc.expect(t, calculateHostTypeScore(peer)) @@ -877,7 +877,7 @@ func TestEvaluatorBase_IsBadNode(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index d25437ae14d..d7e0b216f88 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -621,21 +621,21 @@ func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *sche // Set task to parent. parent.Task = &commonv2.Task{ - Id: candidateParent.Task.ID, - Type: candidateParent.Task.Type, - Url: candidateParent.Task.URL, - Tag: &candidateParent.Task.Tag, - Application: &candidateParent.Task.Application, - Filters: candidateParent.Task.Filters, - RequestHeader: candidateParent.Task.Header, - PieceLength: uint32(candidateParent.Task.PieceLength), - ContentLength: uint64(candidateParent.Task.ContentLength.Load()), - PieceCount: uint32(candidateParent.Task.TotalPieceCount.Load()), - SizeScope: candidateParent.Task.SizeScope(), - State: candidateParent.Task.FSM.Current(), - PeerCount: uint32(candidateParent.Task.PeerCount()), - CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), + Id: candidateParent.Task.ID, + Type: candidateParent.Task.Type, + Url: candidateParent.Task.URL, + Tag: &candidateParent.Task.Tag, + Application: &candidateParent.Task.Application, + FilteredQueryParams: candidateParent.Task.FilteredQueryParams, + RequestHeader: candidateParent.Task.Header, + PieceLength: uint32(candidateParent.Task.PieceLength), + ContentLength: uint64(candidateParent.Task.ContentLength.Load()), + PieceCount: uint32(candidateParent.Task.TotalPieceCount.Load()), + SizeScope: candidateParent.Task.SizeScope(), + State: candidateParent.Task.FSM.Current(), + PeerCount: uint32(candidateParent.Task.PeerCount()), + CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()), } // Set digest to parent task. diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 589c25f9a1d..f1e7fee191d 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -157,16 +157,16 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilters = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockResourceConfig = &config.ResourceConfig{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockTaskPieceLength int32 = 2048 + mockResourceConfig = &config.ResourceConfig{ Task: config.TaskConfig{ DownloadTiny: config.DownloadTinyConfig{ Scheme: config.DefaultResourceTaskDownloadTinyScheme, @@ -443,7 +443,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -713,7 +713,7 @@ func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1025,7 +1025,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) var mockPeers []*resource.Peer @@ -1342,7 +1342,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) var mockPeers []*resource.Peer @@ -1603,7 +1603,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) var mockPeers []*resource.Peer @@ -1660,18 +1660,18 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Cost: durationpb.New(candidateParents[0].Cost.Load()), State: candidateParents[0].FSM.Current(), Task: &commonv2.Task{ - Id: candidateParents[0].Task.ID, - Type: candidateParents[0].Task.Type, - Url: candidateParents[0].Task.URL, - Digest: &dgst, - Tag: &candidateParents[0].Task.Tag, - Application: &candidateParents[0].Task.Application, - Filters: candidateParents[0].Task.Filters, - RequestHeader: candidateParents[0].Task.Header, - PieceLength: uint32(candidateParents[0].Task.PieceLength), - ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), - PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), - SizeScope: candidateParents[0].Task.SizeScope(), + Id: candidateParents[0].Task.ID, + Type: candidateParents[0].Task.Type, + Url: candidateParents[0].Task.URL, + Digest: &dgst, + Tag: &candidateParents[0].Task.Tag, + Application: &candidateParents[0].Task.Application, + FilteredQueryParams: candidateParents[0].Task.FilteredQueryParams, + RequestHeader: candidateParents[0].Task.Header, + PieceLength: uint32(candidateParents[0].Task.PieceLength), + ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), + PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), + SizeScope: candidateParents[0].Task.SizeScope(), Pieces: []*commonv2.Piece{ { Number: uint32(mockPiece.Number), @@ -1768,7 +1768,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) candidateParents := []*resource.Peer{resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost, resource.WithRange(nethttp.Range{ Start: 1, Length: 10, @@ -1818,7 +1818,7 @@ func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) parent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost) diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 3edfc7cfe62..6fd7f94ae28 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -359,7 +359,7 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ } task := resource.NewTask(taskID, req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), types.TaskTypeV1ToV2(req.GetTaskType()), - strings.Split(req.UrlMeta.GetFilter(), idgen.URLFilterSeparator), req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) + strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator), req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) task, _ = v.resource.TaskManager().LoadOrStore(task) host := v.storeHost(ctx, req.GetPeerHost()) peer := v.storePeer(ctx, peerID, req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host) @@ -917,7 +917,7 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso // storeTask stores a new task or reuses a previous task. func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task { - filters := strings.Split(req.UrlMeta.GetFilter(), idgen.URLFilterSeparator) + filteredQueryParams := strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator) task, loaded := v.resource.TaskManager().Load(req.GetTaskId()) if !loaded { @@ -927,7 +927,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty } task := resource.NewTask(req.GetTaskId(), req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), - typ, filters, req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) + typ, filteredQueryParams, req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) task.Log.Info("create new task") return task @@ -936,7 +936,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty // Task is the pointer, if the task already exists, the next request will // update the task's Url and UrlMeta in task manager. task.URL = req.GetUrl() - task.Filters = filters + task.FilteredQueryParams = filteredQueryParams task.Header = req.UrlMeta.GetHeader() task.Log.Info("task already exists") return task diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 6db1064ae67..c10413e18bd 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -194,16 +194,16 @@ var ( Idc: mockHostIDC, } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilters = []string{"bar"} - mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} - mockTaskPieceLength int32 = 2048 - mockResourceConfig = &config.ResourceConfig{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} + mockTaskPieceLength int32 = 2048 + mockResourceConfig = &config.ResourceConfig{ Task: config.TaskConfig{ DownloadTiny: config.DownloadTinyConfig{ Scheme: config.DefaultResourceTaskDownloadTinyScheme, @@ -1013,7 +1013,7 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1278,7 +1278,7 @@ func TestServiceV1_ReportPieceResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT()) tc.expect(t, mockPeer, svc.ReportPieceResult(stream)) @@ -1461,7 +1461,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT(), dynconfig.EXPECT()) }) @@ -1522,7 +1522,7 @@ func TestServiceV1_StatTask(t *testing.T) { networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) taskManager := resource.NewMockTaskManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) task, err := svc.StatTask(context.Background(), &schedulerv1.StatTaskRequest{TaskId: mockTaskID}) @@ -1822,7 +1822,7 @@ func TestServiceV1_AnnounceTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) tc.mock(mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) @@ -2020,7 +2020,7 @@ func TestServiceV1_LeaveTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -2675,7 +2675,7 @@ func TestServiceV1_LeaveHost(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -3091,7 +3091,7 @@ func TestServiceV1_prefetchTask(t *testing.T) { Digest: mockTaskDigest.String(), Tag: mockTaskTag, Range: mockURLMetaRange, - Filter: strings.Join(mockTaskFilters, idgen.URLFilterSeparator), + Filter: strings.Join(mockTaskFilteredQueryParams, idgen.FilteredQueryParamsSeparator), Header: mockTaskHeader, Application: mockTaskApplication, Priority: commonv1.Priority_LEVEL0, @@ -3129,7 +3129,7 @@ func TestServiceV1_prefetchTask(t *testing.T) { Digest: mockTaskDigest.String(), Tag: mockTaskTag, Range: mockURLMetaRange, - Filter: strings.Join(mockTaskFilters, idgen.URLFilterSeparator), + Filter: strings.Join(mockTaskFilteredQueryParams, idgen.FilteredQueryParamsSeparator), Header: mockTaskHeader, Application: mockTaskApplication, Priority: commonv1.Priority_LEVEL0, @@ -3164,7 +3164,7 @@ func TestServiceV1_prefetchTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, task, mockHost) svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology) taskManager := resource.NewMockTaskManager(ctl) @@ -3635,7 +3635,7 @@ func TestServiceV1_triggerTask(t *testing.T) { mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockSeedHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -3664,7 +3664,7 @@ func TestServiceV1_storeTask(t *testing.T) { Url: mockTaskURL, UrlMeta: &commonv1.UrlMeta{ Priority: commonv1.Priority_LEVEL0, - Filter: strings.Join(mockTaskFilters, idgen.URLFilterSeparator), + Filter: strings.Join(mockTaskFilteredQueryParams, idgen.FilteredQueryParamsSeparator), Header: mockTaskHeader, }, PeerHost: mockPeerHost, @@ -3692,7 +3692,7 @@ func TestServiceV1_storeTask(t *testing.T) { Digest: mockTaskDigest.String(), Tag: mockTaskTag, Application: mockTaskApplication, - Filter: strings.Join(mockTaskFilters, idgen.URLFilterSeparator), + Filter: strings.Join(mockTaskFilteredQueryParams, idgen.FilteredQueryParamsSeparator), Header: mockTaskHeader, }, PeerHost: mockPeerHost, @@ -3705,7 +3705,7 @@ func TestServiceV1_storeTask(t *testing.T) { assert.EqualValues(task.Digest, mockTaskDigest) assert.Equal(task.Tag, mockTaskTag) assert.Equal(task.Application, mockTaskApplication) - assert.EqualValues(task.Filters, mockTaskFilters) + assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) @@ -3835,7 +3835,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) gomock.InOrder( @@ -3855,7 +3855,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1), @@ -3960,7 +3960,7 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, task, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -4042,7 +4042,7 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -4057,7 +4057,7 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string @@ -4373,7 +4373,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -4499,7 +4499,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -4581,7 +4581,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) child := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) @@ -4665,7 +4665,7 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) svc.handleTaskSuccess(context.Background(), task, tc.result) @@ -4805,7 +4805,7 @@ func TestServiceV1_handleTaskFail(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) svc.handleTaskFailure(context.Background(), task, tc.backToSourceErr, tc.seedPeerErr) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 5c14e38c61d..be673e36e40 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -255,21 +255,21 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c // Set task to response. resp.Task = &commonv2.Task{ - Id: peer.Task.ID, - Type: peer.Task.Type, - Url: peer.Task.URL, - Tag: &peer.Task.Tag, - Application: &peer.Task.Application, - Filters: peer.Task.Filters, - RequestHeader: peer.Task.Header, - PieceLength: uint32(peer.Task.PieceLength), - ContentLength: uint64(peer.Task.ContentLength.Load()), - PieceCount: uint32(peer.Task.TotalPieceCount.Load()), - SizeScope: peer.Task.SizeScope(), - State: peer.Task.FSM.Current(), - PeerCount: uint32(peer.Task.PeerCount()), - CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), + Id: peer.Task.ID, + Type: peer.Task.Type, + Url: peer.Task.URL, + Tag: &peer.Task.Tag, + Application: &peer.Task.Application, + FilteredQueryParams: peer.Task.FilteredQueryParams, + RequestHeader: peer.Task.Header, + PieceLength: uint32(peer.Task.PieceLength), + ContentLength: uint64(peer.Task.ContentLength.Load()), + PieceCount: uint32(peer.Task.TotalPieceCount.Load()), + SizeScope: peer.Task.SizeScope(), + State: peer.Task.FSM.Current(), + PeerCount: uint32(peer.Task.PeerCount()), + CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), } // Set digest to task response. @@ -410,21 +410,21 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c } resp := &commonv2.Task{ - Id: task.ID, - Type: task.Type, - Url: task.URL, - Tag: &task.Tag, - Application: &task.Application, - Filters: task.Filters, - RequestHeader: task.Header, - PieceLength: uint32(task.PieceLength), - ContentLength: uint64(task.ContentLength.Load()), - PieceCount: uint32(task.TotalPieceCount.Load()), - SizeScope: task.SizeScope(), - State: task.FSM.Current(), - PeerCount: uint32(task.PeerCount()), - CreatedAt: timestamppb.New(task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), + Id: task.ID, + Type: task.Type, + Url: task.URL, + Tag: &task.Tag, + Application: &task.Application, + FilteredQueryParams: task.FilteredQueryParams, + RequestHeader: task.Header, + PieceLength: uint32(task.PieceLength), + ContentLength: uint64(task.ContentLength.Load()), + PieceCount: uint32(task.TotalPieceCount.Load()), + SizeScope: task.SizeScope(), + State: task.FSM.Current(), + PeerCount: uint32(task.PeerCount()), + CreatedAt: timestamppb.New(task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), } // Set digest to response. @@ -1288,11 +1288,11 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An } task = resource.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(), - download.GetFilters(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) + download.GetFilteredQueryParams(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) } else { task.URL = download.GetUrl() - task.Filters = download.GetFilters() + task.FilteredQueryParams = download.GetFilteredQueryParams() task.Header = download.GetRequestHeader() } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 53e96fa72ce..021311210fa 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -139,18 +139,18 @@ func TestServiceV2_StatPeer(t *testing.T) { Cost: durationpb.New(peer.Cost.Load()), State: peer.FSM.Current(), Task: &commonv2.Task{ - Id: peer.Task.ID, - Type: peer.Task.Type, - Url: peer.Task.URL, - Digest: &dgst, - Tag: &peer.Task.Tag, - Application: &peer.Task.Application, - Filters: peer.Task.Filters, - RequestHeader: peer.Task.Header, - PieceLength: uint32(peer.Task.PieceLength), - ContentLength: uint64(peer.Task.ContentLength.Load()), - PieceCount: uint32(peer.Task.TotalPieceCount.Load()), - SizeScope: peer.Task.SizeScope(), + Id: peer.Task.ID, + Type: peer.Task.Type, + Url: peer.Task.URL, + Digest: &dgst, + Tag: &peer.Task.Tag, + Application: &peer.Task.Application, + FilteredQueryParams: peer.Task.FilteredQueryParams, + RequestHeader: peer.Task.Header, + PieceLength: uint32(peer.Task.PieceLength), + ContentLength: uint64(peer.Task.ContentLength.Load()), + PieceCount: uint32(peer.Task.TotalPieceCount.Load()), + SizeScope: peer.Task.SizeScope(), Pieces: []*commonv2.Piece{ { Number: uint32(mockPiece.Number), @@ -250,7 +250,7 @@ func TestServiceV2_StatPeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -322,7 +322,7 @@ func TestServiceV2_LeavePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -365,18 +365,18 @@ func TestServiceV2_StatTask(t *testing.T) { assert := assert.New(t) assert.EqualValues(resp, &commonv2.Task{ - Id: task.ID, - Type: task.Type, - Url: task.URL, - Digest: &dgst, - Tag: &task.Tag, - Application: &task.Application, - Filters: task.Filters, - RequestHeader: task.Header, - PieceLength: uint32(task.PieceLength), - ContentLength: uint64(task.ContentLength.Load()), - PieceCount: uint32(task.TotalPieceCount.Load()), - SizeScope: task.SizeScope(), + Id: task.ID, + Type: task.Type, + Url: task.URL, + Digest: &dgst, + Tag: &task.Tag, + Application: &task.Application, + FilteredQueryParams: task.FilteredQueryParams, + RequestHeader: task.Header, + PieceLength: uint32(task.PieceLength), + ContentLength: uint64(task.ContentLength.Load()), + PieceCount: uint32(task.TotalPieceCount.Load()), + SizeScope: task.SizeScope(), Pieces: []*commonv2.Piece{ { Number: uint32(mockPiece.Number), @@ -408,7 +408,7 @@ func TestServiceV2_StatTask(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) taskManager := resource.NewMockTaskManager(ctl) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT()) @@ -919,7 +919,7 @@ func TestServiceV2_LeaveHost(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) @@ -1711,7 +1711,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) seedPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -1805,7 +1805,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -1898,7 +1898,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -1970,7 +1970,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2044,7 +2044,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2244,7 +2244,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { mockHost.IP = ip mockHost.DownloadPort = int32(port) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2317,7 +2317,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2437,7 +2437,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2596,7 +2596,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2722,7 +2722,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2833,7 +2833,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2899,7 +2899,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -2938,9 +2938,9 @@ func TestServiceV2_handleResource(t *testing.T) { { name: "task can be loaded", download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - RequestHeader: map[string]string{"baz": "bas"}, + Url: "foo", + FilteredQueryParams: []string{"bar"}, + RequestHeader: map[string]string{"baz": "bas"}, }, run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, @@ -2960,17 +2960,17 @@ func TestServiceV2_handleResource(t *testing.T) { assert.EqualValues(host, mockHost) assert.Equal(task.ID, mockTask.ID) assert.Equal(task.URL, download.Url) - assert.EqualValues(task.Filters, download.Filters) + assert.EqualValues(task.FilteredQueryParams, download.FilteredQueryParams) assert.EqualValues(task.Header, download.RequestHeader) }, }, { name: "task can not be loaded", download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - RequestHeader: map[string]string{"baz": "bas"}, - Digest: &dgst, + Url: "foo", + FilteredQueryParams: []string{"bar"}, + RequestHeader: map[string]string{"baz": "bas"}, + Digest: &dgst, }, run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, @@ -2993,7 +2993,7 @@ func TestServiceV2_handleResource(t *testing.T) { assert.Equal(task.ID, mockTask.ID) assert.Equal(task.Digest.String(), download.GetDigest()) assert.Equal(task.URL, download.GetUrl()) - assert.EqualValues(task.Filters, download.GetFilters()) + assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams()) assert.EqualValues(task.Header, download.RequestHeader) }, }, @@ -3020,10 +3020,10 @@ func TestServiceV2_handleResource(t *testing.T) { { name: "peer can be loaded", download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - RequestHeader: map[string]string{"baz": "bas"}, - Digest: &dgst, + Url: "foo", + FilteredQueryParams: []string{"bar"}, + RequestHeader: map[string]string{"baz": "bas"}, + Digest: &dgst, }, run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, @@ -3044,7 +3044,7 @@ func TestServiceV2_handleResource(t *testing.T) { assert.Equal(task.ID, mockTask.ID) assert.Equal(task.Digest.String(), download.GetDigest()) assert.Equal(task.URL, download.GetUrl()) - assert.EqualValues(task.Filters, download.GetFilters()) + assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams()) assert.EqualValues(task.Header, download.RequestHeader) assert.EqualValues(peer, mockPeer) }, @@ -3052,11 +3052,11 @@ func TestServiceV2_handleResource(t *testing.T) { { name: "peer can not be loaded", download: &commonv2.Download{ - Url: "foo", - Filters: []string{"bar"}, - RequestHeader: map[string]string{"baz": "bas"}, - Digest: &dgst, - Priority: commonv2.Priority_LEVEL1, + Url: "foo", + FilteredQueryParams: []string{"bar"}, + RequestHeader: map[string]string{"baz": "bas"}, + Digest: &dgst, + Priority: commonv2.Priority_LEVEL1, Range: &commonv2.Range{ Start: uint64(mockPeerRange.Start), Length: uint64(mockPeerRange.Length), @@ -3083,7 +3083,7 @@ func TestServiceV2_handleResource(t *testing.T) { assert.Equal(task.ID, mockTask.ID) assert.Equal(task.Digest.String(), download.GetDigest()) assert.Equal(task.URL, download.GetUrl()) - assert.EqualValues(task.Filters, download.GetFilters()) + assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams()) assert.EqualValues(task.Header, download.RequestHeader) assert.Equal(peer.ID, mockPeer.ID) assert.Equal(peer.Priority, download.Priority) @@ -3113,7 +3113,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) @@ -3390,7 +3390,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&tc.config, res, scheduling, dynconfig, storage, networkTopology)