-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathframe.go
157 lines (126 loc) · 3.47 KB
/
frame.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package nsq
import (
"bufio"
"encoding/binary"
"io"
"strconv"
"github.com/pkg/errors"
)
// FrameType is used to represent the different types of frames that a consumer
// may receive.
type FrameType int
const (
// FrameTypeResponse is the code for frames that carry success responses to
// commands.
FrameTypeResponse FrameType = 0
// FrameTypeError is the code for frames that carry error responses to
// commands.
FrameTypeError FrameType = 1
// FrameTypeMessage is the code for frames that carry messages.
FrameTypeMessage FrameType = 2
)
// String returns a human-readable representation of the frame type.
func (t FrameType) String() string {
switch t {
case FrameTypeResponse:
return "response"
case FrameTypeError:
return "error"
case FrameTypeMessage:
return "message"
default:
return "frame <" + strconv.Itoa(int(t)) + ">"
}
}
// The Frame interface is implemented by types that represent the different
// types of frames that a consumer may receive.
type Frame interface {
// FrameType returns the code representing the frame type.
FrameType() FrameType
// Write serializes the frame to the given buffered output.
Write(*bufio.Writer) error
}
// ReadFrame reads a frame from the buffer input r, returning it or an error if
// something went wrong.
//
// if frame, err := ReadFrame(r); err != nil {
// // handle the error
// ...
// } else {
// switch f := frame.(type) {
// case nsq.Message:
// ...
// }
// }
//
func ReadFrame(r *bufio.Reader) (frame Frame, err error) {
var size int32
var ftype int32
if err = binary.Read(r, binary.BigEndian, &size); err != nil {
err = errors.Wrap(err, "reading frame size")
return
}
if err = binary.Read(r, binary.BigEndian, &ftype); err != nil {
err = errors.Wrap(err, "reading frame type")
return
}
switch size -= 4; FrameType(ftype) {
case FrameTypeResponse:
return readResponse(int(size), r)
case FrameTypeError:
return readError(int(size), r)
case FrameTypeMessage:
return readMessage(int(size), r)
default:
return readUnknownFrame(FrameType(ftype), int(size), r)
}
}
// UnknownFrame is used to represent frames of unknown types for which the
// library has no special implementation.
type UnknownFrame struct {
// Type is the type of the frame.
Type FrameType
// Data contains the raw data of the frame.
Data []byte
}
// FrameType returns the code representing the frame type, satisfies the Frame
// interface.
func (f UnknownFrame) FrameType() FrameType {
return f.Type
}
// Write serializes the frame to the given buffered output, satisfies the Frame
// interface.
func (f UnknownFrame) Write(w *bufio.Writer) (err error) {
if err = writeFrameHeader(w, f.Type, len(f.Data)); err != nil {
err = errors.WithMessage(err, "writing unknown frame")
return
}
if _, err = w.Write(f.Data); err != nil {
err = errors.Wrap(err, "writing unknown frame")
return
}
return
}
func readUnknownFrame(t FrameType, n int, r *bufio.Reader) (f UnknownFrame, err error) {
b := make([]byte, n)
if _, err = io.ReadFull(r, b); err != nil {
err = errors.Wrap(err, "reading unknown frame")
return
}
f = UnknownFrame{
Type: t,
Data: b,
}
return
}
func writeFrameHeader(w *bufio.Writer, ftype FrameType, size int) (err error) {
if err = binary.Write(w, binary.BigEndian, int32(size)+4); err != nil {
err = errors.Wrap(err, "writing frame header")
return
}
if err = binary.Write(w, binary.BigEndian, int32(ftype)); err != nil {
err = errors.Wrap(err, "writing frame header")
return
}
return
}