-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_codec.go
120 lines (112 loc) · 3.04 KB
/
message_codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package diskq
import (
"fmt"
"io"
"time"
)
// Encode serializes m to wr as a sequence of five writes.
//
// All integers are little-endian. The frame is:
//
//	4 bytes  partition key length (uint32)
//	N bytes  partition key
//	8 bytes  timestamp, m.TimestampUTC.UnixNano() (int64)
//	8 bytes  data length (uint64)
//	M bytes  data
//
// The first write error is returned as-is and nothing further is written.
func Encode(m Message, wr io.Writer) (err error) {
	// le renders the low `width` bytes of v, least significant byte first.
	le := func(v uint64, width int) []byte {
		out := make([]byte, width)
		for i := range out {
			out[i] = byte(v >> (8 * uint(i)))
		}
		return out
	}

	key := []byte(m.PartitionKey)

	// The fields in wire order; each one is handed to wr in its own Write.
	fields := [...][]byte{
		le(uint64(uint32(len(key))), 4),
		key,
		le(uint64(m.TimestampUTC.UnixNano()), 8),
		le(uint64(len(m.Data)), 8),
		m.Data,
	}
	for _, field := range fields {
		if _, err = wr.Write(field); err != nil {
			return err
		}
	}
	return nil
}
// Decode reads a single message from r into m.
//
// The expected wire layout, with all integers little-endian, is:
//
//	4 bytes  partition key length (uint32)
//	N bytes  partition key
//	8 bytes  timestamp as nanoseconds since the Unix epoch (int64)
//	8 bytes  data length (uint64)
//	M bytes  data
//
// Every field is read with io.ReadFull, because a single io.Reader.Read call
// is allowed to return fewer bytes than requested without reporting an error.
// Zero-length fields are therefore also accepted at the very end of a stream.
//
// Errors are wrapped with %w and name the field being read. The wrapped error
// is io.EOF when the stream ends exactly on a field boundary (including before
// the first byte) and io.ErrUnexpectedEOF when it ends part-way through a
// field. On error m may already have been partially populated.
//
// NOTE(review): the two length prefixes are taken from the stream as-is and
// are not bounded before allocating; a corrupt length can request a very
// large buffer.
func Decode(m *Message, r io.Reader) (err error) {
	var keySizeBuf [4]byte
	if _, err = io.ReadFull(r, keySizeBuf[:]); err != nil {
		return fmt.Errorf("decode; cannot read partition key size: %w", err)
	}
	keySize := uint32(keySizeBuf[0]) |
		uint32(keySizeBuf[1])<<8 |
		uint32(keySizeBuf[2])<<16 |
		uint32(keySizeBuf[3])<<24

	keyBuf := make([]byte, keySize)
	if _, err = io.ReadFull(r, keyBuf); err != nil {
		return fmt.Errorf("decode; cannot read partition key: %w", err)
	}
	m.PartitionKey = string(keyBuf)

	var timestampBuf [8]byte
	if _, err = io.ReadFull(r, timestampBuf[:]); err != nil {
		return fmt.Errorf("decode; cannot read timestamp: %w", err)
	}
	timestampNanos := int64(timestampBuf[0]) |
		int64(timestampBuf[1])<<8 |
		int64(timestampBuf[2])<<16 |
		int64(timestampBuf[3])<<24 |
		int64(timestampBuf[4])<<32 |
		int64(timestampBuf[5])<<40 |
		int64(timestampBuf[6])<<48 |
		int64(timestampBuf[7])<<56
	m.TimestampUTC = time.Unix(0, timestampNanos).UTC()

	var dataSizeBuf [8]byte
	if _, err = io.ReadFull(r, dataSizeBuf[:]); err != nil {
		return fmt.Errorf("decode; cannot read data size: %w", err)
	}
	dataSize := uint64(dataSizeBuf[0]) |
		uint64(dataSizeBuf[1])<<8 |
		uint64(dataSizeBuf[2])<<16 |
		uint64(dataSizeBuf[3])<<24 |
		uint64(dataSizeBuf[4])<<32 |
		uint64(dataSizeBuf[5])<<40 |
		uint64(dataSizeBuf[6])<<48 |
		uint64(dataSizeBuf[7])<<56

	m.Data = make([]byte, dataSize)
	if _, err = io.ReadFull(r, m.Data); err != nil {
		return fmt.Errorf("decode; cannot read data: %w", err)
	}
	return nil
}