Merge pull request #194 from mspiegel/rabbitmq-overview-api
Rabbitmq emit overview api metrics
jbuchbinder committed Apr 21, 2015
2 parents 17bfb6c + 8f157b3 commit e8ba506
Showing 1 changed file with 105 additions and 42 deletions.
rabbit/python_modules/rabbitmq.py (147 changes: 105 additions & 42 deletions)
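For context before the diff: the new metrics come from the RabbitMQ management plugin's /api/overview endpoint, which the module now polls alongside /api/nodes, /api/queues and /api/exchanges. A minimal Python 2 sketch of that request, mirroring the module's urllib2 usage; the host, port and guest/guest credentials are placeholder assumptions, not values taken from this commit:

import json
import urllib2

# Assumed connection details; the real module reads these from its params.
host, port = 'localhost', 15672
username, password = 'guest', 'guest'

base_url = 'http://%s:%s/api' % (host, port)
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, base_url, username, password)
opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(password_mgr))
urllib2.install_opener(opener)

# /api/overview is cluster-wide, unlike the vhost-scoped /api/queues/<vhost>.
overview = json.load(urllib2.urlopen(base_url + '/overview'))
print(overview['object_totals']['queues'])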
@@ -14,7 +14,7 @@

from string import Template

global url, descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath
global descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath

log = None

@@ -28,7 +28,7 @@
keyToPath = {}
last_update = None
#last_update = {}
compiled_results = {"nodes": None, "queues": None, "connections": None, "exchanges": None}
compiled_results = {"nodes": None, "queues": None, "connections": None, "exchanges": None, "overview": None}
#Make initial stat test time dict
#for stat_type in ('queues', 'connections','exchanges', 'nodes'):
# last_update[stat_type] = None
@@ -77,12 +77,10 @@
keyToPath['rmq_proc_used'] = "%s{0}proc_used".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_sockets_used'] = "%s{0}sockets_used".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_mem_alarm'] = "%s{0}mem_alarm".format(JSON_PATH_SEPARATOR) # Boolean
keyToPath['rmq_mem_binary'] = "%s{0}mem_binary".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_mem_code'] = "%s{0}mem_code".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_mem_proc_used'] = "%s{0}mem_proc_used".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_running'] = "%s{0}running".format(JSON_PATH_SEPARATOR) # Boolean

NODE_METRICS = ['rmq_disk_free', 'rmq_mem_used', 'rmq_disk_free_alarm', 'rmq_running', 'rmq_proc_used', 'rmq_mem_proc_used', 'rmq_fd_used', 'rmq_mem_alarm', 'rmq_mem_code', 'rmq_mem_binary', 'rmq_sockets_used']
NODE_METRICS = ['rmq_disk_free', 'rmq_mem_used', 'rmq_disk_free_alarm', 'rmq_running', 'rmq_proc_used',
                'rmq_fd_used', 'rmq_mem_alarm', 'rmq_sockets_used']

# EXCHANGE METRICS #

@@ -91,6 +89,26 @@

EXCHANGE_METRICS = ['rmq_exchange_publish_in_rate', 'rmq_exchange_publish_out_rate']

keyToPath['rmq_overview_message_stats_publish'] = "message_stats{0}publish".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_message_stats_ack'] = "message_stats{0}ack".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_message_stats_deliver_get'] = "message_stats{0}deliver_get".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_message_stats_deliver'] = "message_stats{0}deliver".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_message_stats_deliver_no_ack'] = "message_stats{0}deliver_no_ack".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_queue_totals_messages'] = "queue_totals{0}messages".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_queue_totals_messages_ready'] = "queue_totals{0}messages_ready".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_queue_totals_messages_unacknowledged'] = "queue_totals{0}messages_unacknowledged".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_object_totals_consumers'] = "object_totals{0}consumers".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_object_totals_queues'] = "object_totals{0}queues".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_object_totals_exchanges'] = "object_totals{0}exchanges".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_object_totals_connections'] = "object_totals{0}connections".format(JSON_PATH_SEPARATOR)
keyToPath['rmq_overview_object_totals_channels'] = "object_totals{0}channels".format(JSON_PATH_SEPARATOR)

OVERVIEW_METRICS = ['rmq_overview_message_stats_publish', 'rmq_overview_message_stats_ack', 'rmq_overview_message_stats_deliver_get',
                    'rmq_overview_message_stats_deliver', 'rmq_overview_message_stats_deliver_no_ack',
                    'rmq_overview_queue_totals_messages', 'rmq_overview_queue_totals_messages_ready',
                    'rmq_overview_queue_totals_messages_unacknowledged', 'rmq_overview_object_totals_consumers',
                    'rmq_overview_object_totals_queues', 'rmq_overview_object_totals_exchanges',
                    'rmq_overview_object_totals_connections', 'rmq_overview_object_totals_channels']
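To make the new paths concrete: /api/overview returns a single nested JSON document, and each keyToPath entry above records where one metric lives inside it. A small illustration with a made-up, heavily trimmed payload and a stand-in separator (the module's real JSON_PATH_SEPARATOR is defined earlier in the file, outside this diff):

# Hypothetical payload; real responses carry many more fields.
sample_overview = {
    "message_stats": {"publish": 1200, "ack": 1150, "deliver_get": 1180,
                      "deliver": 900, "deliver_no_ack": 280},
    "queue_totals": {"messages": 42, "messages_ready": 40, "messages_unacknowledged": 2},
    "object_totals": {"consumers": 5, "queues": 7, "exchanges": 12,
                      "connections": 3, "channels": 6},
}

SEP = "?"  # stand-in for JSON_PATH_SEPARATOR, whatever value the module uses

def dig(obj, path):
    # Same idea as dig_it_up(): walk the nested dict along a separator-delimited path.
    for key in path.split(SEP):
        obj = obj[key]
    return obj

print(dig(sample_overview, "queue_totals" + SEP + "messages"))  # 42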

def dig_it_up(obj, path):
    try:
@@ -103,9 +121,8 @@ def dig_it_up(obj, path):
        return False


def refreshStats(stats=('nodes', 'queues'), vhosts=['/']):
    global url_template
    global last_update, url, compiled_results
def refreshStats(stats=('nodes', 'queues', 'overview'), vhosts=['/']):
    global url_template, last_update

    now = time.time()

@@ -119,10 +136,13 @@ def refreshStats(stats=('nodes', 'queues'), vhosts=['/']):
        last_update = now
        for stat in stats:
            for vhost in vhosts:
                if stat in ('nodes'):
                if stat in ('nodes', 'overview'):
                    vhost = '/'
                result_dict = {}
                urlstring = url_template.safe_substitute(stats=stat, vhost=vhost)
                if stat == 'overview':
                    urlstring = overview_url
                else:
                    urlstring = url_template.safe_substitute(stats=stat, vhost=vhost)
                log.debug('urlstring: %s' % urlstring)
                result = json.load(urllib2.urlopen(urlstring))
                # Rearrange results so entry is held in a dict keyed by name - queue name, host name, etc.
@@ -131,23 +151,16 @@ def refreshStats(stats=('nodes', 'queues'), vhosts=['/']):
                        name = entry['name']
                        result_dict[name] = entry
                    compiled_results[(stat, vhost)] = result_dict
                elif stat in ("overview"):
                    compiled_results[(stat, vhost)] = result

    return compiled_results
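Worth noting about the branch above: compiled_results is keyed by (stat, vhost) tuples. For queues, nodes and exchanges the fetched list is re-keyed by each entry's name, but the overview response is one document with no per-entry name, so it is stored as-is. A sketch of what the cache might hold after a refresh, with purely invented names and numbers:

# Illustrative shape only; not real data.
compiled_results = {
    ('queues', '/'): {'task_queue': {'name': 'task_queue', 'messages': 10}},
    ('nodes', '/'): {'rabbit@node1': {'name': 'rabbit@node1', 'fd_used': 64}},
    ('overview', '/'): {'queue_totals': {'messages': 42}},
}

# Per-queue and per-node stats are looked up by entry name; overview stats are read
# straight off the stored document.
print(compiled_results[('queues', '/')]['task_queue']['messages'])
print(compiled_results[('overview', '/')]['queue_totals']['messages'])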


def validatedResult(value):
    if not isInstance(value, bool):
        return float(value)
    else:
        return None


def list_queues(vhost):
    global compiled_results
    queues = compiled_results[('queues', vhost)].keys()
    return queues


def list_nodes():
    global compiled_results
    nodes = compiled_results[('nodes', '/')].keys()
@@ -159,7 +172,6 @@ def list_exchanges(vhost):
    exchanges = compiled_results[('exchanges', vhost)].keys()
    return exchanges


def getQueueStat(name):
    refreshStats(stats=STATS, vhosts=vhosts)
    # Split a name like "rmq_backing_queue_ack_egress_rate.access"
@@ -178,12 +190,6 @@ def getQueueStat(name):
    if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[queue_name].keys():
        value = 0

    # Convert Booleans
    if value is True:
        value = 1
    elif value is False:
        value = 0

    return float(value)


@@ -197,14 +203,41 @@ def getNodeStat(name):
    value = dig_it_up(result, keyToPath[stat_name] % node_name)

    log.debug('name: %r value: %r' % (name, value))
    # Convert Booleans
    if value is True:
        value = 1
    elif value is False:
        value = 0

    return float(value)

def getNodeSumStat(name):
    refreshStats(stats=STATS, vhosts=vhosts)
    # Split a name like "rmq_backing_queue_ack_egress_rate.access"
    stat_name, dummyName, vhost = name.split(METRIC_TOKEN_SEPARATOR)
    vhost = vhost.replace('-', '/') # decoding vhost from metric name

    result = compiled_results[('nodes', '/')]
    total = 0.0
    for node_name in list_nodes():
        value = dig_it_up(result, keyToPath[stat_name] % node_name)
        total += value

    log.debug('name: %r value: %r' % (name, total))

    return total
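getNodeSumStat() backs the new per-metric 'total' descriptors added in buildNodeDescriptors() further down: it sums one node-level metric across every node returned by list_nodes(), giving a cluster-wide value. A minimal sketch of that aggregation, with the keyToPath/dig_it_up lookup replaced by a plain dict access and made-up numbers; the '.' in the example name assumes METRIC_TOKEN_SEPARATOR is a dot, as the docstring examples suggest:

nodes = {
    'rabbit@node1': {'fd_used': 64},
    'rabbit@node2': {'fd_used': 80},
}

total = 0.0
for node_name in nodes:
    total += nodes[node_name]['fd_used']

print(total)  # 144.0, reported under a descriptor name like "rmq_fd_used.total.-"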

def getOverviewStat(name):
    refreshStats(stats=STATS, vhosts=vhosts)
    # Split a name like "rmq_backing_queue_ack_egress_rate.access"

    # handle queue names with . in them

    log.debug(name)
    stat_name, vhost = name.split(METRIC_TOKEN_SEPARATOR)

    vhost = vhost.replace('-', '/') # decoding vhost from metric name
    # Run refreshStats to get the result object
    result = compiled_results[('overview', vhost)]

    value = dig_it_up(result, keyToPath[stat_name])

    return float(value)
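The overview callback uses the same two-part naming scheme as the other callbacks, just without an object name in the middle: the metric key, then the encoded vhost. A quick walk-through of one lookup with placeholder data, again assuming METRIC_TOKEN_SEPARATOR is '.':

name = 'rmq_overview_queue_totals_messages.-'   # as built by buildOverviewDescriptors() for vhost '/'
stat_name, vhost = name.split('.')
vhost = vhost.replace('-', '/')                 # '-' encodes '/' in metric names

overview = {'queue_totals': {'messages': 42}}   # stand-in for compiled_results[('overview', vhost)]
value = overview['queue_totals']['messages']    # what dig_it_up() resolves via keyToPath[stat_name]
print(float(value))                             # 42.0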

def getExchangeStat(name):
    refreshStats(stats=STATS, vhosts=vhosts)
@@ -224,12 +257,6 @@ def getExchangeStat(name):
    if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[exchange_name].keys():
        value = 0

    # Convert Booleans
    if value is True:
        value = 1
    elif value is False:
        value = 0

    return float(value)


@@ -254,7 +281,7 @@ def str2bool(string):

def metric_init(params):
    ''' Create the metric definition object '''
    global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results, STATS, vhosts, zero_rates_when_idle
    global descriptors, stats, vhost, username, password, urlstring, url_template, overview_url, compiled_results, STATS, vhosts, zero_rates_when_idle
    if log is None:
        setup_logging('syslog', params['syslog_facility'], params['log_level'])
    log.info('received the following params: %r' % params)
@@ -274,6 +301,7 @@ def metric_init(params):
    zero_rates_when_idle = str2bool(params['zero_rates_when_idle'])

    url = 'http://%s:%s/api/$stats/$vhost' % (host, port)
    overview_url = 'http://%s:%s/api/overview' % (host, port)
    base_url = 'http://%s:%s/api' % (host, port)
    password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, base_url, username, password)
@@ -326,6 +354,18 @@ def buildQueueDescriptors():

    def buildNodeDescriptors():
        for metric in NODE_METRICS:
            name = "{1}{0}total{0}-".format(METRIC_TOKEN_SEPARATOR, metric)
            log.debug(name)
            d2 = create_desc({'name': name.encode('ascii', 'ignore'),
                              'call_back': getNodeSumStat,
                              'value_type': 'float',
                              'units': 'N',
                              'slope': 'both',
                              'format': '%f',
                              'description': 'Node_Metric',
                              'groups': 'rabbitmq,node'})
            log.debug(d2)
            descriptors.append(d2)
            for node in list_nodes():
                name = "{1}{0}{2}{0}-".format(METRIC_TOKEN_SEPARATOR, metric, node)
                log.debug(name)
@@ -357,12 +397,29 @@ def buildExchangeDescriptors():
                log.debug(d1)
                descriptors.append(d1)

    def buildOverviewDescriptors():
        for vhost, metric in product(vhosts, OVERVIEW_METRICS):
            name = "{1}{0}{2}".format(METRIC_TOKEN_SEPARATOR, metric, vhost.replace('/', '-'))
            log.debug(name)
            d1 = create_desc({'name': name.encode('ascii', 'ignore'),
                              'call_back': getOverviewStat,
                              'value_type': 'float',
                              'units': 'N',
                              'slope': 'both',
                              'format': '%f',
                              'description': 'Overview_Metric',
                              'groups': 'rabbitmq,overview'})
            log.debug(d1)
            descriptors.append(d1)

    if 'queues' in STATS:
        buildQueueDescriptors()
    if 'nodes' in STATS:
        buildNodeDescriptors()
    if 'exchanges' in STATS:
        buildExchangeDescriptors()
    if 'overview' in STATS:
        buildOverviewDescriptors()
    # buildTestNodeStat()

    return descriptors
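One observation on buildOverviewDescriptors() above: it emits one descriptor per (vhost, metric) pair via itertools.product, even though /api/overview itself is cluster-wide rather than vhost-scoped. The generated names look roughly like this, with hypothetical inputs and a '.' token separator assumed:

from itertools import product

vhosts = ['/', 'staging']   # placeholder vhosts; the module takes these from its params
metrics = ['rmq_overview_queue_totals_messages', 'rmq_overview_object_totals_channels']

for vhost, metric in product(vhosts, metrics):
    print('%s.%s' % (metric, vhost.replace('/', '-')))
# rmq_overview_queue_totals_messages.-
# rmq_overview_object_totals_channels.-
# rmq_overview_queue_totals_messages.staging
# rmq_overview_object_totals_channels.staging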
@@ -397,15 +454,21 @@ def setup_logging(handlers, facility, level):

def parse_args(argv):
    parser = optparse.OptionParser()
    parser.add_option('--username',
                      action='store', dest='username', default='username',
                      help='')
    parser.add_option('--password',
                      action='store', dest='password', default='password',
                      help='')
    parser.add_option('--admin-host',
                      action='store', dest='admin_host', default='localhost',
                      help='')
    parser.add_option('--admin-port',
                      action='store', dest='admin_port', default=15672,
                      help='')
    parser.add_option('--stats',
                      action='store', dest='stats', default='nodes,queues,exchanges',
                      help='csv of which stats to emit, choices: nodes, queues')
                      action='store', dest='stats', default='nodes,queues,exchanges,overview',
                      help='csv of which stats to emit, choices: nodes, queues, exchanges, overview')
    parser.add_option('--vhosts',
                      action='store', dest='vhosts', default='/',
                      help='csv of vhosts')
Expand All @@ -431,7 +494,7 @@ def main(argv):
(opts, args) = parse_args(argv)
setup_logging(opts.log, opts.log_facility, opts.log_level)
# in config files we use '/' in vhosts names but we should convert '/' to '-' when calculating a metric
parameters = {"vhost": "/", "username": "guest", "password": "guest", "metric_group": "rabbitmq",
parameters = {"vhost": "/", "username": opts.username, "password": opts.password, "metric_group": "rabbitmq",
"zero_rates_when_idle": "yes",
"host": opts.admin_host, "port": opts.admin_port,
"stats": opts.stats.split(','),
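Finally, the option and parameter changes at the bottom mean the standalone test harness exercises the new collector as well. A plausible way to drive it from Python; treat this as a sketch, since main() and the remaining parse_args options (the log-related flags) sit outside the visible part of this diff, and it assumes the file is importable as rabbitmq with a broker answering on localhost:15672:

from rabbitmq import main

main(['--admin-host', 'localhost', '--admin-port', '15672',
      '--username', 'guest', '--password', 'guest',
      '--stats', 'nodes,queues,exchanges,overview',
      '--vhosts', '/'])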
