-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjs8call.go
99 lines (86 loc) · 2.56 KB
/
js8call.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"bufio"
"encoding/json"
"fmt"
"net"
"time"
"github.com/PiotrTopa/js8web/model"
)
func readEventsFromJs8call(events chan<- model.Js8callEvent, disconnected chan<- int, reader *bufio.Reader) {
for {
var event model.Js8callEvent
jsonData, err := reader.ReadBytes('\n')
if err != nil {
logger.Sugar().Warnw("Cannot read from Js8Call",
"error", err,
)
disconnected <- 1
return
}
errJson := json.Unmarshal(jsonData, &event)
if errJson != nil {
logger.Sugar().Warnw("Cannot unmarshal JSON",
"json", jsonData,
"error", errJson,
)
} else {
events <- event
}
}
}
func writeEventsToJs8call(events <-chan model.Js8callEvent, disconnected chan<- int, writer *bufio.Writer) {
for event := range events {
jsonData, err := json.Marshal(event)
if err != nil {
logger.Sugar().Errorw("Cannot marshal JSON for event",
"event", event,
"error", err,
)
continue
}
writer.WriteString(string(jsonData) + "\n")
fmt.Print("Sending: ", string(jsonData)+"\n")
}
}
func attachEventStreamToJs8callConnection(incomingEvents chan<- model.Js8callEvent, outgoingEvents <-chan model.Js8callEvent, conn net.Conn) {
disconnected := make(chan int)
incomingJs8callEvents := make(chan model.Js8callEvent, 1)
outgoingJs8callEvents := make(chan model.Js8callEvent, 1)
defer close(incomingJs8callEvents)
defer close(outgoingJs8callEvents)
defer close(disconnected)
reader := bufio.NewReader(conn)
writer := bufio.NewWriter(conn)
go readEventsFromJs8call(incomingJs8callEvents, disconnected, reader)
go writeEventsToJs8call(outgoingJs8callEvents, disconnected, writer)
for {
select {
case <-disconnected:
return
case event := <-incomingJs8callEvents:
incomingEvents <- event
case event := <-outgoingEvents:
outgoingJs8callEvents <- event
}
}
}
func keepConnectedToJs8call(incomingEvents chan<- model.Js8callEvent, outgoingEvents <-chan model.Js8callEvent) {
for {
conn, err := net.Dial("tcp", JS8CALL_TCP_CONNECTION_STRING)
if err != nil {
logger.Sugar().Warnw("Connection to JS8call failed",
"address", JS8CALL_TCP_CONNECTION_STRING,
"error", err,
)
time.Sleep(time.Second * time.Duration(JS8CALL_TCP_CONNECTION_RETRY_SEC))
continue
}
logger.Sugar().Info("Connected to JS8call")
attachEventStreamToJs8callConnection(incomingEvents, outgoingEvents, conn)
logger.Sugar().Warn("Disconnected from JS8call")
}
}
func initJs8callConnection(incomingEvents chan<- model.Js8callEvent, outgoingEvents <-chan model.Js8callEvent) {
go keepConnectedToJs8call(incomingEvents, outgoingEvents)
}