diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 33ca897c403..e2686c5b55f 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -341,6 +341,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask IsMigrating: false, } + taskID := idgen.TaskIDV1(req.URL, req.URLMeta) if ptm.Multiplex { // try breakpoint resume for task has range header if req.Range != nil && !ptm.SplitRunningTasks { @@ -357,14 +358,14 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask } // reuse by completed task - r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req) + r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, taskID, req) if ok { metrics.PeerTaskCacheHitCount.Add(1) return r, attr, nil } } - pt, err := ptm.newStreamTask(ctx, peerTaskRequest, req.Range) + pt, err := ptm.newStreamTask(ctx, taskID, peerTaskRequest, req.Range) if err != nil { return nil, nil, err } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index fc1091f849b..ed35068b892 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -910,6 +910,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require * // test reuse stream task rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), + taskID, &StreamTaskRequest{ URL: ts.url, URLMeta: urlMeta, diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index 8504a829c05..26a254dd237 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -225,9 +225,8 @@ func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileT return nil } -func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, +func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, taskID string, request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) { - taskID := idgen.TaskIDV1(request.URL, request.URLMeta) var ( reuse *storage.ReusePeerTask reuseRange *http.Range // the range of parent peer task data to read diff --git a/client/daemon/peer/peertask_reuse_test.go b/client/daemon/peer/peertask_reuse_test.go index c9136a00268..10b85d3d9ea 100644 --- a/client/daemon/peer/peertask_reuse_test.go +++ b/client/daemon/peer/peertask_reuse_test.go @@ -37,6 +37,7 @@ import ( "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/client/daemon/storage/mocks" "d7y.io/dragonfly/v2/client/daemon/test" + "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" ) @@ -713,7 +714,9 @@ func TestReuseStreamPeerTask(t *testing.T) { }, }, } - tc.verify(ptm.tryReuseStreamPeerTask(context.Background(), tc.request)) + + taskID := idgen.TaskIDV1(tc.request.URL, tc.request.URLMeta) + tc.verify(ptm.tryReuseStreamPeerTask(context.Background(), taskID, tc.request)) }) } } diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index fc07597acf1..666a0a6ae1d 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -33,7 +33,6 @@ import ( "d7y.io/dragonfly/v2/client/daemon/storage" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" - "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" ) @@ -73,6 +72,7 @@ type resumeStreamTask struct { func (ptm *peerTaskManager) newStreamTask( ctx context.Context, + taskID string, request *schedulerv1.PeerTaskRequest, rg *http.Range) (*streamTask, error) { metrics.StreamTaskCount.Add(1) @@ -87,7 +87,6 @@ func (ptm *peerTaskManager) newStreamTask( parent = ptm.prefetchParentTask(request, "") } - taskID := idgen.TaskIDV1(request.Url, request.UrlMeta) ptc, err := ptm.getPeerTaskConductor(ctx, taskID, request, limit, parent, rg, "", false) if err != nil { return nil, err diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index e0b119ce07c..3b194b5f148 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -317,7 +317,7 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) { PeerHost: &schedulerv1.PeerHost{}, } ctx := context.Background() - pt, err := ptm.newStreamTask(ctx, req, nil) + pt, err := ptm.newStreamTask(ctx, taskID, req, nil) assert.Nil(err, "new stream peer task") rc, _, err := pt.Start(ctx) diff --git a/client/daemon/peer/peertask_stream_resume_test.go b/client/daemon/peer/peertask_stream_resume_test.go index 8306cb03593..3d90077abbe 100644 --- a/client/daemon/peer/peertask_stream_resume_test.go +++ b/client/daemon/peer/peertask_stream_resume_test.go @@ -215,7 +215,8 @@ func TestStreamPeerTask_Resume(t *testing.T) { // set up parent task wg.Add(1) - pt, err := ptm.newStreamTask(ctx, req, nil) + + pt, err := ptm.newStreamTask(ctx, taskID, req, nil) assert.Nil(err, "new parent stream peer task") rc, _, err := pt.Start(ctx)