-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
219 lines (187 loc) · 5.78 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Create and maintain by Chaiyapong Lapliengtrakul (chaiyapong@3dsinteractive.com), All right reserved (2021 - Present)
package main
import (
"encoding/json"
"net/http"
"strings"
"time"
)
func main() {
cfg := NewConfig()
ms := NewMicroservice()
ms.RegisterLivenessProbeEndpoint("/healthz")
serviceID := cfg.ServiceID()
switch serviceID {
case "register-api":
startRegisterAPI(ms, cfg)
case "mail-consumer":
startMailConsumer(ms, cfg)
case "batch-scheduler":
startBatchScheduler(ms, cfg)
case "batch-ptask-api":
startBatchPTaskAPI(ms, cfg)
case "batch-ptask-worker":
startBatchPTaskWorkerNode(ms, cfg)
case "external-api":
start3rdPartyMockAPI(ms, cfg)
}
ms.Start()
}
func startRegisterAPI(ms *Microservice, cfg IConfig) {
ms.AsyncPOST("/api/citizen", cfg.CacheServer(), cfg.MQServers(), func(ctx IContext) error {
// 1. Read Input (Not using it right now, just for example)
input := ctx.ReadInput()
ctx.Log("POST: /api/citizen " + input)
// 2. Generate citizenID and send it to MQ
// The citizen id should be received from client, but for code to be easy to read, we just create it
// To be able to test, instead of call randString() directly
// we inject Random struct to IRandom argument
// citizenID := randString()
rnd := NewRandom()
return onPostClient(ctx, cfg, rnd)
})
}
// This onPostClient accept all arguments as interface, so we can mock and test all logic using unit test
func onPostClient(ctx IContext, cfg IConfig, rnd IRandom) error {
citizenID := rnd.Random()
startWithZero := strings.HasPrefix(citizenID, "0")
if !startWithZero && len(citizenID) > 0 {
citizen := map[string]interface{}{
"citizen_id": citizenID,
}
prod := ctx.Producer(cfg.MQServers())
err := prod.SendMessage(cfg.CitizenRegisteredTopic(), "", citizen)
if err != nil {
ctx.Log(err.Error())
return err
}
status := map[string]interface{}{
"status": "success",
"citizen_id": citizenID,
}
ctx.Response(http.StatusOK, status)
} else {
status := map[string]interface{}{
"status": "failed",
}
ctx.Response(http.StatusOK, status)
}
return nil
}
func startMailConsumer(ms *Microservice, cfg IConfig) {
topic := cfg.CitizenRegisteredTopic()
groupID := "mail-consumer"
timeout := time.Duration(-1)
// 1. Create topic "citizen registered" if not exists
mq := NewMQ(cfg.MQServers(), ms)
mq.CreateTopicR(topic, 5, 1, time.Hour*24*30)
// 2. Start consumer to consume message from "citizen registered" topic
ms.Consume(cfg.MQServers(), topic, groupID, timeout, func(ctx IContext) error {
msg := ctx.ReadInput()
// 3. Parse input to citizen object
citizen := &Citizen{}
err := json.Unmarshal([]byte(msg), &citizen)
if err != nil {
ctx.Log(err.Error())
return err
}
// 4. Call validation API (Response AVG at 1 second, so we set timeout at 5 seconds)
req := ctx.Requester("", 5*time.Second)
validationResStr, err := req.Post(cfg.CitizenValidationAPI(),
map[string]string{"citizen_id": citizen.CitizenID})
if err != nil {
ctx.Log(err.Error())
return err
}
validationRes := map[string]interface{}{}
err = json.Unmarshal([]byte(validationResStr), &validationRes)
if err != nil {
ctx.Log(err.Error())
return err
}
isValid, _ := validationRes["status"]
if isValid != "ok" {
// 5. Send Email to citizen to reject register if validation is not OK
// We just log to console, but for the real code, this should send the email
ctx.Log("Mail rejection has sent to " + citizen.CitizenID)
return nil
}
// 6. Send Email to citizen to confirm validation
// We just log to console, but for the real code, this should send the email
ctx.Log("Mail confirmation has sent to " + citizen.CitizenID)
// 7. Produce message to topic "citizen confirmed"
prod := ctx.Producer(cfg.MQServers())
err = prod.SendMessage(cfg.CitizenConfirmedTopic(), "", citizen)
if err != nil {
ctx.Log(err.Error())
return err
}
return nil
})
}
func startBatchScheduler(ms *Microservice, cfg IConfig) {
ms.Schedule(time.Hour, func(ctx IContext) error {
// 1. Batch Scheduler will run during 00.00 - 00.59
nowH := ctx.Now().Hour()
if nowH != 0 {
return nil
}
// 2. Will start PTask to execute all workers
// This run only 1 time a day, to make sure it will run, use 30 secs timeout
rqt := ctx.Requester("", 30*time.Second)
res, err := rqt.Post(cfg.BatchDeliverAPI(),
map[string]string{"task_id": "batch_deliver", "worker_count": "5"})
if err != nil {
ctx.Log(err.Error())
return err
}
ctx.Log(res)
return nil
})
}
func startBatchPTaskAPI(ms *Microservice, cfg IConfig) {
ms.PTaskEndpoint("/ptask/delivery", cfg.CacheServer(), cfg.MQServers())
}
func startBatchPTaskWorkerNode(ms *Microservice, cfg IConfig) {
ms.PTaskWorkerNode("/ptask/delivery",
cfg.CacheServer(),
cfg.MQServers(),
func(ctx IContext) error {
newMS := NewMicroservice()
newMS.ConsumeBatch(
cfg.MQServers(),
cfg.CitizenConfirmedTopic(),
"deliver-consumer",
5*time.Minute, // Read Timeout
5, // Batch Size
5*time.Second, // Batch Timeout
func(newCtx IContext) error {
inputs := newCtx.ReadInputs()
for _, input := range inputs {
newCtx.Log("Deliver to " + input)
}
return nil
})
newMS.Start()
ctx.Response(http.StatusOK, map[string]interface{}{"status": "success"})
return nil
})
}
func start3rdPartyMockAPI(ms *Microservice, cfg IConfig) {
ms.POST("/3rd-party/validate", func(ctx IContext) error {
time.Sleep(1 * time.Second)
status := map[string]interface{}{
"status": "ok",
}
ctx.Response(http.StatusOK, status)
return nil
})
ms.POST("/3rd-party/delivery", func(ctx IContext) error {
time.Sleep(1 * time.Second)
status := map[string]interface{}{
"status": "ok",
}
ctx.Response(http.StatusOK, status)
return nil
})
}