From 8832066290212b7b23f5140f7b7976f81072a248 Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Mon, 20 Apr 2015 14:11:03 -0400 Subject: [PATCH 1/3] Report metrics from /api/overview endpoint. Emit metrics from rabbitmq /api/overview endpoint to ganglia. Additional miscellaneous changes: - deleted unused global variable 'url' - deleted unused function validatedResults(). It called 'isInstance' which is undefined. Probably intention was to invoke 'isinstance'. - added 'username' and 'password' command line options. --- rabbit/python_modules/rabbitmq.py | 103 ++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 21 deletions(-) diff --git a/rabbit/python_modules/rabbitmq.py b/rabbit/python_modules/rabbitmq.py index 56c294f3..f38d8ca2 100644 --- a/rabbit/python_modules/rabbitmq.py +++ b/rabbit/python_modules/rabbitmq.py @@ -14,7 +14,7 @@ from string import Template -global url, descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath +global descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath log = None @@ -28,7 +28,7 @@ keyToPath = {} last_update = None #last_update = {} -compiled_results = {"nodes": None, "queues": None, "connections": None, "exchanges": None} +compiled_results = {"nodes": None, "queues": None, "connections": None, "exchanges": None, "overview": None} #Make initial stat test time dict #for stat_type in ('queues', 'connections','exchanges', 'nodes'): # last_update[stat_type] = None @@ -91,6 +91,26 @@ EXCHANGE_METRICS = ['rmq_exchange_publish_in_rate', 'rmq_exchange_publish_out_rate'] +keyToPath['rmq_overview_message_stats_publish'] = "message_stats{0}publish".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_message_stats_ack'] = "message_stats{0}ack".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_message_stats_deliver_get'] = "message_stats{0}deliver_get".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_message_stats_deliver'] = "message_stats{0}deliver".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_message_stats_deliver_no_ack'] = "message_stats{0}deliver_no_ack".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_queue_totals_messages'] = "queue_totals{0}messages".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_queue_totals_messages_ready'] = "queue_totals{0}messages_ready".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_queue_totals_messages_unacknowledged'] = "queue_totals{0}messages_unacknowledged".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_object_totals_consumers'] = "object_totals{0}consumers".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_object_totals_queues'] = "object_totals{0}queues".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_object_totals_exchanges'] = "object_totals{0}exchanges".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_object_totals_connections'] = "object_totals{0}connections".format(JSON_PATH_SEPARATOR) +keyToPath['rmq_overview_object_totals_channels'] = "object_totals{0}channels".format(JSON_PATH_SEPARATOR) + +OVERVIEW_METRICS = ['rmq_overview_message_stats_publish', 'rmq_overview_message_stats_ack', 'rmq_overview_message_stats_deliver_get', + 'rmq_overview_message_stats_deliver', 'rmq_overview_message_stats_deliver_no_ack', + 'rmq_overview_queue_totals_messages', 'rmq_overview_queue_totals_messages_ready', + 'rmq_overview_queue_totals_messages_unacknowledged', 'rmq_overview_object_totals_consumers', + 'rmq_overview_object_totals_queues', 'rmq_overview_object_totals_exchanges', + 'rmq_overview_object_totals_connections', 'rmq_overview_object_totals_channels'] def dig_it_up(obj, path): try: @@ -103,9 +123,8 @@ def dig_it_up(obj, path): return False -def refreshStats(stats=('nodes', 'queues'), vhosts=['/']): - global url_template - global last_update, url, compiled_results +def refreshStats(stats=('nodes', 'queues', 'overview'), vhosts=['/']): + global url_template, last_update now = time.time() @@ -119,10 +138,13 @@ def refreshStats(stats=('nodes', 'queues'), vhosts=['/']): last_update = now for stat in stats: for vhost in vhosts: - if stat in ('nodes'): + if stat in ('nodes', 'overview'): vhost = '/' result_dict = {} - urlstring = url_template.safe_substitute(stats=stat, vhost=vhost) + if stat == 'overview': + urlstring = overview_url + else: + urlstring = url_template.safe_substitute(stats=stat, vhost=vhost) log.debug('urlspring: %s' % urlstring) result = json.load(urllib2.urlopen(urlstring)) # Rearrange results so entry is held in a dict keyed by name - queue name, host name, etc. @@ -131,23 +153,16 @@ def refreshStats(stats=('nodes', 'queues'), vhosts=['/']): name = entry['name'] result_dict[name] = entry compiled_results[(stat, vhost)] = result_dict + elif stat in ("overview"): + compiled_results[(stat, vhost)] = result return compiled_results - -def validatedResult(value): - if not isInstance(value, bool): - return float(value) - else: - return None - - def list_queues(vhost): global compiled_results queues = compiled_results[('queues', vhost)].keys() return queues - def list_nodes(): global compiled_results nodes = compiled_results[('nodes', '/')].keys() @@ -159,7 +174,6 @@ def list_exchanges(vhost): exchanges = compiled_results[('exchanges', vhost)].keys() return exchanges - def getQueueStat(name): refreshStats(stats=STATS, vhosts=vhosts) # Split a name like "rmq_backing_queue_ack_egress_rate.access" @@ -206,6 +220,29 @@ def getNodeStat(name): return float(value) +def getOverviewStat(name): + refreshStats(stats=STATS, vhosts=vhosts) + # Split a name like "rmq_backing_queue_ack_egress_rate.access" + + # handle queue names with . in them + + log.debug(name) + stat_name, vhost = name.split(METRIC_TOKEN_SEPARATOR) + + vhost = vhost.replace('-', '/') # decoding vhost from metric name + # Run refreshStats to get the result object + result = compiled_results[('overview', vhost)] + + value = dig_it_up(result, keyToPath[stat_name]) + + # Convert Booleans + if value is True: + value = 1 + elif value is False: + value = 0 + + return float(value) + def getExchangeStat(name): refreshStats(stats=STATS, vhosts=vhosts) # Split a name like "rmq_backing_queue_ack_egress_rate.access" @@ -254,7 +291,7 @@ def str2bool(string): def metric_init(params): ''' Create the metric definition object ''' - global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results, STATS, vhosts, zero_rates_when_idle + global descriptors, stats, vhost, username, password, urlstring, url_template, overview_url, compiled_results, STATS, vhosts, zero_rates_when_idle if log is None: setup_logging('syslog', params['syslog_facility'], params['log_level']) log.info('received the following params: %r' % params) @@ -274,6 +311,7 @@ def metric_init(params): zero_rates_when_idle = str2bool(params['zero_rates_when_idle']) url = 'http://%s:%s/api/$stats/$vhost' % (host, port) + overview_url = 'http://%s:%s/api/overview' % (host, port) base_url = 'http://%s:%s/api' % (host, port) password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None, base_url, username, password) @@ -357,12 +395,29 @@ def buildExchangeDescriptors(): log.debug(d1) descriptors.append(d1) + def buildOverviewDescriptors(): + for vhost, metric in product(vhosts, OVERVIEW_METRICS): + name = "{1}{0}{2}".format(METRIC_TOKEN_SEPARATOR, metric, vhost.replace('/', '-')) + log.debug(name) + d1 = create_desc({'name': name.encode('ascii', 'ignore'), + 'call_back': getOverviewStat, + 'value_type': 'float', + 'units': 'N', + 'slope': 'both', + 'format': '%f', + 'description': 'Overview_Metric', + 'groups': 'rabbitmq,overview'}) + log.debug(d1) + descriptors.append(d1) + if 'queues' in STATS: buildQueueDescriptors() if 'nodes' in STATS: buildNodeDescriptors() if 'exchanges' in STATS: buildExchangeDescriptors() + if 'overview' in STATS: + buildOverviewDescriptors() # buildTestNodeStat() return descriptors @@ -397,6 +452,12 @@ def setup_logging(handlers, facility, level): def parse_args(argv): parser = optparse.OptionParser() + parser.add_option('--username', + action='store', dest='username', default='username', + help='') + parser.add_option('--password', + action='store', dest='password', default='password', + help='') parser.add_option('--admin-host', action='store', dest='admin_host', default='localhost', help='') @@ -404,8 +465,8 @@ def parse_args(argv): action='store', dest='admin_port', default=15672, help='') parser.add_option('--stats', - action='store', dest='stats', default='nodes,queues,exchanges', - help='csv of which stats to emit, choies: nodes, queues') + action='store', dest='stats', default='nodes,queues,exchanges,overview', + help='csv of which stats to emit, choies: nodes, queues, exchanges, overview') parser.add_option('--vhosts', action='store', dest='vhosts', default='/', help='csv of vhosts') @@ -431,7 +492,7 @@ def main(argv): (opts, args) = parse_args(argv) setup_logging(opts.log, opts.log_facility, opts.log_level) # in config files we use '/' in vhosts names but we should convert '/' to '-' when calculating a metric - parameters = {"vhost": "/", "username": "guest", "password": "guest", "metric_group": "rabbitmq", + parameters = {"vhost": "/", "username": opts.username, "password": opts.password, "metric_group": "rabbitmq", "zero_rates_when_idle": "yes", "host": opts.admin_host, "port": opts.admin_port, "stats": opts.stats.split(','), From 829c441a8dd3a289f17eba592cd52382e86ae0e1 Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Mon, 20 Apr 2015 15:04:50 -0400 Subject: [PATCH 2/3] Rabbitmq emit summation for node metrics. Some metric systems allow you generate a composite metric. Others like ganglia do not have this functionality. The sum metric is useful for reporting systems that use separation of concerns on the metric name and the host name such that host names do not appear in the metric names. --- rabbit/python_modules/rabbitmq.py | 50 +++++++++++++++++-------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/rabbit/python_modules/rabbitmq.py b/rabbit/python_modules/rabbitmq.py index f38d8ca2..536f04be 100644 --- a/rabbit/python_modules/rabbitmq.py +++ b/rabbit/python_modules/rabbitmq.py @@ -192,12 +192,6 @@ def getQueueStat(name): if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[queue_name].keys(): value = 0 - # Convert Booleans - if value is True: - value = 1 - elif value is False: - value = 0 - return float(value) @@ -211,14 +205,24 @@ def getNodeStat(name): value = dig_it_up(result, keyToPath[stat_name] % node_name) log.debug('name: %r value: %r' % (name, value)) - # Convert Booleans - if value is True: - value = 1 - elif value is False: - value = 0 return float(value) +def getNodeSumStat(name): + refreshStats(stats=STATS, vhosts=vhosts) + # Split a name like "rmq_backing_queue_ack_egress_rate.access" + stat_name, dummyName, vhost = name.split(METRIC_TOKEN_SEPARATOR) + vhost = vhost.replace('-', '/') # decoding vhost from metric name + + result = compiled_results[('nodes', '/')] + total = 0.0 + for node_name in list_nodes(): + value = dig_it_up(result, keyToPath[stat_name] % node_name) + total += value + + log.debug('name: %r value: %r' % (name, total)) + + return total def getOverviewStat(name): refreshStats(stats=STATS, vhosts=vhosts) @@ -235,12 +239,6 @@ def getOverviewStat(name): value = dig_it_up(result, keyToPath[stat_name]) - # Convert Booleans - if value is True: - value = 1 - elif value is False: - value = 0 - return float(value) def getExchangeStat(name): @@ -261,12 +259,6 @@ def getExchangeStat(name): if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[exchange_name].keys(): value = 0 - # Convert Booleans - if value is True: - value = 1 - elif value is False: - value = 0 - return float(value) @@ -364,6 +356,18 @@ def buildQueueDescriptors(): def buildNodeDescriptors(): for metric in NODE_METRICS: + name = "{1}{0}total{0}-".format(METRIC_TOKEN_SEPARATOR, metric) + log.debug(name) + d2 = create_desc({'name': name.encode('ascii', 'ignore'), + 'call_back': getNodeSumStat, + 'value_type': 'float', + 'units': 'N', + 'slope': 'both', + 'format': '%f', + 'description': 'Node_Metric', + 'groups': 'rabbitmq,node'}) + log.debug(d2) + descriptors.append(d2) for node in list_nodes(): name = "{1}{0}{2}{0}-".format(METRIC_TOKEN_SEPARATOR, metric, node) log.debug(name) From 8f157b3cee67d7d2cf3adfb9e9990662cc46c392 Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Mon, 20 Apr 2015 15:25:56 -0400 Subject: [PATCH 3/3] Delete rabbitmq metrics that are not defined. --- rabbit/python_modules/rabbitmq.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rabbit/python_modules/rabbitmq.py b/rabbit/python_modules/rabbitmq.py index 536f04be..f451d979 100644 --- a/rabbit/python_modules/rabbitmq.py +++ b/rabbit/python_modules/rabbitmq.py @@ -77,12 +77,10 @@ keyToPath['rmq_proc_used'] = "%s{0}proc_used".format(JSON_PATH_SEPARATOR) keyToPath['rmq_sockets_used'] = "%s{0}sockets_used".format(JSON_PATH_SEPARATOR) keyToPath['rmq_mem_alarm'] = "%s{0}mem_alarm".format(JSON_PATH_SEPARATOR) # Boolean -keyToPath['rmq_mem_binary'] = "%s{0}mem_binary".format(JSON_PATH_SEPARATOR) -keyToPath['rmq_mem_code'] = "%s{0}mem_code".format(JSON_PATH_SEPARATOR) -keyToPath['rmq_mem_proc_used'] = "%s{0}mem_proc_used".format(JSON_PATH_SEPARATOR) keyToPath['rmq_running'] = "%s{0}running".format(JSON_PATH_SEPARATOR) # Boolean -NODE_METRICS = ['rmq_disk_free', 'rmq_mem_used', 'rmq_disk_free_alarm', 'rmq_running', 'rmq_proc_used', 'rmq_mem_proc_used', 'rmq_fd_used', 'rmq_mem_alarm', 'rmq_mem_code', 'rmq_mem_binary', 'rmq_sockets_used'] +NODE_METRICS = ['rmq_disk_free', 'rmq_mem_used', 'rmq_disk_free_alarm', 'rmq_running', 'rmq_proc_used', + 'rmq_fd_used', 'rmq_mem_alarm', 'rmq_sockets_used'] # EXCHANGE METRICS #