forked from joeledwards/msg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest-cogs.coffee
executable file
·135 lines (108 loc) · 3.5 KB
/
test-cogs.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env coffee
require 'log-a-log'
_ = require 'lodash'
Q = require 'q'
fs = require 'fs'
uuid = require 'uuid'
cogs = require 'cogs-sdk'
moment = require 'moment'
durations = require 'durations'
# Run parameters. messageDelay is the value handed to Q.delay before each send.
# NOTE(review): initialDelay is not referenced anywhere in the visible code.
initialDelay = 2000
messageDelay = 1
messageCount = 100

# Mutable run state shared by the sender and the receiver.
remaining = messageCount   # sends that have not been started yet
sent = received = 0        # published / received message counters
messages = {}              # message id -> message record
counts = {}                # message id -> number of times it was received

# Stopwatch for the whole run; it is started when the subscription opens.
watch = durations.stopwatch()

# Current time as a numeric timestamp (moment's valueOf).
now = ->
  moment().valueOf()
# Summarize all of the message deliveries.
#
# Reads the module-level `messages` map and prints the shortest, longest and
# mean `timing.total`, followed by the stopwatch reading. Only records that
# actually have a finite total are included: a send that failed is merely
# flagged with `error` and never gets one, and counting it would print
# `longest=undefined` and divide the mean by too large a count.
summarize = ->
  totals = _(messages)
    .map((message) -> message.timing?.total)
    .filter((total) -> _.isFinite total)
    .sortBy()
    .value()

  # Nothing was timed: say so instead of dereferencing an empty list.
  if totals.length is 0
    console.log "messages=#{messageCount} no timed deliveries : took #{watch}"
    return

  shortest = _.first totals
  longest = _.last totals
  mean = _.sum(totals) / totals.length

  #_(messages).each (message) -> console.log "#{moment(message.timing.created).toISOString()},#{message.timing.total}"
  console.log "messages=#{messageCount} shortest=#{shortest} longest=#{longest} mean=#{mean} : took #{watch}"
# Connect to Cogs, subscribe to the 'messages' channel, then publish
# messageCount events one at a time and time each one from send to receipt.
# When every send has settled and every published message has been received,
# print the summary and close the socket.
cogs.api.getClient 'cogs.json'
.then (client) ->
  namespace = 'messenger'
  channel =
    channel: 'messages'

  settled = 0       # sends whose sendEvent promise has resolved or rejected
  finished = false  # set once the summary has been printed

  # Fill in the derived intervals from whichever timestamps exist so far.
  # A message can arrive on the socket before its sendEvent promise settles,
  # so this is called from both the publish path and the receive path.
  updateTiming = (timing) ->
    if timing.published?
      timing.delivery = timing.published - timing.sent
    if timing.received?
      timing.total = timing.received - timing.sent
    if timing.published? and timing.received?
      timing.broker = timing.received - timing.published

  # Summarize and close exactly once, and only when all sends have settled.
  # `remaining` cannot be used for this: it drops to zero when the last send
  # is started, i.e. while that message is still in flight.
  finishIfDone = ->
    return if finished
    return unless settled >= messageCount and received >= sent
    finished = true
    summarize()
    ws.close()

  # Recursive message delivery function: waits messageDelay, publishes one
  # message, then calls itself again until `remaining` reaches zero.
  sendMsg = (client) ->
    return unless remaining > 0
    remaining -= 1

    Q.delay messageDelay
    .then ->
      msg =
        id: uuid.v1()
        timing:
          created: now()

      console.log "Sending message #{msg.id}..."
      messages[msg.id] = msg
      counts[msg.id] = 0

      # Stamp the send time before serializing so it is part of the payload.
      msg.timing.sent = now()
      eventName = "message-#{msg.id}"
      attributes =
        channel: 'messages'
        message: JSON.stringify(msg)

      client.sendEvent namespace, eventName, attributes
      .then (result) ->
        sent += 1
        msg.timing.published = now()
        updateTiming msg.timing
        pubTime = msg.timing.published - msg.timing.sent
        console.log "Message ##{sent} '#{msg.id}' delivered in #{pubTime} ms : #{result}"
      .catch (error) ->
        console.error "Error sending message: #{error}\n#{error?.stack}"
        msg.error = true
      .then ->
        # Runs after success and after failure alike, so each send is counted
        # as settled exactly once before the next one is started.
        settled += 1
        finishIfDone()
        sendMsg client

  # Subscribe to the channel
  ws = client.subscribe namespace, channel

  ws.once 'open', ->
    console.log 'Subscribed to the messages channel.'
    watch.start()
    sendMsg client

  ws.on 'message', (message) ->
    ts = now()

    # A frame that is not the expected JSON envelope must not throw out of
    # the event handler.
    try
      {data} = JSON.parse message
      {id, timing} = JSON.parse data
    catch error
      console.error "Ignoring unparseable message: #{error}"
      return

    msg = messages[id]
    console.log "data:", data
    console.log "message-id:", id
    console.log "message-timing:", timing
    console.log "msg:", msg

    # No record for this id: there is nothing to time, so note it and stop
    # here rather than dereferencing an undefined record.
    unless msg?
      console.log "UNKNOWN ===============================> #{id}"
      counts[id] = (counts[id] ? 0) + 1
      return

    msg.timing ?= {}

    seenBefore = counts[id] ? 0
    counts[id] = seenBefore + 1

    # Only the first delivery is timed and counted; a repeat must neither
    # overwrite the receive timestamp nor push `received` past `sent`.
    if seenBefore > 0
      console.log " DUPLICATE ===========================> #{id}"
      return

    msg.timing.received = ts
    received += 1
    updateTiming msg.timing

    console.log "Message '#{id}' (#{received} of #{sent}) received in #{msg.timing.total} ms (Delivery #{msg.timing.delivery}, Cogs #{msg.timing.broker} ms): #{message}"

    finishIfDone()

  ws.on 'error', (error) ->
    console.error "Cogs WebSocket error #{error}\n#{error?.stack}"

  ws.on 'ack', (messageId) ->
    console.log "Message #{messageId} acknowledged."

  ws.once 'close', ->
    console.log "Cogs WebSocket closed."
.catch (error) ->
  # Without this a failure to load the config or to subscribe is swallowed.
  console.error "Unable to run the Cogs messaging test: #{error}\n#{error?.stack}"