From d16c3deeb7642e163bd29bc95c6e35de5ea36b8a Mon Sep 17 00:00:00 2001 From: HIROSE Masaaki Date: Tue, 21 Sep 2010 11:48:02 +0800 Subject: [PATCH] new module: kumofs --- kumofs/README.mkdn | 28 +++++ kumofs/conf.d/kumofs.conf | 49 +++++++++ kumofs/python_modules/kumofs.py | 174 ++++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+) create mode 100644 kumofs/README.mkdn create mode 100644 kumofs/conf.d/kumofs.conf create mode 100755 kumofs/python_modules/kumofs.py diff --git a/kumofs/README.mkdn b/kumofs/README.mkdn new file mode 100644 index 00000000..7b01ac84 --- /dev/null +++ b/kumofs/README.mkdn @@ -0,0 +1,28 @@ +kumofs +=============== + +python module for ganglia 3.1. + +This module sends metrics on kumofs protocol "stats". + +## kumofs + +Kumofs is a simple and fast distributed key-value store. You can use a memcached client library to set, get, CAS or delete values from/into kumofs. Backend storage is Tokyo Cabinet and it will give you great performance. + +* Data is partitioned and replicated over multiple servers. +* Extreme single node performance; comparable with memcached. +* Both read and write performance got improved as servers added. +* Servers can be added without stopping the system. +* Servers can be added without modifying any configuration files. +* The system does not stop even if one or two servers crashed. +* The system does not stop to recover crashed servers. +* Automatic rebalancing support with a consistency control algorithm. +* Safe CAS operation support. +* memcached protocol support. + + + +## AUTHOR + +HIROSE Masaaki + diff --git a/kumofs/conf.d/kumofs.conf b/kumofs/conf.d/kumofs.conf new file mode 100644 index 00000000..e74b4436 --- /dev/null +++ b/kumofs/conf.d/kumofs.conf @@ -0,0 +1,49 @@ +modules { + module { + name = "kumofs" + language = "python" + + param host { + value = "localhost" + } + param port { + value = 19800 + } + + param refresh_rate { + value = 20 + } + # param spoof_host { + # value = "IPADDRESS:HOSTNAME" + # } + + } +} + +collection_group { + collect_every = 30 + time_threshold = 90 + + metric { + name = "kumofs_curr_items" + title = "Current number of items stored" + value_threshold = 0 + } + metric { + name = "kumofs_cmd_get" + title = "Cumulative number of retrieval reqs" + value_threshold = 0 + } + metric { + name = "kumofs_cmd_set" + title = "Cumulative number of storage reqs" + value_threshold = 0 + } + metric { + name = "kumofs_cmd_delete" + title = "Cumulative number of storage reqs" + value_threshold = 0 + } + +} + diff --git a/kumofs/python_modules/kumofs.py b/kumofs/python_modules/kumofs.py new file mode 100755 index 00000000..8668156b --- /dev/null +++ b/kumofs/python_modules/kumofs.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import sys +import traceback +import os +import threading +import time +import subprocess +import re + +descriptors = list() +Desc_Skel = {} +_Worker_Thread = None +_Lock = threading.Lock() # synchronization lock +Debug = False + +def dprint(f, *v): + if Debug: + print >>sys.stderr, "DEBUG: "+f % v + +class UpdateMetricThread(threading.Thread): + + def __init__(self, params): + threading.Thread.__init__(self) + self.running = False + self.shuttingdown = False + self.refresh_rate = 20 + if "refresh_rate" in params: + self.refresh_rate = int(params["refresh_rate"]) + self.metric = {} + self.timeout = 2 + + self.host = "localhost" + self.port = 19800 + if "host" in params: + self.host = params["host"] + if "port" in params: + self.port = params["port"] + + def shutdown(self): + self.shuttingdown = True + if not self.running: + return + self.join() + + def run(self): + self.running = True + + while not self.shuttingdown: + _Lock.acquire() + self.update_metric() + _Lock.release() + time.sleep(self.refresh_rate) + + self.running = False + + def update_metric(self): + cmd = ["kumostat", "%s:%s" % (self.host, self.port), "stats"] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pout, perr = proc.communicate() + + for m in re.split('(?:\r\n|\n)',pout): + dprint("%s",m) + d = m.split(" ") + if len(d) == 3 and d[0] == "STAT": + self.metric["kumofs_"+d[1]] = int(d[2]) if d[2].isdigit() else d[2] + + def metric_of(self, name): + val = 0 + if name in self.metric: + _Lock.acquire() + val = self.metric[name] + _Lock.release() + return val + +def metric_init(params): + global descriptors, Desc_Skel, _Worker_Thread, Debug + + print '[kumofs] kumofs protocol "stats"' + print params + + # initialize skeleton of descriptors + Desc_Skel = { + 'name' : 'XXX', + 'call_back' : metric_of, + 'time_max' : 60, + 'value_type' : 'uint', + 'format' : '%d', + 'units' : 'XXX', + 'slope' : 'XXX', # zero|positive|negative|both + 'description' : 'XXX', + 'groups' : 'kumofs', + } + + if "refresh_rate" not in params: + params["refresh_rate"] = 20 + if "debug" in params: + Debug = params["debug"] + dprint("%s", "Debug mode on") + + _Worker_Thread = UpdateMetricThread(params) + _Worker_Thread.start() + + # IP:HOSTNAME + if "spoof_host" in params: + Desc_Skel["spoof_host"] = params["spoof_host"] + + descriptors.append(create_desc(Desc_Skel, { + "name" : "kumofs_curr_items", + "units" : "items", + "slope" : "both", + "description": "Current number of items stored", + })) + descriptors.append(create_desc(Desc_Skel, { + "name" : "kumofs_cmd_get", + "units" : "commands", + "slope" : "positive", + "description": "Cumulative number of retrieval reqs", + })) + descriptors.append(create_desc(Desc_Skel, { + "name" : "kumofs_cmd_set", + "units" : "commands", + "slope" : "positive", + "description": "Cumulative number of storage reqs", + })) + descriptors.append(create_desc(Desc_Skel, { + "name" : "kumofs_cmd_delete", + "units" : "commands", + "slope" : "positive", + "description": "Cumulative number of storage reqs", + })) + + return descriptors + +def create_desc(skel, prop): + d = skel.copy() + for k,v in prop.iteritems(): + d[k] = v + return d + +def metric_of(name): + return _Worker_Thread.metric_of(name) + +def metric_cleanup(): + _Worker_Thread.shutdown() + +if __name__ == '__main__': + try: + params = { + "host" : "s101", + "port" : 19800, + "debug" : True, + } + metric_init(params) + + # for d in descriptors: + # print ''' metric { + # name = "%s" + # title = "%s" + # value_threshold = 0 + # }''' % (d["name"], d["description"]) + + while True: + for d in descriptors: + v = d['call_back'](d['name']) + print ('value for %s is '+d['format']) % (d['name'], v) + time.sleep(5) + except KeyboardInterrupt: + time.sleep(0.2) + os._exit(1) + except: + traceback.print_exc() + os._exit(1)