-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathconnection.go
140 lines (120 loc) · 2.52 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package tarantool
import (
	"bytes"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"

	"github.com/vmihailenco/msgpack"
)
// Connection is a client connection to the server dialed by Connect.
//
// Field order is significant: Connect builds the value with a positional
// composite literal.
type Connection struct {
	// connection is the underlying network connection.
	connection net.Conn
	// mutex is held by reader while it removes entries from requests.
	mutex *sync.Mutex
	// requestId is the request counter advanced by nextRequestId.
	requestId uint32
	// Greeting holds the server greeting captured by handShake.
	Greeting *Greeting
	// requests maps a request ID to the channel on which reader delivers
	// the response carrying that ID.
	requests map[uint32]chan *Response
	// packets feeds byte slices to the writer goroutine, which sends each
	// one over connection.
	packets chan []byte
}
// Greeting holds the two halves of the 128-byte greeting that handShake
// reads from the server right after the connection is established.
type Greeting struct {
	// version is the first 64 bytes of the greeting.
	version string
	// auth is the remaining 64 bytes of the greeting.
	// NOTE(review): presumably the authentication salt - confirm against
	// the protocol documentation.
	auth string
}
// Connect dials addr over TCP, reads the server greeting and, once the
// handshake has succeeded, starts the background writer and reader
// goroutines.
//
// Progress messages are printed to standard output.
//
// It returns the ready-to-use connection, or a nil connection and the error
// from dialing or from the handshake. When the handshake fails the socket is
// closed and no goroutines are started.
func Connect(addr string) (conn *Connection, err error) {
	fmt.Printf("Connecting to %s ...\n", addr)
	sock, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	if tcp, ok := sock.(*net.TCPConn); ok {
		// Best effort: a failure to set the option does not make the
		// connection unusable, so the error is deliberately ignored.
		_ = tcp.SetNoDelay(true)
	}
	fmt.Println("Connected ...")

	conn = &Connection{
		connection: sock,
		mutex:      &sync.Mutex{},
		requestId:  0,
		Greeting:   &Greeting{},
		requests:   make(map[uint32]chan *Response),
		packets:    make(chan []byte),
	}
	if err = conn.handShake(); err != nil {
		// Do not leak the socket or start I/O loops on a connection that
		// never completed its handshake.
		sock.Close()
		return nil, err
	}

	go conn.writer()
	go conn.reader()
	return conn, nil
}
// handShake reads the 128-byte greeting from the connection and stores its
// first 64 bytes in conn.Greeting.version and the remaining 64 bytes in
// conn.Greeting.auth. Progress is printed to standard output.
//
// It returns the read error if the full greeting could not be received; in
// that case conn.Greeting is left untouched.
func (conn *Connection) handShake() (err error) {
	const (
		greetingSize = 128
		versionSize  = 64
	)

	fmt.Printf("Greeting ... ")
	greeting := make([]byte, greetingSize)
	// A single Read on a stream may return fewer bytes than requested;
	// io.ReadFull keeps reading until the whole greeting has arrived.
	if _, err = io.ReadFull(conn.connection, greeting); err != nil {
		fmt.Println("Error")
		return err
	}

	conn.Greeting.version = bytes.NewBuffer(greeting[:versionSize]).String()
	conn.Greeting.auth = bytes.NewBuffer(greeting[versionSize:]).String()
	fmt.Println("Success")
	fmt.Println("Version:", conn.Greeting.version)
	return nil
}
// writer is the send loop started by Connect. It takes byte slices off
// conn.packets one at a time and hands each to conn.write.
//
// The loop never returns; an error from conn.write makes it panic.
func (conn *Connection) writer() {
	for {
		if err := conn.write(<-conn.packets); err != nil {
			panic(err)
		}
	}
}
// reader is the receive loop started by Connect. For every packet returned
// by conn.read it builds a Response with NewResponse, removes the entry for
// the response's RequestId from conn.requests and sends the response on the
// channel that was stored there.
//
// A response whose RequestId has no entry in conn.requests is dropped. The
// loop never returns; an error from conn.read makes it panic.
func (conn *Connection) reader() {
	for {
		payload, err := conn.read()
		if err != nil {
			panic(err)
		}
		resp := NewResponse(payload)

		// Look up and delete under the same lock: an unguarded map read
		// that overlaps a write from another goroutine is a data race.
		conn.mutex.Lock()
		respChan, ok := conn.requests[resp.RequestId]
		delete(conn.requests, resp.RequestId)
		conn.mutex.Unlock()

		if !ok {
			// A missing entry yields a nil channel, and sending on a nil
			// channel blocks forever, which would stall every later
			// response.
			continue
		}
		respChan <- resp
	}
}
// write sends data over the underlying connection.
//
// It returns the error reported by the connection's Write, or
// io.ErrShortWrite if Write accepted fewer bytes than len(data) without
// reporting an error. It returns nil only when all of data was written.
func (conn *Connection) write(data []byte) (err error) {
	n, err := conn.connection.Write(data)
	if err != nil {
		// Surface the real I/O error instead of masking it behind a
		// length-mismatch panic.
		return err
	}
	if n != len(data) {
		return io.ErrShortWrite
	}
	return nil
}
// read blocks until one complete packet has been received and returns its
// body.
//
// As consumed here, a packet is a PacketLengthBytes-byte msgpack-encoded
// unsigned integer giving the body length, followed by that many body bytes.
// A zero length yields an empty, non-nil slice.
//
// On any read or decode error the returned slice is nil. A stream that ends
// in the middle of a packet is reported as io.ErrUnexpectedEOF.
func (conn *Connection) read() (response []byte, err error) {
	header := make([]byte, PacketLengthBytes)
	if _, err = io.ReadFull(conn.connection, header); err != nil {
		return nil, err
	}

	var size uint32
	if err = msgpack.Unmarshal(header, &size); err != nil {
		return nil, err
	}

	// io.ReadFull loops over short reads and succeeds whenever the buffer
	// was filled completely, even if the final Read also reported an error.
	response = make([]byte, size)
	if _, err = io.ReadFull(conn.connection, response); err != nil {
		return nil, err
	}
	return response, nil
}
// nextRequestId atomically increments the connection's request counter and
// returns the new value. The counter is a uint32 and wraps around to 0 after
// reaching its maximum.
func (conn *Connection) nextRequestId() (requestId uint32) {
	// Return the result of AddUint32 directly. Assigning it back to the
	// field is a plain, non-atomic write that races with concurrent callers,
	// and re-reading the field afterwards may observe another caller's ID.
	return atomic.AddUint32(&conn.requestId, 1)
}