Skip to content

Commit a69806b

Browse files
authoredJul 28, 2021
Merge pull request #14 from HRX980829/master
Fix cross-language seqId problem
2 parents 59d5b1b + 8ef70f4 commit a69806b

File tree

5 files changed

+100
-1
lines changed

5 files changed

+100
-1
lines changed
 

‎replayer-agent/logic/match/match.go

+6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"github.com/didi/sharingan/replayer-agent/model/protocol"
1414
"github.com/didi/sharingan/replayer-agent/model/recording"
1515
"github.com/didi/sharingan/replayer-agent/model/replaying"
16+
"github.com/didi/sharingan/replayer-agent/utils/fastcgi"
1617
"github.com/didi/sharingan/replayer-agent/utils/helper"
18+
"github.com/didi/sharingan/replayer-agent/utils/protocol/pthrift"
1719
)
1820

1921
var expect100 = []byte("Expect: 100-continue")
@@ -126,6 +128,10 @@ func (m *Matcher) DoMatchOutboundTalk(
126128
m.MaxMatchedIndex = maxScoreIndex
127129
}
128130
m.Visited[maxScoreIndex] = true
131+
132+
if bytes.HasPrefix(session.CallFromInbound.Raw, fastcgi.FastCGIRequestHeader) {
133+
pthrift.ReplaceSequenceId(request, session.CallOutbounds[maxScoreIndex])
134+
}
129135
return maxScoreIndex, mark, session.CallOutbounds[maxScoreIndex]
130136
}
131137

‎replayer-agent/logic/outbound/connection.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ func (cs *ConnState) readRequest(ctx context.Context) ([]byte, error) {
107107

108108
request := pool.GetBuf(81920, true)
109109

110+
// SetReadDeadline sets the deadline for future Read calls
111+
// and any currently-blocked Read call.
112+
// A zero value for t means Read will not time out.
113+
cs.conn.SetReadDeadline(time.Time{})
114+
110115
bytesRead, err := cs.conn.Read(buf)
111116
if err != nil {
112117
return nil, err
@@ -153,9 +158,11 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error {
153158
cs.Handler = loadHandler(ctx, string(cs.traceID))
154159
if cs.Handler == nil {
155160
tlog.Handler.Warnf(ctx, tlog.DebugTag, "errmsg=find Handler failed||request=%s||traceID=%s", quotedRequest, string(cs.traceID))
156-
return errors.New("find Handler failed")
161+
return nil
157162
}
158163

164+
ctx = cs.Handler.Ctx
165+
159166
// 去掉COM_STMT_CLOSE
160167
if request = removeMysqlStmtClose(request); len(request) == 0 {
161168
return nil

‎replayer-agent/utils/protocol/helper/helper.go

+23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package helper
22

33
import (
4+
"bytes"
5+
"encoding/binary"
46
"encoding/json"
57
"fmt"
68
"strconv"
@@ -43,3 +45,24 @@ func convertModelMap2GeneralMap(m model.Map) map[string]interface{} {
4345
}
4446
return result
4547
}
48+
49+
func IntToBytes(n int,b byte) ([]byte,error) {
50+
switch b {
51+
case 1:
52+
tmp := int8(n)
53+
bytesBuffer := bytes.NewBuffer([]byte{})
54+
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
55+
return bytesBuffer.Bytes(),nil
56+
case 2:
57+
tmp := int16(n)
58+
bytesBuffer := bytes.NewBuffer([]byte{})
59+
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
60+
return bytesBuffer.Bytes(),nil
61+
case 3,4:
62+
tmp := int32(n)
63+
bytesBuffer := bytes.NewBuffer([]byte{})
64+
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
65+
return bytesBuffer.Bytes(),nil
66+
}
67+
return nil,fmt.Errorf("IntToBytes b param is invaild")
68+
}

‎replayer-agent/utils/protocol/pthrift/packet.go

+30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"errors"
66

7+
"github.com/didi/sharingan/replayer-agent/model/recording"
8+
"github.com/didi/sharingan/replayer-agent/utils/protocol/helper"
79
"github.com/modern-go/parse"
810
"github.com/modern-go/parse/model"
911
)
@@ -407,3 +409,31 @@ func decodeFieldValueCompact(src *parse.Source, vType CompactKind) (interface{},
407409
return nil, errInvalidFeildType
408410
}
409411
}
412+
413+
func ReplaceSequenceId(request []byte, callOutbound *recording.CallOutbound) {
414+
if len(request) <= 4 {
415+
return
416+
}
417+
thrift, DecErr := DecodeBinary(request[4:])
418+
if DecErr != nil {
419+
return
420+
}
421+
422+
seqId := thrift["sequence_id"]
423+
seqIdBytes ,err := helper.IntToBytes(seqId.(int), 4)
424+
if err != nil {
425+
return
426+
}
427+
428+
name := thrift["name"].(string)
429+
nameBytes := []byte(name)
430+
begin := bytes.LastIndex(callOutbound.Response, nameBytes)
431+
if begin == -1 {
432+
return
433+
}
434+
435+
begin += len(nameBytes)
436+
for i, v := range seqIdBytes {
437+
callOutbound.Response[begin+i] = v
438+
}
439+
}

‎replayer-agent/utils/protocol/pthrift/packet_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pthrift
22

33
import (
44
"bytes"
5+
"reflect"
56
"testing"
67

78
"github.com/modern-go/parse"
@@ -264,3 +265,35 @@ func TestDecodeCompact(t *testing.T) {
264265
should.Equal(tc.expect, actual, "case #%d fail", idx)
265266
}
266267
}
268+
269+
func TestIntToBytes(t *testing.T) {
270+
type args struct {
271+
n int
272+
b byte
273+
}
274+
tests := []struct {
275+
name string
276+
args args
277+
want []byte
278+
wantErr bool
279+
}{
280+
{
281+
name: "1",
282+
args: args{n: 10, b: 4},
283+
want: []byte{0, 0, 0, 0x0a},
284+
wantErr: false,
285+
},
286+
}
287+
for _, tt := range tests {
288+
t.Run(tt.name, func(t *testing.T) {
289+
got, err := IntToBytes(tt.args.n, tt.args.b)
290+
if (err != nil) != tt.wantErr {
291+
t.Errorf("IntToBytes() error = %v, wantErr %v", err, tt.wantErr)
292+
return
293+
}
294+
if !reflect.DeepEqual(got, tt.want) {
295+
t.Errorf("IntToBytes() got = %v, want %v", got, tt.want)
296+
}
297+
})
298+
}
299+
}

0 commit comments

Comments
 (0)
Please sign in to comment.