forked from joeledwards/msg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest-pubnub.coffee
executable file
·119 lines (100 loc) · 3.03 KB
/
test-pubnub.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env coffee
require 'log-a-log'
_ = require 'lodash'
Q = require 'q'
fs = require 'fs'
uuid = require 'uuid'
pubnub = require 'pubnub'
moment = require 'moment'
durations = require 'durations'
{pubKey, subKey, secret} = JSON.parse(fs.readFileSync('pubnub.json', 'utf-8'))
now = -> moment().valueOf()
config =
ssl: false
publish_key: pubKey
subscribe_key: subKey
nub = pubnub config
initialDelay = 2000
#messageDelay = 1
messageDelay = 0
messageCount = 100
remaining = messageCount + 1
counts = {}
messages = {}
sent = 0
received = 0
watch = durations.stopwatch()
# Summarize all of the message deliveries
summarize = ->
totals = _(messages)
.map ({id, timing: {total}}) ->
id: id
total: total
.sortBy ({total}) -> total
longest = totals.last().total
shortest = totals.first().total
mean = _.sum(totals.map(({total}) -> total).value()) / totals.size()
#_(messages).each (message) -> console.log "#{moment(message.timing.created).toISOString()},#{message.timing.total}"
console.log "messages=#{messageCount} shortest=#{shortest} longest=#{longest} mean=#{mean} : took #{watch}"
# Subscribe to the channel
nub.subscribe
channel: 'messages'
message: (json) ->
ts = now()
{id} = JSON.parse json
msg = messages[id]
if not msg.timing?
msg.timing = {}
count = counts[id]
if not count?
console.log "UNKNOWN ===============================> #{id}"
counts[id] = 1
else
if count > 0
console.log " DUPLICATE ===========================> #{id}"
count += 1
counts[id] = count
msg.timing.received = ts
received += 1
msg.timing.delivery = msg.timing.published - msg.timing.sent
msg.timing.broker = msg.timing.received - msg.timing.published
msg.timing.total = msg.timing.received - msg.timing.sent
console.log "Message '#{id}' (#{received} of #{sent}) received in #{msg.timing.total} ms (Delivery #{msg.timing.delivery} ms, PubNub #{msg.timing.broker} ms): #{json}"
if remaining < 1 and received >= sent
summarize()
nub.unsubscribe
channel: 'messages'
# recursive message delivery function
sendMsg = ->
if remaining > 0
remaining -= 1
Q.delay messageDelay
.then ->
msg =
id: uuid.v1()
timing:
created: now()
console.log "Sending message #{msg.id}..."
messages[msg.id] = msg
counts[msg.id] = 0
json = JSON.stringify msg
msg.timing.sent = now()
nub.publish
channel: 'messages'
message: json
callback: (result) ->
sent += 1
ts = now()
msg.timing.published = ts
pubTime = msg.timing.published - msg.timing.sent
console.log "Message ##{sent} '#{msg.id}' delivered in #{pubTime} ms : #{result}"
sendMsg()
error: (error) ->
console.error "Error sending message: #{error}\n#{error.stack}"
msg.error = true
sendMsg()
# connect after 2 sec. to give the subscription some time to "take"
Q.delay initialDelay
.then ->
watch.start()
sendMsg()