forked from joeledwards/msg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest-msg.coffee
executable file
·128 lines (107 loc) · 3.3 KB
/
test-msg.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env coffee
require 'log-a-log'
_ = require 'lodash'
Q = require 'q'
uuid = require 'uuid'
pubnub = require 'pubnub'
moment = require 'moment'
durations = require 'durations'
WebSocket = require 'ws'
# Current time as a millisecond epoch timestamp; used for every timing field.
now = -> moment().valueOf()

# WebSocket endpoint of the broker under test. Set the MSG_WS_URL environment
# variable (e.g. ws://localhost:8888) to target another broker without editing
# this file; an unset or empty variable falls back to the original address.
wsUrl = process.env.MSG_WS_URL or 'ws://52.39.3.158:8888'
ws = new WebSocket(wsUrl)

initialDelay = 2000   # presumably ms; not referenced by the code visible here
messageDelay = 1      # delay passed to Q.delay before each publish
messageCount = 100    # total number of messages to publish

remaining = messageCount  # messages still to be sent
counts = {}               # message id -> number of times it has been received
messages = {}             # message id -> message record ({id, timing})
sent = 0                  # messages handed to ws.send without an exception
received = 0              # deliveries recorded by the message handler

# Started when the socket opens; printed by summarize.
watch = durations.stopwatch()
# Summarize all of the message deliveries.
#
# Logs the shortest, longest and mean round-trip time (timing.total) over the
# messages that completed a round trip, plus the stopwatch reading. Messages
# without a finite timing.total (a failed send, or nothing received back) are
# left out so they cannot make `longest` undefined or turn the mean into NaN.
summarize = ->
  totals = _.map messages, (message) -> message.timing?.total
  totals = _.filter totals, (total) -> _.isFinite total
  totals = _.sortBy totals

  # Nothing completed a round trip, so there are no statistics to report.
  if totals.length < 1
    console.log "messages=#{messageCount} delivered=0 : took #{watch}"
    return

  shortest = _.first totals
  longest = _.last totals
  mean = _.sum(totals) / totals.length

  console.log "messages=#{messageCount} shortest=#{shortest} longest=#{longest} mean=#{mean} : took #{watch}"
# Message handler.
#
# Each frame is parsed as JSON of the form {channel, message}, where `message`
# is itself JSON text carrying the `id` assigned in sendMsg. The first delivery
# of a known id records its timing; once every message has been sent and each
# sent message has been received, the summary is printed and the socket closed.
ws.on 'message', (json) ->
  try
    {channel, message} = JSON.parse json
    console.log "[#{channel}]> #{message}"
    {id} = JSON.parse message

    msg = messages[id]
    count = counts[id]

    # An id this process never sent has no record to time. Report it and stop
    # here instead of dereferencing an undefined record, which would surface
    # as a misleading "Error parsing message".
    if not msg? or not count?
      console.log "UNKNOWN ===============================> #{id}"
      counts[id] = (count ? 0) + 1
      return

    counts[id] = count + 1

    # A repeat delivery must not count towards `received` or overwrite the
    # timing of the first delivery; otherwise the completion check below can
    # fire while other messages are still outstanding.
    if count > 0
      console.log " DUPLICATE ===========================> #{id}"
      return

    msg.timing ?= {}
    msg.timing.received = now()
    received += 1

    msg.timing.delivery = msg.timing.published - msg.timing.sent
    msg.timing.broker = msg.timing.received - msg.timing.published
    msg.timing.total = msg.timing.received - msg.timing.sent

    console.log "Message '#{id}' (#{received} of #{sent}) received in #{msg.timing.total} ms (Delivery #{msg.timing.delivery} ms, Server #{msg.timing.broker} ms): #{json}"

    # Finished: nothing left to send and every sent message has come back.
    if remaining < 1 and received >= sent
      summarize()
      ws.close()
  catch error
    console.error "Error parsing message: #{error}\n#{error.stack}"
# Recursive message delivery function.
#
# While messages remain, waits `messageDelay`, builds a fresh message record,
# registers it in `messages`/`counts`, publishes it on the 'messages' channel
# and then schedules the next send. A failed send marks the record with
# `error = true` and the loop carries on with the next message.
sendMsg = ->
  return unless remaining > 0
  remaining -= 1

  Q.delay(messageDelay).then ->
    msg = {id: uuid.v1(), timing: {created: now()}}
    console.log "Sending message #{msg.id}..."

    messages[msg.id] = msg
    counts[msg.id] = 0

    # Serialize before stamping `sent`, so the payload carries only `created`.
    payload = JSON.stringify msg
    msg.timing.sent = now()

    try
      envelope = {action: 'publish', channel: 'messages', message: payload}
      ws.send JSON.stringify(envelope)
      sent += 1

      msg.timing.published = now()
      elapsed = msg.timing.published - msg.timing.sent
      console.log "Message ##{sent} '#{msg.id}' delivered in #{elapsed} ms"

      sendMsg()
    catch err
      console.error "Error sending message: #{err}\n#{err.stack}"
      msg.error = true
      sendMsg()
# Once the WebSocket is established: subscribe to the 'messages' channel,
# start the stopwatch and kick off the send loop. A failed subscribe is only
# logged; sending starts regardless.
ws.on 'open', ->
  try
    # Subscribe to the channel
    request = {action: 'subscribe', channel: 'messages'}
    ws.send JSON.stringify(request)
  catch err
    console.error "Error subscribing to channel: #{err}\n#{err.stack}"

  watch.start()
  sendMsg()
# Report when the socket has been closed.
ws.on 'close', -> console.log "Closed."