Skip to content

Commit

Permalink
Merge branch 'dragonflyoss:main' into cache
Browse files Browse the repository at this point in the history
  • Loading branch information
MinH-09 authored Dec 14, 2023
2 parents 36fb6c9 + 839e476 commit 19b6c8e
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 9 deletions.
5 changes: 2 additions & 3 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (pt *peerTaskConductor) register() error {
regSpan.End()

if err != nil {
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
pt.Errorf("scheduler did not response in %s", pt.SchedulerOption.ScheduleTimeout.Duration)
}
pt.Errorf("step 1: peer %s register failed: %s", pt.request.PeerId, err)
Expand Down Expand Up @@ -800,8 +800,7 @@ func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool
failedReason string
)
// extract DfError for grpc status
err = dferrors.ConvertGRPCErrorToDfError(err)
de, ok := err.(*dferrors.DfError)
de, ok := dferrors.IsGRPCDfError(err)
if ok {
switch de.Code {
case commonv1.Code_SchedNeedBackSource:
Expand Down
6 changes: 6 additions & 0 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,13 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
URLMeta: urlMeta,
PeerID: ts.peerID,
})

assert.True(ok, "reuse stream task")
assert.NotNil(rc, "reuse stream task")
if rc == nil {
return
}

defer func() {
assert.Nil(rc.Close())
}()
Expand Down
15 changes: 12 additions & 3 deletions client/daemon/peer/peertask_piecetask_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/net/ip"
Expand Down Expand Up @@ -406,7 +407,6 @@ func (s *pieceTaskSynchronizer) receive() {
s.Errorf("synchronizer receives with error: %s", err)
s.error.Store(&pieceTaskSynchronizerError{err})
s.reportError(err)
s.Errorf("synchronizer receives with error: %s", err)
}
}

Expand All @@ -430,7 +430,16 @@ func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) erro

func (s *pieceTaskSynchronizer) reportError(err error) {
s.span.RecordError(err)
sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, commonv1.Code_ClientPieceRequestFail))
errCode := commonv1.Code_ClientPieceRequestFail

// extract DfError for grpc status
de, ok := dferrors.IsGRPCDfError(err)
if ok {
errCode = de.Code
s.Errorf("report error with convert code from grpc error, code: %d, message: %s", de.Code, de.Message)
}

sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, errCode))
if sendError != nil {
s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
Expand All @@ -440,7 +449,7 @@ func (s *pieceTaskSynchronizer) reportError(err error) {
}

func (s *pieceTaskSynchronizer) canceled(err error) bool {
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
return true
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.2
gorm.io/driver/postgres v1.5.4
gorm.io/driver/postgres v1.5.2
gorm.io/gorm v1.25.5
gorm.io/plugin/soft_delete v1.2.1
k8s.io/component-base v0.28.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1767,8 +1767,8 @@ gorm.io/driver/mysql v1.3.2/go.mod h1:ChK6AHbHgDCFZyJp0F+BmVGb06PSIoh9uVYKAlRbb2
gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
gorm.io/driver/postgres v1.2.2/go.mod h1:Ik3tK+a3FMp8ORZl29v4b3M0RsgXsaeMXh9s9eVMXco=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0=
gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8=
gorm.io/driver/sqlite v1.1.3 h1:BYfdVuZB5He/u9dt4qDpZqiqDJ6KhPqs5QUqsr/Eeuc=
gorm.io/driver/sqlite v1.1.3/go.mod h1:AKDgRWk8lcSQSw+9kxCJnX/yySj8G3rdwYlU57cB45c=
gorm.io/driver/sqlserver v1.2.1/go.mod h1:nixq0OB3iLXZDiPv6JSOjWuPgpyaRpOIIevYtA4Ulb4=
Expand Down
20 changes: 20 additions & 0 deletions internal/dferrors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ func ConvertGRPCErrorToDfError(err error) error {
return err
}

func IsGRPCDfError(err error) (*DfError, bool) {
for _, d := range status.Convert(err).Details() {
switch internal := d.(type) {
case *commonv1.GrpcDfError:
return &DfError{
Code: internal.Code,
Message: internal.Message,
}, true
}
}

var de *DfError
ok := errors.As(err, &de)
if ok {
return de, true
}

return nil, false
}

// ConvertDfErrorToGRPCError converts DfError to grpc error, if it is.
func ConvertDfErrorToGRPCError(err error) error {
if v, ok := err.(*DfError); ok {
Expand Down

0 comments on commit 19b6c8e

Please sign in to comment.