Skip to content

Commit

Permalink
feat: replace filters with filteredQueryParams (#3049)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 24, 2024
1 parent 17d111e commit 0b1157b
Show file tree
Hide file tree
Showing 27 changed files with 350 additions and 350 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.84
d7y.io/api/v2 v2.0.85
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand All @@ -24,6 +24,7 @@ require (
github.com/docker/go-units v0.4.0
github.com/gaius-qi/ping v1.0.0
github.com/gammazero/deque v0.2.1
github.com/gin-contrib/gzip v0.0.6
github.com/gin-contrib/static v0.0.1
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1
Expand Down Expand Up @@ -127,7 +128,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/gzip v0.0.6 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.84 h1:V4lpcsMccllHH4/yUgWYcEdrl8XgZoijX92Tp2as0e0=
d7y.io/api/v2 v2.0.84/go.mod h1:nBK3pWGNkbZTI49Rt9oV4KeGtELO2m2puDgltVpIRg4=
d7y.io/api/v2 v2.0.85 h1:20phOlkxI/f0kgg+RC60N3F2AXmAzlDaZTs7aUrxgyg=
d7y.io/api/v2 v2.0.85/go.mod h1:nBK3pWGNkbZTI49Rt9oV4KeGtELO2m2puDgltVpIRg4=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
Expand Down
14 changes: 7 additions & 7 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package job

type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
Filter string `json:"filter" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
}

type PreheatResponse struct {
Expand Down
16 changes: 8 additions & 8 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
case PreheatFileType:
files = []internaljob.PreheatRequest{
{
URL: json.URL,
Tag: json.Tag,
Filter: json.Filter,
Headers: json.Headers,
URL: json.URL,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
},
}
default:
Expand Down Expand Up @@ -301,10 +301,10 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
for _, v := range m.References() {
header.Set("Accept", v.MediaType)
layer := internaljob.PreheatRequest{
URL: image.blobsURL(v.Digest.String()),
Tag: args.Tag,
Filter: args.Filter,
Headers: nethttp.HeaderToMap(header),
URL: image.blobsURL(v.Digest.String()),
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
}

layers = append(layers, layer)
Expand Down
8 changes: 4 additions & 4 deletions manager/service/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (s *service) CreateV1Preheat(ctx context.Context, json types.CreateV1Prehea
job, err := s.CreatePreheatJob(ctx, types.CreatePreheatJobRequest{
Type: internaljob.PreheatJob,
Args: types.PreheatArgs{
Type: json.Type,
URL: json.URL,
Filter: json.Filter,
Headers: json.Headers,
Type: json.Type,
URL: json.URL,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type PreheatArgs struct {
// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`

// Filter is filter for preheating.
Filter string `json:"filter" binding:"omitempty"`
// FilteredQueryParams is the filtered query params for preheating.
FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"`

// Headers is the http headers for authentication.
Headers map[string]string `json:"headers" binding:"omitempty"`
Expand Down
8 changes: 4 additions & 4 deletions manager/types/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package types

type CreateV1PreheatRequest struct {
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
Filter string `json:"filter" binding:"omitempty"`
Headers map[string]string `json:"headers" binding:"omitempty"`
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"`
Headers map[string]string `json:"headers" binding:"omitempty"`
}

type CreateV1PreheatResponse struct {
Expand Down
20 changes: 10 additions & 10 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

const (
// URLFilterSeparator is filter separator for url.
URLFilterSeparator = "&"
// FilteredQueryParamsSeparator is the separator of filtered query params.
FilteredQueryParamsSeparator = "&"
)

// TaskIDV1 generates v1 version of task id.
Expand All @@ -51,13 +51,13 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(url)
}

filters := parseFilters(meta.Filter)
filteredQueryParams := parseFilteredQueryParams(meta.Filter)

var (
u string
err error
)
u, err = neturl.FilterQuery(url, filters)
u, err = neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
u = ""
}
Expand All @@ -82,18 +82,18 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(data...)
}

// parseFilters parses a filter string to filter slice.
func parseFilters(rawFilters string) []string {
if pkgstrings.IsBlank(rawFilters) {
// parseFilteredQueryParams parses filtered query params.
func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
if pkgstrings.IsBlank(rawFilteredQueryParams) {
return nil
}

return strings.Split(rawFilters, URLFilterSeparator)
return strings.Split(rawFilteredQueryParams, FilteredQueryParamsSeparator)
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, digest, tag, application string, pieceLength int32, filters []string) string {
url, err := neturl.FilterQuery(url, filters)
func TaskIDV2(url, digest, tag, application string, pieceLength int32, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/net/url/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"net/url"
)

// FilterQuery excludes query string in url with filters.
func FilterQuery(rawURL string, filters []string) (string, error) {
if len(filters) == 0 {
// FilterQueryParams filters the query params in the url.
func FilterQueryParams(rawURL string, filteredQueryParams []string) (string, error) {
if len(filteredQueryParams) == 0 {
return rawURL, nil
}

Expand All @@ -32,7 +32,7 @@ func FilterQuery(rawURL string, filters []string) (string, error) {
}

hidden := make(map[string]struct{})
for _, filter := range filters {
for _, filter := range filteredQueryParams {
hidden[filter] = struct{}{}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/net/url/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import (
)

func TestFilterQuery(t *testing.T) {
url, err := FilterQuery("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{"x", "m"})
url, err := FilterQueryParams("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{"x", "m"})
assert.Nil(t, err)
assert.Equal(t, "http://www.xx.yy/path?u=f#size", url)

url, err = FilterQuery("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{})
url, err = FilterQueryParams("http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", []string{})
assert.Nil(t, err)
assert.Equal(t, "http://www.xx.yy/path?u=f&x=y&m=z&x=s#size", url)

url, err = FilterQuery(":error_url", []string{"x", "m"})
url, err = FilterQueryParams(":error_url", []string{"x", "m"})
assert.NotNil(t, err)
assert.Equal(t, "", url)
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (j *job) preheat(ctx context.Context, req string) error {
urlMeta := &commonv1.UrlMeta{
Digest: preheat.Digest,
Tag: preheat.Tag,
Filter: preheat.Filter,
Filter: preheat.FilteredQueryParams,
Header: preheat.Headers,
Application: preheat.Application,
Priority: commonv1.Priority(preheat.Priority),
Expand All @@ -192,7 +192,7 @@ func (j *job) preheat(ctx context.Context, req string) error {
// Trigger seed peer download seeds.
taskID := idgen.TaskIDV1(preheat.URL, urlMeta)
log := logger.WithTask(taskID, preheat.URL)
log.Infof("preheat %s tag: %s, range: %s, filter: %s, digest: %s",
log.Infof("preheat %s tag: %s, range: %s, filtered query params: %s, digest: %s",
preheat.URL, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
log.Debugf("preheat %s headers: %#v", preheat.URL, urlMeta.Header)

Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func TestHostManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func TestHost_LoadPeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestHost_StorePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(tc.peerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestHost_DeletePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -717,7 +717,7 @@ func TestHost_LeavePeers(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

tc.expect(t, host, mockPeer)
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestHost_FreeUploadCount(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

tc.expect(t, host, mockTask, mockPeer)
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestPeerManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(tc.gcConfig, gc)
if err != nil {
Expand Down
Loading

0 comments on commit 0b1157b

Please sign in to comment.