Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dragonflyoss/Dragonfly2 into fix/he…
Browse files Browse the repository at this point in the history
…alcheck

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Dec 15, 2023
2 parents f15da08 + 839e476 commit 89323da
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Golangci lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.52.2
version: v1.54
args: --verbose

- name: Markdown lint
Expand Down
5 changes: 2 additions & 3 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (pt *peerTaskConductor) register() error {
regSpan.End()

if err != nil {
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
pt.Errorf("scheduler did not response in %s", pt.SchedulerOption.ScheduleTimeout.Duration)
}
pt.Errorf("step 1: peer %s register failed: %s", pt.request.PeerId, err)
Expand Down Expand Up @@ -800,8 +800,7 @@ func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool
failedReason string
)
// extract DfError for grpc status
err = dferrors.ConvertGRPCErrorToDfError(err)
de, ok := err.(*dferrors.DfError)
de, ok := dferrors.IsGRPCDfError(err)
if ok {
switch de.Code {
case commonv1.Code_SchedNeedBackSource:
Expand Down
6 changes: 6 additions & 0 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,13 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
URLMeta: urlMeta,
PeerID: ts.peerID,
})

assert.True(ok, "reuse stream task")
assert.NotNil(rc, "reuse stream task")
if rc == nil {
return
}

defer func() {
assert.Nil(rc.Close())
}()
Expand Down
15 changes: 12 additions & 3 deletions client/daemon/peer/peertask_piecetask_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/net/ip"
Expand Down Expand Up @@ -406,7 +407,6 @@ func (s *pieceTaskSynchronizer) receive() {
s.Errorf("synchronizer receives with error: %s", err)
s.error.Store(&pieceTaskSynchronizerError{err})
s.reportError(err)
s.Errorf("synchronizer receives with error: %s", err)
}
}

Expand All @@ -430,7 +430,16 @@ func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) erro

func (s *pieceTaskSynchronizer) reportError(err error) {
s.span.RecordError(err)
sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, commonv1.Code_ClientPieceRequestFail))
errCode := commonv1.Code_ClientPieceRequestFail

// extract DfError for grpc status
de, ok := dferrors.IsGRPCDfError(err)
if ok {
errCode = de.Code
s.Errorf("report error with convert code from grpc error, code: %d, message: %s", de.Code, de.Message)
}

sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, errCode))
if sendError != nil {
s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
Expand All @@ -440,7 +449,7 @@ func (s *pieceTaskSynchronizer) reportError(err error) {
}

func (s *pieceTaskSynchronizer) canceled(err error) bool {
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
return true
}
Expand Down
20 changes: 20 additions & 0 deletions internal/dferrors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ func ConvertGRPCErrorToDfError(err error) error {
return err
}

func IsGRPCDfError(err error) (*DfError, bool) {
for _, d := range status.Convert(err).Details() {
switch internal := d.(type) {
case *commonv1.GrpcDfError:
return &DfError{
Code: internal.Code,
Message: internal.Message,
}, true
}
}

var de *DfError
ok := errors.As(err, &de)
if ok {
return de, true
}

return nil, false
}

// ConvertDfErrorToGRPCError converts DfError to grpc error, if it is.
func ConvertDfErrorToGRPCError(err error) error {
if v, ok := err.(*DfError); ok {
Expand Down

0 comments on commit 89323da

Please sign in to comment.