Skip to content

Commit

Permalink
new module: kumofs
Browse files Browse the repository at this point in the history
  • Loading branch information
hirose31 authored and Ganglia Development Team committed Sep 21, 2010
1 parent d87e1f7 commit d16c3de
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 0 deletions.
28 changes: 28 additions & 0 deletions kumofs/README.mkdn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
kumofs
===============

python module for ganglia 3.1.

This module sends metrics on kumofs protocol "stats".

## kumofs

Kumofs is a simple and fast distributed key-value store. You can use a memcached client library to set, get, CAS or delete values from/into kumofs. Backend storage is Tokyo Cabinet and it will give you great performance.

* Data is partitioned and replicated over multiple servers.
* Extreme single node performance; comparable with memcached.
* Both read and write performance got improved as servers added.
* Servers can be added without stopping the system.
* Servers can be added without modifying any configuration files.
* The system does not stop even if one or two servers crashed.
* The system does not stop to recover crashed servers.
* Automatic rebalancing support with a consistency control algorithm.
* Safe CAS operation support.
* memcached protocol support.

<http://kumofs.sourceforge.net/>

## AUTHOR

HIROSE Masaaki <hirose31@gmail.com>

49 changes: 49 additions & 0 deletions kumofs/conf.d/kumofs.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
modules {
module {
name = "kumofs"
language = "python"

param host {
value = "localhost"
}
param port {
value = 19800
}

param refresh_rate {
value = 20
}
# param spoof_host {
# value = "IPADDRESS:HOSTNAME"
# }

}
}

collection_group {
collect_every = 30
time_threshold = 90

metric {
name = "kumofs_curr_items"
title = "Current number of items stored"
value_threshold = 0
}
metric {
name = "kumofs_cmd_get"
title = "Cumulative number of retrieval reqs"
value_threshold = 0
}
metric {
name = "kumofs_cmd_set"
title = "Cumulative number of storage reqs"
value_threshold = 0
}
metric {
name = "kumofs_cmd_delete"
title = "Cumulative number of storage reqs"
value_threshold = 0
}

}

174 changes: 174 additions & 0 deletions kumofs/python_modules/kumofs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import traceback
import os
import threading
import time
import subprocess
import re

descriptors = list()
Desc_Skel = {}
_Worker_Thread = None
_Lock = threading.Lock() # synchronization lock
Debug = False

def dprint(f, *v):
if Debug:
print >>sys.stderr, "DEBUG: "+f % v

class UpdateMetricThread(threading.Thread):

def __init__(self, params):
threading.Thread.__init__(self)
self.running = False
self.shuttingdown = False
self.refresh_rate = 20
if "refresh_rate" in params:
self.refresh_rate = int(params["refresh_rate"])
self.metric = {}
self.timeout = 2

self.host = "localhost"
self.port = 19800
if "host" in params:
self.host = params["host"]
if "port" in params:
self.port = params["port"]

def shutdown(self):
self.shuttingdown = True
if not self.running:
return
self.join()

def run(self):
self.running = True

while not self.shuttingdown:
_Lock.acquire()
self.update_metric()
_Lock.release()
time.sleep(self.refresh_rate)

self.running = False

def update_metric(self):
cmd = ["kumostat", "%s:%s" % (self.host, self.port), "stats"]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pout, perr = proc.communicate()

for m in re.split('(?:\r\n|\n)',pout):
dprint("%s",m)
d = m.split(" ")
if len(d) == 3 and d[0] == "STAT":
self.metric["kumofs_"+d[1]] = int(d[2]) if d[2].isdigit() else d[2]

def metric_of(self, name):
val = 0
if name in self.metric:
_Lock.acquire()
val = self.metric[name]
_Lock.release()
return val

def metric_init(params):
global descriptors, Desc_Skel, _Worker_Thread, Debug

print '[kumofs] kumofs protocol "stats"'
print params

# initialize skeleton of descriptors
Desc_Skel = {
'name' : 'XXX',
'call_back' : metric_of,
'time_max' : 60,
'value_type' : 'uint',
'format' : '%d',
'units' : 'XXX',
'slope' : 'XXX', # zero|positive|negative|both
'description' : 'XXX',
'groups' : 'kumofs',
}

if "refresh_rate" not in params:
params["refresh_rate"] = 20
if "debug" in params:
Debug = params["debug"]
dprint("%s", "Debug mode on")

_Worker_Thread = UpdateMetricThread(params)
_Worker_Thread.start()

# IP:HOSTNAME
if "spoof_host" in params:
Desc_Skel["spoof_host"] = params["spoof_host"]

descriptors.append(create_desc(Desc_Skel, {
"name" : "kumofs_curr_items",
"units" : "items",
"slope" : "both",
"description": "Current number of items stored",
}))
descriptors.append(create_desc(Desc_Skel, {
"name" : "kumofs_cmd_get",
"units" : "commands",
"slope" : "positive",
"description": "Cumulative number of retrieval reqs",
}))
descriptors.append(create_desc(Desc_Skel, {
"name" : "kumofs_cmd_set",
"units" : "commands",
"slope" : "positive",
"description": "Cumulative number of storage reqs",
}))
descriptors.append(create_desc(Desc_Skel, {
"name" : "kumofs_cmd_delete",
"units" : "commands",
"slope" : "positive",
"description": "Cumulative number of storage reqs",
}))

return descriptors

def create_desc(skel, prop):
d = skel.copy()
for k,v in prop.iteritems():
d[k] = v
return d

def metric_of(name):
return _Worker_Thread.metric_of(name)

def metric_cleanup():
_Worker_Thread.shutdown()

if __name__ == '__main__':
try:
params = {
"host" : "s101",
"port" : 19800,
"debug" : True,
}
metric_init(params)

# for d in descriptors:
# print ''' metric {
# name = "%s"
# title = "%s"
# value_threshold = 0
# }''' % (d["name"], d["description"])

while True:
for d in descriptors:
v = d['call_back'](d['name'])
print ('value for %s is '+d['format']) % (d['name'], v)
time.sleep(5)
except KeyboardInterrupt:
time.sleep(0.2)
os._exit(1)
except:
traceback.print_exc()
os._exit(1)

0 comments on commit d16c3de

Please sign in to comment.