-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsocket.rb
executable file
·88 lines (62 loc) · 2.24 KB
/
websocket.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env ruby
# encoding: utf-8
require 'rubygems'
require 'bundler'
Bundler.require(:default)
# Declares the optional command-line flags this script accepts. Their values
# are read further down via ARGV.flags.<name>.
module CommandLineFlags
  extend OptiFlagSet

  # Original note kept for reference: (port: 55672) http://localhost:55672/#/
  # NOTE(review): presumably the broker's management UI address — confirm.
  %w[amqphost queue key wsport].each do |flag_name|
    optional_flag flag_name
  end

  and_process!
end
### INITIALIZE GLOBAL OPAD DEPLOYMENT SETTINGS ###
# Resolve the deployment root: keep an already-set $opad_home, otherwise fall
# back to the OPAD_HOME environment variable.
$opad_home ||= ENV['OPAD_HOME']
puts "Starting alerting_queue_reader with $opad_home=#{$opad_home}"

# Without a deployment root the configuration cannot be located. Exit with a
# non-zero status so shells and supervisors see the failure (a bare `exit`
# would report success).
if $opad_home.nil?
  puts "Please set $OPAD_HOME env var!"
  exit 1
end

# NOTE(review): read_config is presumably what populates $config — confirm.
require File.join($opad_home, 'common', 'read_config')

# Command-line flags take precedence; values from $config are the fallback.
host = ARGV.flags.amqphost || $config[:support]['alerting_queue']['host']
exchange_name = ARGV.flags.queue || $config[:support]['alerting_queue']['exchange']
key = ARGV.flags.key || $config[:opad]['aspect_id']
wsport = ARGV.flags.wsport || $config[:support]['websocket']['port']

# Startup banner showing the effective settings.
puts "ANOMALY WEBSOCKET SERVER"
puts "========================"
puts "AMQP Server: #{host}"
puts "Exchange: #{exchange_name}"
puts "Key: #{key}"
puts ""
puts "Websocket Port: #{wsport}"
puts "======================="
# Websocket connections that are currently open. The onopen/onclose handlers
# below add to and remove from this array in place.
SOCKETS = []

# Start the websocket server and the AMQP client in parallel!
# Both are set up inside the same EventMachine.run block.
EventMachine.run do
  # Accept websocket clients on all interfaces and track them in SOCKETS.
  EventMachine::WebSocket.start(:host => '0.0.0.0', :port => wsport) do |ws|
    ws.onopen do
      puts "Client connected to websocket: #{ws.object_id}."
      SOCKETS << ws
    end

    ws.onclose do
      puts "Websocket client #{ws.object_id} disconnected."
      SOCKETS.delete ws
    end
  end

  connection = AMQP.connect(:host => host, :logging => true)
  channel = AMQP::Channel.new(connection)
  exchange = AMQP::Exchange.new(channel, :topic, exchange_name)
  puts "Connecting to AMQP on host #{host}. Running #{AMQP::VERSION} version of the gem. Accessing exchange #{exchange_name}"

  # get a new queue with an autogenerated name (first param = '')
  queue = channel.queue('', :exclusive => true, :auto_delete => true)

  queue.bind(exchange, :routing_key => key).subscribe do |payload|
    # Validation gate: only payloads that parse as JSON are forwarded. The
    # parsed value itself is not needed; clients receive the raw payload.
    parseable = begin
      JSON.parse(payload)
      true
    rescue => e
      puts "error parsing: #{e}"
      false
    end
    next unless parseable

    SOCKETS.each do |client|
      # Rescue per client so that one failing send neither stops delivery to
      # the remaining clients nor gets reported as a parse error.
      begin
        puts "Send to client (#{client.object_id}): #{payload}"
        client.send payload
      rescue => e
        puts "error sending to client (#{client.object_id}): #{e}"
      end
    end
  end
end